Flink Table API and SQL Usage

Framework pattern

Conversion between DataStream/DataSet and Table

Converting a DataStream to a Table

  1. Register a DataStream directly as a Table

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
            Tuple2.of("a", 1),
            Tuple2.of("b", 2)
    );
    tEnv.registerDataStream("t1", source, "name, age");
  2. Convert the DataStream to a Table, then register the Table with the tEnv

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
            Tuple2.of("a", 1),
            Tuple2.of("b", 2)
    );
    Table table = tEnv.fromDataStream(source, "name, age");
    tEnv.registerTable("t1", table);

Converting a Table to a DataStream

A Table can be converted to Row, POJO, Tuple, and other data types.

  • For the Row type, fields can only be accessed with getField/setField via the field index (starting from 0):

    tEnv.toAppendStream(tEnv.sqlQuery(sql), Row.class).map(value -> {
        System.out.println(value.getField(0).getClass().getName());
        return value;
    });
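Conceptually, Row is just a positional container of untyped fields. A minimal plain-Java stand-in (SimpleRow is illustrative only, not Flink API) makes the index-based access pattern concrete:

```java
// Minimal stand-in for Flink's Row: fields are only reachable by position,
// so callers must know the column order and cast the result themselves.
public class SimpleRow {
    private final Object[] fields;

    public SimpleRow(Object... fields) {
        this.fields = fields;
    }

    public Object getField(int pos) {
        return fields[pos];
    }

    public void setField(int pos, Object value) {
        fields[pos] = value;
    }

    public int getArity() {
        return fields.length;
    }

    public static void main(String[] args) {
        SimpleRow row = new SimpleRow("a", 1); // columns: name, age
        System.out.println((String) row.getField(0)); // cast required: "a"
        System.out.println(row.getArity());           // 2
    }
}
```

This is why the real Row is convenient for ad-hoc queries but error-prone for stable schemas, where a POJO with named fields is safer.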
  • For POJO types, the class's field names, field types, and getters/setters must match the Table's column names. For example, if a table column is named name, the POJO needs a name field with getName and setName methods of the matching type; otherwise you get an error like:

    does not match the number[1] of requested type

    Test code below:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class TestTable {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
                    Tuple2.of("a", 1),
                    Tuple2.of("b", 2)
            );
            Table table = tEnv.fromDataStream(source, "name, age");
            tEnv.registerTable("t1", table);

            String sql = "select name, age from t1";

            tEnv.toAppendStream(tEnv.sqlQuery(sql), Person.class).map(value -> {
                System.out.println(value.getName());
                return value;
            });

            env.execute("testTableApi");
        }

        public static class Person {
            private String name;
            private Integer age;

            public Person() {
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            // Misnamed getter (should be getAge) — this mismatch triggers the error above
            public Integer getAge1() {
                return age;
            }

            public void setAge(Integer age) {
                this.age = age;
            }
        }
    }
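Since the mismatch above only surfaces at job submission, one way to sanity-check a POJO against the table's columns beforehand is plain reflection. This is an illustrative helper (PojoCheck and hasAccessors are not Flink API) that mirrors the getter/setter matching Flink performs:

```java
import java.util.Arrays;

public class PojoCheck {
    // Example POJO with correctly paired accessors for columns "name" and "age".
    public static class Person {
        private String name;
        private Integer age;
        public Person() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
    }

    // Returns true only if the class exposes both getXxx() and setXxx(...) for
    // the given column name, as required for Table-to-POJO conversion.
    public static boolean hasAccessors(Class<?> cls, String column) {
        String suffix = Character.toUpperCase(column.charAt(0)) + column.substring(1);
        boolean hasGetter = Arrays.stream(cls.getMethods())
                .anyMatch(m -> m.getName().equals("get" + suffix) && m.getParameterCount() == 0);
        boolean hasSetter = Arrays.stream(cls.getMethods())
                .anyMatch(m -> m.getName().equals("set" + suffix) && m.getParameterCount() == 1);
        return hasGetter && hasSetter;
    }

    public static void main(String[] args) {
        System.out.println(hasAccessors(Person.class, "name")); // true
        System.out.println(hasAccessors(Person.class, "age"));  // true
    }
}
```

Running this check against the buggy Person above (getAge1 instead of getAge) would return false for the "age" column, flagging the problem before the job is submitted.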
  • Emitting a Tuple type

    tEnv.toAppendStream(tEnv.sqlQuery(sql), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){})).map(value -> {
        System.out.println(value.f0);
        return value;
    });

Due to Java type erasure, this fails with:

The return type of function 'main(TestTable.java:26)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
As the message explains, the pitfall is that lambdas do not carry enough type information for automatic extraction, so the lambda has to be replaced with an anonymous class:

DataStream<Tuple2<String, Integer>> ds = tEnv
        .toAppendStream(tEnv.sqlQuery(sql), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}))
        .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                System.out.println(value.f0);
                return value;
            }
        });
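The TypeHint workaround and the anonymous MapFunction both exploit the same mechanism: an anonymous subclass keeps its generic superclass's actual type arguments in the class file, while a lambda does not. A plain-Java sketch of that mechanism (MyTypeHint is a simplified stand-in, not Flink's TypeHint):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {
    // Simplified stand-in for Flink's TypeHint: subclassing captures T.
    public abstract static class MyTypeHint<T> {
        public Type captured() {
            // The anonymous subclass's generic superclass survives erasure,
            // so the actual value of T can be read back via reflection.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    public static String capturedTypeName() {
        // The trailing {} creates an anonymous subclass; without it (or with a
        // lambda) the Map<String, Integer> argument would be erased and lost.
        MyTypeHint<java.util.Map<String, Integer>> hint =
                new MyTypeHint<java.util.Map<String, Integer>>() {};
        return hint.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(capturedTypeName());
        // prints: java.util.Map<java.lang.String, java.lang.Integer>
    }
}
```

This is why `new TypeHint<Tuple2<String, Integer>>(){}` must include the trailing `{}`: it is the anonymous subclass, not the generic type itself, that preserves the parameters.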

Output

Using JDBCAppendTableSink

The approach above has an extra step: converting the result Table back into a DataStream and then sinking it with a RichSinkFunction subclass. The Table API actually provides JDBCAppendTableSink for this; see the official documentation for details.

String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/test1?useSSL=false&serverTimezone=UTC";
String username = "root";
String password = "123456";

JDBCAppendTableSink tableSinker = JDBCAppendTableSink.builder()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(username)
        .setPassword(password)
        .setBatchSize(1000)
        .setQuery("replace into stock_result (id, pub, modify_time) values (?, ?, ?)")
        .setParameterTypes(Types.STRING(), Types.SQL_DATE(), Types.SQL_DATE())
        .build();

tEnv.registerTableSink("stock_result",
        new String[]{"id", "pub", "modify_time"},
        new TypeInformation[]{Types.STRING(), Types.SQL_DATE(), Types.SQL_DATE()},
        tableSinker);

String sql = "select id, pub, CURRENT_DATE from mytable1";

Table table2 = tEnv.sqlQuery(sql);

table2.insertInto("stock_result");

With this, there is no need to convert the result to a DataStream before sinking it to the database.
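One detail worth understanding is setBatchSize(1000): the sink buffers rows and only executes them as a JDBC batch once that many have accumulated. A hypothetical, stripped-down model of that buffering logic (BatchBuffer is illustrative; the real sink flushes to a PreparedStatement, not a counter):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of batch-size-driven flushing, as configured via setBatchSize.
public class BatchBuffer {
    private final int batchSize;
    private final List<Object[]> buffer = new ArrayList<>();
    private int flushCount = 0; // how many batches have been "executed"

    public BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    public void emit(Object... row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush(); // batch is full: push everything out in one round trip
        }
    }

    // In the real sink this is where PreparedStatement.executeBatch() would run.
    public void flush() {
        if (!buffer.isEmpty()) {
            flushCount++;
            buffer.clear();
        }
    }

    public int getFlushCount() { return flushCount; }
    public int pending() { return buffer.size(); }

    public static void main(String[] args) {
        BatchBuffer sink = new BatchBuffer(3);
        sink.emit("id1", "a");
        sink.emit("id2", "b");  // still buffered, nothing written yet
        sink.emit("id3", "c");  // third row fills the batch and triggers a flush
        System.out.println(sink.getFlushCount()); // 1
        System.out.println(sink.pending());       // 0
    }
}
```

The practical consequence: with a large batch size and a slow trickle of rows, writes may sit in the buffer for a long time before reaching MySQL, so the batch size is a throughput/latency trade-off.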

Examples

Example 1

Read from MySQL, apply some processing logic, and write the result back to MySQL.

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// add the data source
DataStreamSource<StockInfo> source = env.addSource(new MySQLReader());

tEnv.registerDataStream("mytable1", source);

String sql = "select id, pub from mytable1";

// run the SQL query
Table table2 = tEnv.sqlQuery(sql);

DataStream<Row> resultSet = tEnv.toAppendStream(table2, Row.class);

DataStream<StockResult> resultStream = resultSet.map(in -> new StockResult(
        (String) in.getField(0),
        (Date) in.getField(1),
        new Date(new java.util.Date().getTime())));

resultStream.addSink(new MySQLWriter());

env.execute("testTableApi");

Errors

SQL validation failed. Type is not supported: Date

Check whether the data type is supported (see the official docs for the supported types).

The cause turned out to be using java.util.Date instead of java.sql.Date.
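The fix is a one-line conversion, since java.sql.Date is a subclass of java.util.Date that drops the time-of-day component. A small illustration (the toSqlDate helper name is my own, not Flink API):

```java
public class DateFix {
    // Converts the unsupported java.util.Date into the java.sql.Date
    // that Flink's SQL Date type expects.
    public static java.sql.Date toSqlDate(java.util.Date d) {
        return new java.sql.Date(d.getTime());
    }

    public static void main(String[] args) {
        java.util.Date now = new java.util.Date();
        java.sql.Date sqlDate = toSqlDate(now);
        // java.sql.Date extends java.util.Date, so the converted value still is one,
        // but its toString() prints only the yyyy-MM-dd part.
        System.out.println(sqlDate instanceof java.util.Date);
        System.out.println(sqlDate);
    }
}
```

This is why the StockResult constructor above wraps `new java.util.Date().getTime()` in a (java.sql) Date rather than passing the java.util.Date directly.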

A jar was missing; add it to Maven:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages.

This error occurs when the query can update rows it has already emitted (for example, a grouped aggregation); such a table must be consumed with toRetractStream instead of toAppendStream.
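A retract stream carries pairs of (isAdd, row): true means an addition, false means a retraction of a previously emitted row. A small model of replaying such a stream into the current result (names here are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class RetractReplay {
    // Applies a retract stream of (isAdd, value) pairs to rebuild the current state.
    public static List<String> replay(List<Object[]> changes) {
        List<String> state = new ArrayList<>();
        for (Object[] change : changes) {
            boolean isAdd = (Boolean) change[0];
            String row = (String) change[1];
            if (isAdd) {
                state.add(row);    // add message
            } else {
                state.remove(row); // retract message: withdraw an earlier row
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // e.g. a count aggregate for key "a" being updated from 1 to 2:
        List<Object[]> changes = new ArrayList<>();
        changes.add(new Object[]{true,  "a,count=1"}); // first "a" arrives
        changes.add(new Object[]{false, "a,count=1"}); // old aggregate retracted
        changes.add(new Object[]{true,  "a,count=2"}); // updated aggregate emitted
        System.out.println(replay(changes)); // only a,count=2 remains
    }
}
```

An append-only sink like JDBCAppendTableSink cannot process the false messages, which is exactly why Flink rejects toAppendStream for updating queries.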
