Skip to content

Commit 2da3d0f

Browse files
authored
fix aggregation_job
1. change the pattern to `"yyyy-MM-dd''T''HH:mm:ss.SSS''Z''"` 2. change event_timestamp col name to event_time to match schema The above changes make the aggregation job work.
1 parent a3a98a7 commit 2da3d0f

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

bootcamp/materials/4-apache-flink-training/src/job/aggregation_job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,16 @@ def create_processed_events_source_kafka(t_env):
4949
kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
5050
kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
5151
table_name = "process_events_kafka"
52-
pattern = "yyyy-MM-dd HH:mm:ss"
52+
pattern = "yyyy-MM-dd''T''HH:mm:ss.SSS''Z''"
5353
sink_ddl = f"""
5454
CREATE TABLE {table_name} (
5555
ip VARCHAR,
56-
event_timestamp VARCHAR,
56+
event_time VARCHAR,
5757
referrer VARCHAR,
5858
host VARCHAR,
5959
url VARCHAR,
6060
geodata VARCHAR,
61-
window_timestamp AS TO_TIMESTAMP(event_timestamp, '{pattern}'),
61+
window_timestamp AS TO_TIMESTAMP(event_time, '{pattern}'),
6262
WATERMARK FOR window_timestamp AS window_timestamp - INTERVAL '15' SECOND
6363
) WITH (
6464
'connector' = 'kafka',

0 commit comments

Comments
 (0)