flink api 转换算子操作讲解

flink 操作算子是数据处理流程中重要的一环,负责将数据进行多次转换,本文章主要通过代码实例来快速引导入门,同时引入多个典型的操作实例

转换算子类型

根据输入的数据类型以及输出数据类型,可以分为3类

单数据流操作

one to one, one to more

reduce操作

聚合操作,多对一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
Tuple2.of("a", 1),
Tuple2.of("b", 2),
Tuple2.of("c", 3),
Tuple2.of("a", 1)
);

source.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();

env.execute("hello");

flink 内置了很多常用的聚合操作,例如max, sum等,上述累计器的代码可以等效如下

1
source.keyBy(0).sum(1).print();

union 操作

合并数据流,要求数据类型是一致的,可以多余两个以上的流。

1
2
3
4
5
6
7
8
9
10
11
12
13
DataStreamSource<Tuple2<String, Integer>> source1 = env.fromElements(
Tuple2.of("a", 1),
Tuple2.of("b", 2),
Tuple2.of("c", 3),
Tuple2.of("a", 1)
);

DataStreamSource<Tuple2<String,Integer>> source2 = env.fromElements(
Tuple2.of("e", 1),
Tuple2.of("a", 1)
);

source1.union(source2).print();

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1> (b,2)
3> (e,1)
2> (c,3)
4> (a,1)
8> (a,1)
3> (a,1)
```
## connect 操作
可以合并两个不同数据类型的流(而且只限两个),输出`ConnectedStreams<In1, In2>`,这个合并流保留了输入的两个数据类型,需要通过转换变成`DAtaStreawm`类型才可以进行后续操作,通过map或者flatmap方法来处理数据
```java
ConnectedStreams<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> connectedStreams =
source1.connect(source2);

DataStream<Object> connectedWithMapStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, Object>() {
@Override
public Object map1(Tuple2<String, Integer> value) throws Exception {
return value;
}

@Override
public Object map2(Tuple3<String, Integer, Integer> value) throws Exception {
return Tuple2.of(value.f0, value.f1);
}
});

上述代码是将tuple3的元素的前两个元素看做是跟source1一样的数据类型tuple2。

其实上面经过CoMapFunction之后的类型可以写具体的Tuple2<String, Integer>,类型还是可以保留的,通过如下代码验证

1
2
3
4
5
6
7
connectedWithMapStream.map(new MapFunction<Object, Object>() {
@Override
public Object map(Object value) throws Exception {
System.out.println(value.getClass().getName());
return value;
}
}).print();

todo: connect支持共享状态的一说,这里后面补充具体例子。

注意:如果设置并发度是1,那么数据流的顺序是先输出source2的数据,接着输出source1的数据,如果并发度>1,则是乱序输出。

其实从上面例子看不出来connect的更加实际的例子,官方的说明是

An example for the use of connected streams would be to apply rules that change over time onto another stream. One of the connected streams has the rules, the other stream the elements to apply the rules to. The operation on the connected stream maintains the current set of rules in the state. It may receive either a rule update and update the state or a data element and apply the rules in the state to the element.

join 函数

Flink流计算编程–在双流中体会joinedStream与coGroupedStream

总结

FlatMap算子会比Map的算子多了参数Collector,因为输出多于1个输出元素

Process会比FlatMap多定义了Context和OnTimeContext

技巧

使用lambda来取代匿名函数

有同学可能遇到ide友善提示使用lambda来取代匿名类,例如下面代码

1
2
3
4
5
6
7
connectedWithMapStream.map(new MapFunction<Object, Object>() {
@Override
public Object map(Object value) throws Exception {
System.out.println(value.getClass().getName());
return value;
}
}).print();

可以取代为如下

1
connectedWithMapStream.map(in->{System.out.println(in.getClass().getName()); return in;}).print();

如果不需要打印语句,可以更加简单

1
connectedWithMapStream.map(in->in).print();

其实lambda的实现机制是通过FunctionalInterface注解来实现的,具体可以参考lambda的语法。

原创技术分享,您的支持将鼓励我继续创作