-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelper.txt
More file actions
92 lines (72 loc) · 1.93 KB
/
helper.txt
File metadata and controls
92 lines (72 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
-------------------- KAFKA ENV -------------------------
# Kafka topic name
twitch-streams
# Kafka broker URL
kafka:29092
# Pinot ingestion transform function (parses the event_time string into epoch milliseconds)
fromDateTime(event_time, 'yyyy-MM-dd''T''HH:mm:ss')
# Enable automatic topic creation on the Kafka broker
export KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
------------------- FLINK SQL --------------------------
# Read from Kafka
-- Source table: stream events read as JSON from the Kafka topic
-- 'twitch-streams', starting at the earliest available offset.
-- started_at and event_time are declared as STRING here, so no timestamp
-- parsing happens in this table definition.
CREATE TABLE twitch_streams (
    id            STRING,
    user_id       STRING,
    user_name     STRING,
    game_id       STRING,
    game_name     STRING,
    started_at    STRING,
    viewer_count  INT,
    event_time    STRING,
    tag_id        STRING   -- join key against twitch_stream_tags.tag_id
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'twitch-streams',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id'          = 'flink-consumer-group',
    'format'                       = 'json',
    'scan.startup.mode'            = 'earliest-offset'
);
# Read from S3 (tags)
-- Tag table: JSON records read through the filesystem connector from
-- s3://data/. Each record carries a tag_id and its description.
CREATE TABLE twitch_stream_tags (
    tag_id       STRING,
    description  STRING
) WITH (
    'connector' = 'filesystem',
    'path'      = 's3://data/',
    'format'    = 'json'
);
# Create the Kafka sink table that receives streams joined with tags
-- Sink table: rows are written to the Kafka topic 'twitch_streams_with_tags'.
-- The record key is the raw-encoded id column; the record value is JSON.
-- Column order matters: the INSERT that feeds this table maps its select
-- list to these columns by position.
CREATE TABLE twitch_streams_with_tags (
    id            STRING,
    user_id       STRING,
    user_name     STRING,
    game_id       STRING,
    game_name     STRING,
    started_at    STRING,
    viewer_count  INT,
    event_time    STRING,
    tag_id        STRING,
    description   STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'twitch_streams_with_tags',
    'properties.bootstrap.servers' = 'kafka:29092',
    'key.format'                   = 'raw',
    'key.fields'                   = 'id',
    'value.format'                 = 'json'
);
# Join and insert
-- Attach the tag description to every stream event and write the result to
-- the twitch_streams_with_tags sink. Because this is an INNER JOIN, events
-- whose tag_id has no match in twitch_stream_tags (or is NULL) are dropped.
-- The select list is mapped to the sink columns by position.
INSERT INTO twitch_streams_with_tags
SELECT
    t.id,
    t.user_id,
    t.user_name,
    t.game_id,
    t.game_name,
    t.started_at,
    t.viewer_count,
    t.event_time,
    tg.tag_id,
    tg.description
FROM twitch_streams AS t
INNER JOIN twitch_stream_tags AS tg
    ON t.tag_id = tg.tag_id;
------------------------------ PINOT QUERY --------------------------
# Count rows per stream id, highest count first
SELECT
    COUNT(*) AS cnt_streams,
    id
FROM twitch_streams
GROUP BY id
ORDER BY cnt_streams DESC
LIMIT 1000000
---------------------------- SUPERSET ------------------------------
# SQL Alchemy URI
pinot://pinot:8000/query/sql?controller=http://pinot:9000/