上海网站设计公司电话,个人网站做项目,哪个网站做的win10系统好,网站制作结算确认函目录 1. 准备工作
生成数据
创建数据表
2. 创建数据表
创建数据源表
创建数据目标表
3. 计算
WITH子句 1. 准备工作
生成数据
source kafka json 数据格式 #xff1a;
topic case_kafka_mysql#xff1a;
{ts: 20201011,id…目录 1. 准备工作
生成数据
创建数据表
2. 创建数据表
创建数据源表
创建数据目标表
3. 计算
WITH子句 1. 准备工作
生成数据
source kafka json 数据格式
topic case_kafka_mysql
{ts: 20201011,id: 8,price_amt:211}
topic flink_test_2
{id: 8,coupon_price_amt:100} 注意针对双流中的每条记录都发触发
topic: case_kafka_mysql
docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql{ts: 20201011,id: 8,price_amt:211}
topic: flink_test_2
docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2{id: 8,coupon_price_amt:100} 创建数据表
mysql 建表语句
CREATE TABLE sync_test_2 (id bigint(11) NOT NULL AUTO_INCREMENT,ts varchar(64) DEFAULT NULL,total_gmv bigint(11) DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY uidx (ts) USING BTREE) ENGINEInnoDB AUTO_INCREMENT5 DEFAULT CHARSETutf8mb4;CREATE TABLE sync_test_22 (id bigint(11) NOT NULL AUTO_INCREMENT,ts varchar(64) DEFAULT NULL,coupon_ratio double DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY uidx (ts) USING BTREE) ENGINEInnoDB AUTO_INCREMENT5 DEFAULT CHARSETutf8mb4; 2. 创建数据表
创建数据源表
create table flink_test_2_1 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with (connector kafka,topic case_kafka_mysql,properties.bootstrap.servers 127.0.0.1:9092,properties.group.id flink_gp_test2-1,scan.startup.mode earliest-offset,format json,json.fail-on-missing-field false,json.ignore-parse-errors true,properties.zookeeper.connect 127.0.0.1:2181/kafka);create table flink_test_2_2 (id BIGINT,coupon_price_amt BIGINT,proctime AS PROCTIME ()
)with (connector kafka,topic flink_test_2,properties.bootstrap.servers 127.0.0.1:9092,properties.group.id flink_gp_test2-2,scan.startup.mode earliest-offset,format json,json.fail-on-missing-field false,json.ignore-parse-errors true,properties.zookeeper.connect 127.0.0.1:2181/kafka);
关键配置的说明
json.fail-on-missing-field在json缺失字段时是否报错
json.ignore-parse-errors在解析json失败时是否报错
一般无法保证json格式所以以上两个配置是比较重要的。
创建数据目标表
CREATE TABLE sync_test_2 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH (connector jdbc,url jdbc:mysql://127.0.0.1:3306/db01?characterEncodingUTF-8,table-name sync_test_2,username root,password Admin);CREATE TABLE sync_test_22 (ts string,coupon_ration bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH (connector jdbc,url jdbc:mysql://127.0.0.1:3306/db01?characterEncodingUTF-8,table-name sync_test_2,username root,password Admin); 3. 计算
一个作业中写入一个Sink或多个Sink。
说明 写入多个Sink语句时需要以BEGIN STATEMENT SET;开头以END;结尾。
BEGIN STATEMENT SET; --写入多个Sink时必填。
INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id a.id)
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,sum(coupon_price_amt)/sum(amount) AS coupon_ration
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id a.id)
GROUP BY ts;;
END; --写入多个Sink时必填。 WITH子句
WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式CTE可以被视为定义仅针对一个查询存在的临时视图。
改写上述查询
BEGIN STATEMENT SET; --写入多个Sink时必填。
with orders_with_coupon AS (SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id a.id
)INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM orders_with_coupon
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,coupon_price_amt/price_amt AS coupon_ration
FROM orders_with_coupon
GROUP BY ts;;
END; --写入多个Sink时必填。