Amazon Managed Service for Apache Flink (Studio notebook)
Kinesis Data Streams
CREATE TABLE - Source
%flink.ssql
CREATE TABLE input_table (
    id INTEGER,
    level VARCHAR(5),
    path VARCHAR(13),
    status INTEGER,
    event_time TIMESTAMP(3),  -- rowtime column must be TIMESTAMP(3) to carry a watermark
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (id)
WITH (
    'connector' = 'kinesis',
    'stream' = 'input-stream',
    'aws.region' = 'ap-northeast-2',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
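To sanity-check the source mapping, the Studio notebook can run an unbounded preview of the stream. A minimal sketch, assuming records are already flowing into input-stream:
%flink.ssql(type=update)
-- continuously updating preview of the parsed records
SELECT * FROM input_table;
Stop the paragraph once the mapping looks right; the preview runs as a long-lived streaming job until cancelled.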
CREATE TABLE - Sink
%flink.ssql
CREATE TABLE output_table (
    level VARCHAR(5),
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    counts BIGINT
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'output-stream',
    'aws.region' = 'ap-northeast-2',
    'sink.partitioner' = 'random',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
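'sink.partitioner' = 'random' spreads records across shards. If a downstream consumer needs every record for a given level to land on the same shard, the connector can instead derive the Kinesis partition key from a column via PARTITIONED BY. A sketch, assuming a hypothetical output-stream-keyed stream:
%flink.ssql
CREATE TABLE output_table_keyed (
    level VARCHAR(5),
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    counts BIGINT
)
PARTITIONED BY (level)  -- partition key derived from level instead of sink.partitioner
WITH (
    'connector' = 'kinesis',
    'stream' = 'output-stream-keyed',
    'aws.region' = 'ap-northeast-2',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);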
TUMBLE
Fixed-size, non-overlapping windows: each row lands in exactly one 20-second window.
%flink.ssql
INSERT INTO output_table
SELECT level, window_start, window_end, count(*) AS counts
FROM TABLE(TUMBLE(TABLE input_table, DESCRIPTOR(event_time), INTERVAL '20' SECONDS))
GROUP BY level, window_start, window_end;
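Before wiring the aggregation into the sink, the same query can be previewed in the notebook by swapping INSERT for a plain SELECT; the same pattern works for the HOP and CUMULATE variants below.
%flink.ssql(type=update)
-- notebook-only preview of the tumbling-window counts
SELECT level, window_start, window_end, count(*) AS counts
FROM TABLE(TUMBLE(TABLE input_table, DESCRIPTOR(event_time), INTERVAL '20' SECONDS))
GROUP BY level, window_start, window_end;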
HOP
Sliding windows: every 10 seconds, emit counts over the last 30 seconds, so windows overlap. Note the TVF argument order: slide first, then window size.
%flink.ssql
INSERT INTO output_table
SELECT level, window_start, window_end, count(*) AS counts
FROM TABLE(HOP(TABLE input_table, DESCRIPTOR(event_time), INTERVAL '10' SECONDS, INTERVAL '30' SECONDS))
GROUP BY level, window_start, window_end;
CUMULATE
Cumulative windows: emit partial counts every 20 seconds, accumulating until the 1-minute window closes and resets.
%flink.ssql
INSERT INTO output_table
SELECT level, window_start, window_end, count(*) AS counts
FROM TABLE(CUMULATE(TABLE input_table, DESCRIPTOR(event_time), INTERVAL '20' SECONDS, INTERVAL '1' MINUTE))
GROUP BY level, window_start, window_end;
SESSION
Session windows: a session closes after a 30-second gap with no events. Flink's legacy grouped-window syntax is used here; a windowing-TVF variant follows this block.
%flink.ssql
INSERT INTO output_table
SELECT level,
       SESSION_START(event_time, INTERVAL '30' SECONDS) AS window_start,
       SESSION_END(event_time, INTERVAL '30' SECONDS) AS window_end,
       COUNT(*) AS counts
FROM input_table
GROUP BY level, SESSION(event_time, INTERVAL '30' SECONDS);
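The grouped-window syntax above is used because the SESSION windowing TVF only arrived in Flink 1.17. On a runtime that supports it, the equivalent TVF form would look roughly like this (a sketch, not verified against the Studio runtime version):
%flink.ssql
INSERT INTO output_table
SELECT level, window_start, window_end, COUNT(*) AS counts
FROM TABLE(SESSION(TABLE input_table PARTITION BY level, DESCRIPTOR(event_time), INTERVAL '30' SECONDS))
GROUP BY level, window_start, window_end;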