Skip to content

Commit fc137c7

Browse files
authored
Merge pull request #207 from divakaivan/main
[week 4] fix aggregation_job
2 parents a3a98a7 + 2da3d0f commit fc137c7

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,16 @@ def create_processed_events_source_kafka(t_env):
4949
kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
5050
kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
5151
table_name = "process_events_kafka"
52-
pattern = "yyyy-MM-dd HH:mm:ss"
52+
pattern = "yyyy-MM-dd''T''HH:mm:ss.SSS''Z''"
5353
sink_ddl = f"""
5454
CREATE TABLE {table_name} (
5555
ip VARCHAR,
56-
event_timestamp VARCHAR,
56+
event_time VARCHAR,
5757
referrer VARCHAR,
5858
host VARCHAR,
5959
url VARCHAR,
6060
geodata VARCHAR,
61-
window_timestamp AS TO_TIMESTAMP(event_timestamp, '{pattern}'),
61+
window_timestamp AS TO_TIMESTAMP(event_time, '{pattern}'),
6262
WATERMARK FOR window_timestamp AS window_timestamp - INTERVAL '15' SECOND
6363
) WITH (
6464
'connector' = 'kafka',

0 commit comments

Comments
 (0)