-- Source table backed by the Kafka connector: reads JSON-formatted records
-- from topic 't1', starting at the earliest available offset.
CREATE TABLE kafka_source (
    id   BIGINT,
    -- Each record carries an array of (c1, c2) rows.
    data ARRAY<ROW<c1 BIGINT, c2 STRING>>
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 't1',
    'properties.bootstrap.servers' = 'hadoop101:9092',
    'properties.group.id'          = 'g1',
    'scan.startup.mode'            = 'earliest-offset',
    'format'                       = 'json'
);
-- Sink table using the 'print' connector; one row per (id, c1, c2) triple.
CREATE TABLE sink_print (
    id BIGINT,
    c1 BIGINT,
    c2 STRING
) WITH (
    'connector' = 'print'
);
-- Flatten each source record's `data` array into one sink row per element,
-- repeating the record's `id` on every produced row.
-- Because this is a CROSS JOIN against the unnested array, a record whose
-- array yields no elements contributes no rows to the sink.
-- The target column list is spelled out so the mapping does not depend on
-- the physical column order of sink_print.
INSERT INTO sink_print (id, c1, c2)
SELECT
    src.id,
    t.c1,
    t.c2
FROM kafka_source AS src
CROSS JOIN UNNEST(src.data) AS t (c1, c2);
