Dataflow

Apache Flink’s dataflow execution model is a sophisticated distributed computing framework that enables high-throughput, low-latency stream processing through parallel execution and optimized data exchange mechanisms.

Dataflow 图

数据流执行架构 (Dataflow Execution Architecture)

Flink的数据流执行是一个多层次的抽象系统,将用户定义的转换编译并优化为可执行的任务图。

执行层次结构

用户程序 (Program)
    ↓
作业图 (JobGraph) - 用户逻辑编译结果
    ↓
执行图 (ExecutionGraph) - 优化后的任务调度图
    ↓
物理执行 (Task Execution) - 实际的任务执行

核心组件

1. JobGraph (作业图)

  • 用户DataStream API程序编译后的逻辑视图
  • 包含算子(Operator)和中间结果(DataSet)
  • 连接点(Chaining)和分区策略(Partitioning)

2. ExecutionGraph (执行图)

  • JobGraph优化后的执行规划
  • 将JobGraph中的顶点(Vertices)转换为可执行的任务(Tasks)
  • 包含调度信息和依赖关系
  • 每个ExecutionVertex对应一个并行子任务(Subtask)

3. Task Execution (任务执行)

  • 实际执行单元,运行在TaskManager上
  • 每个Task处理一个分区(Partition)的数据
  • 支持Pipeline执行模式
  • 具备容错和状态管理能力

数据流执行模式

Pipeline执行模式

Source → Map → KeyBy → Window → Sum → Sink
   ↑     ↑     ↑      ↑     ↑     ↑
  Subtask → Subtask → Subtask → Subtask

批处理模式

Source → Map → Shuffle → Sum → Sink
    ↑     ↑     ↑      ↑    ↑
  Task  → Task → Task  → Task → Task

数据并行和任务并行

Data Parallelism vs Task Parallelism

数据并行性 (Data Parallelism)

核心概念

  • 将输入数据分割成多个分区,每个并行实例处理一个数据分区
  • 核心思想:数据分片,代码复制,结果合并
  • 每个并行实例执行相同的计算逻辑,但处理不同的数据子集

并行度配置

// 全局并行度设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 设置全局并行度为4
 
// 算子级并行度
DataStream<String> text = env.readTextFile("file.txt");
text.map(new MapFunction<String, String>() {
    // 并行度为2
}).setParallelism(2);
 
// KeyBy后的并行度
text.keyBy(value -> value.length()); // 基于键的并行分片

并行策略

策略描述适用场景API
Round Robin轮询分配数据数据均匀分布rebalance()
Key-based相同键分配到同一分区状态计算keyBy()
Custom自定义分区器特定业务需求customPartitioner()
Broadcast广播到所有分区配置数据分发broadcast()

任务并行性 (Task Parallelism)

核心概念

  • 将一个逻辑任务拆分为多个并行执行的子任务
  • 支持任务链化(Task Chaining)优化
  • 允许不同算子并行执行

任务链优化

// 任务链配置
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
 
// 强制创建新链
source.forward().name("Source")
    .map(new MyMapper()).setParallelism(2).name("Mapper")
    .keyBy(0).name("KeyBy")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .process(new MyProcessFunction()).name("Process")
    .forward().setParallelism(1).name("Sink");

并行度策略选择

策略优点缺点适用场景
高并行度高吞吐量,负载均衡网络开销大,状态管理复杂大规模数据,高吞吐
低并行度网络开销小,状态管理简单资源利用率低,吞吐量受限小数据量,状态密集
混合并行度灵活平衡配置复杂不同算子需求差异大

2024年并行性优化趋势

  1. 增量处理优化

    • 阿里云实时计算引入增量处理能力
    • 在处理效率和资源利用间取得更好平衡
    • 支持连续流处理的状态优化
  2. 细粒度并行控制

    • 管道级和任务级并行度的精细控制
    • 更灵活的并行执行模型
    • 针对不同业务场景的优化策略
  3. 内存管理改进

    • 优化任务并行度的内存配置
    • 提高计算并行性的执行内存空间效率
    • 细粒度内存分配策略

数据交换策略

Data Exchange Strategies

网络缓冲区管理 (Network Buffer Management)

缓冲区架构

NetworkBufferPool (全局缓冲池)
    ↓
LocalBufferPool (本地缓冲池) × N
    ↓
Exclusive Buffers (独占缓冲区) - 主要数据传输
Floating Buffers (流动缓冲区) - 灵活分配

缓冲区配置

// 网络缓冲区配置
env.getConfig().setNetworkBufferSize(64 * 1024); // 64KB
env.getConfig().setNetworkBuffersPerGate(2); // 每个网关2个缓冲区
env.getConfig().setFloatingBuffersPerGate(1); // 每个网关1个流动缓冲区

数据交换策略类型

1. 一对一交换 (Forward Exchange)

  • 相同并行度下的直接数据传递
  • 最低开销,最高性能
  • 适用于连续任务链

2. 重分区交换 (Repartition Exchange)

  • 不同并行度间的数据重分配
  • 使用Round Robin或Key-based分区
  • 需要网络传输和序列化

3. 广播交换 (Broadcast Exchange)

  • 数据发送到所有下游并行实例
  • 适用于配置数据和需要全量数据的场景
  • 内存和网络开销较大

4. 自定义交换 (Custom Exchange)

  • 基于业务需求的自定义分区策略
  • 支持复杂的路由逻辑
  • 灵活性高,实现复杂

反压机制 (Backpressure Handling)

信用流控 (Credit-based Flow Control)

上游Task → Credit Request → 下游Task → Credit Grant → 数据传输

反压处理策略

  1. 动态缓冲区调整 - 根据下游处理能力调整发送速度
  2. 背压传播 - 反压信号向上游传播,控制数据产生速度
  3. 批处理优化 - 批量处理数据减少网络开销

2024年数据交换优化

Flink 2.0+ 改进

  • 更高效的网络缓冲区管理
  • 改进与外部系统的数据交换机制
  • 优化的协议支持(Protocol Buffers v27+)
  • 增强的流控和反压处理

生产级优化方案

StreamShield解决方案

  • 在字节跳动等大型企业得到验证
  • 弹性的数据交换解决方案
  • 针对大规模生产集群的性能优化

调优建议

// 网络缓冲区调优
// 1. 监控缓冲区使用情况
env.getConfig().setNetworkBufferSize(128 * 1024); // 128KB缓冲区
env.getConfig().setNetworkBuffersPerGate(4); // 每个网关4个缓冲区
env.getConfig().setFloatingBuffersPerGate(2); // 每个网关2个流动缓冲区
 
// 2. 反压监控
MonitoringUtils.monitorBackpressure();
 
// 3. 资源分配优化
ResourceProfile profile = ResourceProfile.newBuilder()
    .setNetworkMemory(64 * 1024 * 1024) // 64MB网络内存
    .build();

性能监控指标

指标描述优化目标
Network Buffers网络缓冲区使用率< 80%
Backpressure反压发生频率Minimize
Exchange Time数据交换耗时Minimize
Throughput吞吐量Maximize
Latency端到端延迟< 100ms

实践建议

并行度配置原则

  • 根据数据量和业务需求选择合适的并行度
  • 关键状态密集型算子保持较低并行度
  • 数据源和Sink算子可适当提高并行度

数据交换优化

  • 合理配置网络缓冲区大小
  • 使用适当的分区策略减少网络开销
  • 监控反压情况及时调整配置

性能调优要点

  • 优先使用任务链化减少数据交换开销
  • 合理使用Batch处理提高网络效率
  • 监控网络”怪兽”问题及时解决

Sources: