kafka partitions == flink parallelism
这种情况很理想,因为每个消费者负责一个分区。如果您的消息在分区之间平衡,则工作将均匀分布在flink运算符上
kafka partitions < flink parallelism
您需要在任何操作之前调用输入流上的重新平衡,这会导致数据被重新分区,例如如下操作1
2
3
4
5
6
7
8
9
10
11
12StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataStreamSource<String> dsSrc = env.addSource(
new FlinkKafkaConsumer<>("kafkatest", new SimpleStringSchema(), properties)
);
DataStream<BinLogEntity> dsLog = dsSrc.rebalance().flatMap(new FlatMapFunction<String, BinLogEntity>() {
public void flatMap(String value, Collector<BinLogEntity> out) throws Exception {
out.collect(mapper.readValue(value, BinLogEntity.class));
}
});
kafka partitions > flink parallelism
在这种情况下,某些实例将处理多个分区。再一次,您可以使用rebalance来均匀地传播消息。