GitHub - lym0617/flinkStreamSQL: 基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join,支持原生flink SQL所有的语法
mvn clean package -Dmaven.test.skip
打包结束后,项目根目录下会产生plugins目录,plugins目录下存放编译好的数据同步插件包,在lib目下存放job提交的包
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"}
CREATE (scala|table|aggregate) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun;
CREATE TABLE MyTable(
name varchar,
channel varchar,
pv int,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1',
parallelism ='1'
);
CREATE TABLE MyResult(
channel varchar,
pv varchar
)WITH(
type ='mysql',
url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
userName ='dtstack',
password ='abc123',
tableName ='pv2',
parallelism ='1'
);
CREATE TABLE workerinfo(
cast(logtime as TIMESTAMP) AS rtime,
cast(logtime) AS rtime
)WITH(
type ='hbase',
zookeeperQuorum ='rdos1:2181',
tableName ='workerinfo',
rowKey ='ce,de',
parallelism ='1',
zookeeperParent ='/hbase'
);
CREATE TABLE sideTable(
cf:name varchar as name,
cf:info varchar as info,
PRIMARY KEY(name),
PERIOD FOR SYSTEM_TIME //维表标识
)WITH(
type ='hbase',
zookeeperQuorum ='rdos1:2181',
zookeeperParent ='/hbase',
tableName ='workerinfo',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
parallelism ='1'
);
insert
into
MyResult
select
d.channel,
d.info
from
( select
a.*,b.info
from
MyTable a
join
sideTable b
on a.channel=b.name
where
a.channel = 'xc2'
and a.pv=10 ) as d