[fix#827][jdbc] jdbc PreparedStmtProxy can't resolve DELETE statement.this is make flink retraction failed. by chaozwn · Pull Request #824 · DTStack/chunjun

mysql create ddl:
CREATE DATABASE flinkx /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci */;

create table kuaidi
(
kuaidi_name varchar(20),
product_id varchar(100)
);

create table result
(
kuaidi_name varchar(20),
count int,
primary key (kuaidi_name)
);

INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('申通', '1');
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('顺丰', '2');
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('中通', '3');
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('中通', '1');

flinkx.sql:
CREATE TABLE kuaidi
(
product_id string,
kuaidi_name string
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/flinkx?useCursorFetch=true&fetchSize=1000&useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'kuaidi',
'username' = 'root',
'password' = '123456',
'scan.parallelism' = '1'
);

CREATE TABLE product (
kuaidi_name string,
count bigint,
primary key (kuaidi_name) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/flinkx?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'result',
'username' = 'root',
'password' = '123456',
'sink.buffer-flush.max-rows' = '1', -- 批量写数据条数,默认:1024
'sink.buffer-flush.interval' = '1000000', -- 批量写时间间隔,默认:10000毫秒
'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
-- sink.all-replace = 'true' 生成如:INSERT INTO result3(mid, mbb, sid, sbb) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE mid=VALUES(mid), mbb=VALUES(mbb), sid=VALUES(sid), sbb=VALUES(sbb) 。会将所有的数据都替换。
-- sink.all-replace = 'false' 生成如:INSERT INTO result3(mid, mbb, sid, sbb) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE mid=IFNULL(VALUES(mid),mid), mbb=IFNULL(VALUES(mbb),mbb), sid=IFNULL(VALUES(sid),sid), sbb=IFNULL(VALUES(sbb),sbb) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。
'sink.parallelism' = '1' -- 写入结果的并行度,默认:null
);

INSERT into product
select
kuaidi_name,
count(1) count
from
(select
product_id,
last_value(kuaidi_name) kuaidi_name
from kuaidi
group by product_id)
group by kuaidi_name;

error picture:
image

correct picture:
image