Skip to content

Amazon Managed Service for Apache Flink (Studio notebook)

Amazon Kinesis Data Streams: hopping-window aggregation example

CREATE TABLE: source (reads JSON events from Kinesis)

%flink.ssql
-- Source table: JSON records consumed from the Kinesis stream 'demo-stream'.
CREATE TABLE demo_table (
    `id`         INT,
    `level`      VARCHAR(5),
    `path`       VARCHAR(13),
    `status`     INT,
    `event_time` TIMESTAMP(3),
    -- Makes event_time the event-time attribute; the watermark trails the
    -- observed event time by 5 seconds to tolerate out-of-order records.
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
)
-- NOTE(review): the Kinesis connector uses PARTITIONED BY to derive partition
-- keys when writing; this table is only read here, so the clause looks
-- unused -- confirm before relying on it.
PARTITIONED BY (`id`)
WITH (
    'connector'                      = 'kinesis',
    'stream'                         = 'demo-stream',
    'aws.region'                     = 'ap-northeast-2',
    -- Begin at the tip of the stream: records already present are not replayed.
    'scan.stream.initpos'            = 'LATEST',
    'format'                         = 'json',
    -- Timestamp fields such as event_time are parsed as ISO-8601 text.
    'json.timestamp-format.standard' = 'ISO-8601'
);

CREATE TABLE: sink (writes aggregated results to Kinesis)

%flink.ssql(type=update)
-- Sink table: per-level windowed counts written as JSON to the Kinesis
-- stream 'output-stream'.
CREATE TABLE output_table (
    level    VARCHAR(5),
    -- BIGINT, not INTEGER: COUNT(*) yields BIGINT in Flink SQL, and Flink's
    -- INSERT validation only applies widening implicit casts, so an INTEGER
    -- column makes the INSERT fail with a sink schema mismatch.
    counts   BIGINT,
    hop_time TIMESTAMP(3)
)
WITH (
    'connector'                      = 'kinesis',
    'stream'                         = 'output-stream',
    'aws.region'                     = 'ap-northeast-2',
    -- Distribute records across shards randomly instead of keying by a column.
    'sink.partitioner'               = 'random',
    'format'                         = 'json',
    -- hop_time is serialized as ISO-8601 text.
    'json.timestamp-format.standard' = 'ISO-8601'
);

INSERT INTO … SELECT: per-level counts over hopping windows

%flink.ssql(type=update)
-- Counts events per level in 1-minute hopping windows that advance every
-- 10 seconds. Legacy group-window syntax: HOP(time_attr, slide, size).
-- NOTE(review): COUNT(*) is BIGINT in Flink; the sink's counts column must
-- accept BIGINT or the INSERT fails validation -- confirm the sink DDL.
INSERT INTO output_table
SELECT
    level,
    COUNT(*) AS counts,
    -- Rowtime of each window (its inclusive upper bound), emitted as hop_time.
    HOP_ROWTIME(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS hop_time
FROM demo_table
GROUP BY
    HOP(event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE),
    level;