flink与kafka的并行度问题

flink与kafka的并行度问题

kafka partitions == flink parallelism

这种情况很理想,因为每个消费者负责一个分区。如果您的消息在分区之间平衡,则工作将均匀分布在flink运算符上

kafka partitions < flink parallelism

您需要在任何操作之前调用输入流上的重新平衡,这会导致数据被重新分区,例如如下操作

1
2
3
4
5
6
7
8
9
10
11
12
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataStreamSource<String> dsSrc = env.addSource(
new FlinkKafkaConsumer<>("kafkatest", new SimpleStringSchema(), properties)
);

DataStream<BinLogEntity> dsLog = dsSrc.rebalance().flatMap(new FlatMapFunction<String, BinLogEntity>() {
@Override
public void flatMap(String value, Collector<BinLogEntity> out) throws Exception {
out.collect(mapper.readValue(value, BinLogEntity.class));
}
});

kafka partitions > flink parallelism

在这种情况下,某些实例将处理多个分区。再一次,您可以使用rebalance来均匀地传播消息。

原创技术分享,您的支持将鼓励我继续创作