Overview
更新时间:2023-12-07
支持的 Connectors
服务类型 | SPARK | FLINK | ||
---|---|---|---|---|
Source | Sink | Source | Sink | |
KAFKA | Y | Y | Y | Y |
BOS | Y | Y | Y | Y |
MQTT | Y | Y | Y | |
RDS | Y | Y | Y | Y |
ES | Y | Y | ||
PALO | Y | Y | Y | |
TSDB | Y | Y |
如何使用 Connector
SQL
1CREATE TABLE source_kafka_table (
2 `field01` STRING,
3 `field02` BIGINT,
4 `field03` FLOAT,
5 `field04` BINARY,
6 `field05` INT,
7 `field06` TINYINT,
8 `field07` BOOLEAN,
9 `field08` DATA,
10 `field09` DOUBLE,
11 `field10` SMALLINT
12) WITH (
13 'connector.type' = 'KAFKA',
14 'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
15 'format.encode' = 'JSON',
16 'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
17 'connector.properties.ssl.filename' = 'kafka-key_gz.zip',
18 'connector.properties.group.id' = 'bsc123',
19 'connector.read.startup.mode' = 'earliest'
20);
字段类型
TYPE | SPARK | FLINK | MAPPING |
---|---|---|---|
TINYINT | Y | Y | BTYE / TINYINT |
SMALLINT | Y | Y | SHORT / SMALLINT |
INT | Y | Y | INT / INTEGER |
BIGINT | Y | Y | LONG / BIGINT |
FLOAT | Y | Y | FLOAT |
DOUBLE | Y | Y | DOUBLE |
STRING | Y | Y | STRING / CHAR / VARCHAR |
BINARY | Y | Y | BINARY / VARBINARY / BYTES |
BOOLEAN | Y | Y | BOOLEAN / BOOL |
TIMESTAMP | Y | Y | TIMESTAMP / SQL_TIMESTAMP |
DECIMAL | Y | Y | DECIMAL |
DATE | Y | Y | DATE / LOCAL_DATE |
TIME | Y | TIME / LOCAL_TIME | |
ARRAY | Y | Y | ARRAY |
MAP | Y | Y | MAP |
ROW | Y | ROW | |
STRUCT | Y | STRUCT |
时间属性
注意:SPARK 仅仅支持指定 source table 中某一时间类型的列作为 watermark 处理窗口的时间
属性如何支持 EVENTTIME 和 PROCTIME
属性名称 | 说明 | EVENTTIME 设置举例 | PROCTIME 设置举例 |
---|---|---|---|
watermark.field | 使用事件时间的字段作为时间提取 | 'field' | '' |
watermark.threshold | 时间窗口的允许最大延迟设置 | '2 seconds',支持单位有:milliseconds ,seconds ,minutes ,hours |
'' |
watermark.field.alias | SQL正文中使用的时间别名 | 'alias' | 'proctime' |
watermark.field.pattern | 设置日期模式进行转换时间戳 | 'yyyy-MM-dd HH:mm:ss' | '' |
watermark.field.timezone | 设置日期模式进行转换时区 | 'Asia/Shanghai' | '' |
EVENTTIME支持的时间字段类型及其对应参数设置
字段类型 | watermark.field.pattern | watermark.field.timezone | 说明 |
---|---|---|---|
BIGINT | 's'、'ms'、'second'、'millisecond' | '' | 使用的字段为LONG,转化为毫秒 |
STRING | 'yyyy-MM-dd HH:mm:ss' | 'Asia/Shanghai' | 使用的字段能够通过指定的模式转换为日期 |
TIMESTAMP | '' | '' | 使用的字段,必须符合TZ格式:2018-05-20T00:08:00Z |
日期格式对照标配表
pattern | timezone |
---|---|
yyyy-MM-dd'T'HH:mm:ss.SSS | Asia/Shanghai |
yyyy-MM-dd'T'HH:mm:ss.SSS'Z' | UTC |
yyyy-MM-dd'T'HH:mm:ss | Asia/Shanghai |
yyyy-MM-dd'T'HH:mm:ss'Z' | UTC |
yyyy-MM-dd HH:mm:ss.SSS | Asia/Shanghai |
yyyy-MM-dd HH:mm:ss.SSS'Z' | UTC |
yyyy-MM-dd HH:mm:ss | Asia/Shanghai |
yyyy-MM-dd HH:mm:ss'Z' | UTC |
带有EVENTTIME的 Connector
使用 EVENTTIME 时,需要指定 source 表中某一列作为时间戳,并配置其 watermark 等时间属性参数。
SPARK
1CREATE TABLE source_kafka_table (
2 `field01` STRING,
3 `field02` TIMESTAMP, -- SPARK 支持窗口数据类型为 TIMESTAMP
4 `field03` FLOAT,
5 `field04` BINARY,
6 `field05` INT,
7 `field06` TINYINT,
8 `field07` BOOLEAN,
9 `field08` DATA,
10 `field09` DOUBLE,
11 `field10` SMALLINT
12) WITH (
13 'connector.type' = 'KAFKA',
14 'format.encode' = 'JSON',
15 'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
16 'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
17 'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
18 'watermark.field' = 'field02',
19 'watermark.threshold' = '10 seconds'
20);
21CREATE TABLE sink_kafka_table (
22 `timestamp` TIMESTAMP,
23 `field01` STRING,
24 `count` BIGINT
25) WITH (
26 'connector.type' = 'KAFKA',
27 'format.encode' = 'JSON',
28 'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
29 'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
30 'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
31);
32INSERT INTO
33 sink_kafka_table
34SELECT
35 window.start AS `timestamp`,
36 `field01`,
37 COUNT(`field05`) AS `count`
38FROM
39 source_kafka_table
40GROUP BY
41 window(`field02`, "1 MINUTE"),
42 `field01`
带有 PROCTIME 的 Connector
使用进程的处理时间作为时间戳,FLINK不需要指定 source 表中某一列,只需要加入 SET job.streamTimeType = 'PROCESSTIME'
语句即可。
FLINK
1SET job.stream.timeType = 'PROCESSTIME'; -- 通过 SET 语句指定 Flink 使用 PROCTIME
2CREATE TABLE source_mqtt_table (
3 `field01` STRING,
4 `field02` BIGINT,
5 `field03` FLOAT,
6 `field04` BINARY,
7 `field05` INT,
8 `field06` TINYINT,
9 `field07` BOOLEAN,
10 `field08` DATA,
11 `field09` DOUBLE,
12 `field10` SMALLINT
13) WITH (
14 'connector.type' = 'MQTT',
15 'format.encode' = 'JSON',
16 'connector.url' = 'tcp://xxxxxx.mqtt.iot.gz.baidubce.com:1883',
17 'connector.topic' = 'xxxx',
18 'connector.username' = 'xxxxxxxxx/bsc_test',
19 'connector.password' = 'xxxxxxxx',
20 'connector.semantic' = 'at_least_once'
21);