flink中asyncio使用

flink支持使用异步io来提供吞吐。

为什么需要异步

场景:在数据处理或者流计算中会根据数据源数据来跟第三方数据来处理,这里数据源信息跟第三方数据有一定的关联性,例如看房预约的场景,我们从kafka拿到用户电话信息(电话信息),然后我们需要对里面是属于我们客户的用户提高优质服务,那么我们需要获取第三方数据源(Mysql),通过里面的用户信息表电话字段来查找到对应用户的信息,如果对于不存在的用户我们安排客户去电话处理。

上面这个场景处理过程,我们使用flink实现的话,就会遇到官网说到的吞吐高并发的问题。

如下是伪代码

1
2
3
4
5
6
7
8
DataStream<User> dsSrc = ... // from kafka
dsSrc.flatMap(new RichFlatMap<User, User>{
@Override
public void flatMap(User value, Collector<User> out) throws Exception {
// 访问数据库/缓存获取数据
// balabala
}
})

以下是官方的介绍

当与外部系统进行交互(例如使用存储在数据库中的数据丰富流事件)时, 需要注意的是, 与外部系统的通信延迟并不决定流应用程序的总体工作。

原始的访问外部系统中的数据,例如通过一个MapFunction来访问,通常意味着同步交互:将一个请求发送到数据库,MapFunction等待直到接收到响应为止。很多情况下,这种等待会占用很大一部分函数的时间。

与外部数据库系统进行异步交互意味着一个并行函数实例可以并发地处理多个请求和并发地接收多个响应。那样的话,等待时间就可以被其他的请求或者响应所覆盖。至少,等待时间可以被多个请求摊销,这在很多情况下会导致更高的流吞吐量。

img

注意:通过扩展MapFunction到一个很高的并发度来提高吞吐量在一定程度上是可行的,但是常常会导致很高的资源消耗:有很多的并行MapFunction实例意味着更多的任务、线程、Flink内部网络连接、与数据库之间的网络连接、缓存以及通常的内部开销。

Async I/O API

Flink的Async I/O允许用户在数据流中使用异步的请求客户端,这个API会处理与数据流的交互,同时还处理顺序、事件时间、容错等。

要实现一个通过异步I/O来操作数据库还需要三个步骤:

  1. 实现用来分发请求的AsyncFunction(例如使用线程池来实现)
  2. 获取操作结果的callback,并将它提交到AsyncCollector中(通过Future的complete方法)
  3. 将异步I/O操作作为转换操作应用于DataStream中(AsyncDataStream.unorderedWait或orderedWait方法来加工)

上面的模版的实际代码可以参考下文的实战例子。

实战: 从异步访问Mysql

继续上面的例子,我们定义下用户User类型

1
2
3
4
5
public class User {
private String name;
private Date regDate;
private Integer age;
}

然后我们定义通过mysql来获取用户的具体信息,假设数据库表的表结构如下:

1
2
3
4
5
CREATE TABLE `userinfo` (
`name` varchar(20) DEFAULT NULL,
`reg_date` datetime DEFAULT NULL,
UNIQUE KEY `userinfo_pk` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

接着我们模拟简单的数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List<User> userList = new ArrayList<>();
User user = new User();
user.setName("roger");
user.setAge(18);
userList.add(user);
user = new User();
user.setName("peter");
user.setAge(28);
userList.add(user);

DataStreamSource<User> dsSrc = env.fromCollection(userList);

数据源提供了用户名跟用户年龄,我们需要从mysql补全用户的注册时间。

主角来了,就是通过继承RichAsyncFunction来实现访问数据库,这里我们需要定义线程池来处理每个数据来之后分配线程来处理,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Slf4j
public class AsyncMysqlExecutor extends RichAsyncFunction<User, User> {
// 链接
private static String jdbcUrl = "jdbc:mysql://127.0.0.1:3306?useSSL=false";
private static String username = "root";
private static String password = "tencent";
private static String driverName = "com.mysql.jdbc.Driver";

java.sql.Connection conn;
private transient ExecutorService executorService;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

Class.forName(driverName);
conn = DriverManager.getConnection(jdbcUrl, username, password);

executorService = Executors.newFixedThreadPool(30);
}

@Override
public void asyncInvoke(User input, ResultFuture<User> resultFuture) throws Exception {
executorService.submit(()-> {
try {
PreparedStatement ps;
ps = conn.prepareStatement("select name, reg_date from test.userinfo where name = ?");

ps.setString(1, input.getName());
ResultSet rs = ps.executeQuery();

// 模拟异步问题
if (input.getName().equals("roger")) {
// wait for doing something
Thread.sleep(1000);
}
rs.isClosed());
if (rs.next()) {
input.setRegDate(rs.getTimestamp(2));
}
List<User> list = new ArrayList();
list.add(input);
// 放回 result 队列
resultFuture.complete(list);

log.info("record:" + input.toString() + ",conn:" + System.identityHashCode(conn));
} catch (Exception exp) {
resultFuture.completeExceptionally(exp);
}
});
}


@Override
public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
List<User> list = new ArrayList<>();
list.add(input);
resultFuture.complete(list);
}

@Override
public void close() throws Exception {
super.close();
this.conn.close();

log.info("i close myself");
ExecutorUtils.gracefulShutdown(1000, TimeUnit.MILLISECONDS, executorService); // 1秒关闭
}
}

从代码可以看出来异步本质就是通过MapFunction算子来管理线程池,然后将请求多并发的访问第三方源然后获取结果之后就往下处理,这里我们业务代码中加入了当用户名字为“roger”来模拟网络io的延迟问题。

主函数调用如下:

1
2
3
DataStream<User> dsUsers = AsyncDataStream.unorderedWait(dsSrc, new AsyncMysqlExecutor(), 5, TimeUnit.SECONDS).setParallelism(1);

dsUsers.print().setParallelism(1); // 目的是查看数据输出到sink的顺序

执行代码看到打印内容如下:

1
2
User(name=peter, regDate=Wed Jul 10 13:35:58 CST 2019, age=28)
User(name=roger, regDate=Wed Jul 10 13:35:46 CST 2019, age=18)

这里可以看到数据源本来是roger先,后面输出到sink时,是在后面的,不会影响peter的优先处理。

这里就会说到数据顺序问题

结果顺序

由AsyncFunction发出的并发请求经常是以无序的形式完成,取决于哪个请求先完成。为了控制发出请求结果的顺序,

Flink提供了两种模式:

  • Unordered:结果记录在异步请求完成后就发出,流中的记录的顺序通过异步I/O操作后会与先前的不一致。当使用处理时间作为时间特性时这种模式具有低延迟、低消耗特点。通过AsyncDataStream.unorderedWait(...)来使用这种模式。

  • Ordered:在这种情况下,流的顺序是保留的,结果记录发出的顺利与异步请求触发的顺序(算子输入记录的顺序)一致。为了实现这一点,算子会将结果记录缓存起来直到所有的处理记录都被发出(或者超时)为止。这常常会导致一定程度的延迟和checkpoint消耗,因为跟非排序模式相比,记录或者结果会被长时间保存在checkpoint State中。通过AsyncDataStream.orderedWait(...)来使用这种模式。

容错性保证

将异步的状态存储在checkpoint中,跟处理普通的状态一样,提供了精确一次的保证

原创技术分享,您的支持将鼓励我继续创作