-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrino-queries.sql
More file actions
99 lines (94 loc) · 2.74 KB
/
trino-queries.sql
File metadata and controls
99 lines (94 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
-- Create the target schema. IF NOT EXISTS keeps the script re-runnable; an
-- additional unconditional CREATE SCHEMA would fail because the schema
-- already exists at that point.
CREATE SCHEMA IF NOT EXISTS iceberg.market_data;
-- Raw tick table in Iceberg, stored as Parquet and partitioned by the day of
-- dateTime, then by source and symbol.
-- WARNING: this is a destructive rebuild; any existing stock_ticks data is
-- dropped. IF EXISTS lets the script run on a fresh catalog where the table
-- has not been created yet.
DROP TABLE IF EXISTS iceberg.market_data.stock_ticks;
CREATE TABLE iceberg.market_data.stock_ticks (
    source VARCHAR,
    symbol VARCHAR,
    ask DOUBLE,
    bid DOUBLE,
    mid DOUBLE,
    askMarkup DOUBLE,
    bidMarkup DOUBLE,
    isTradable BOOLEAN,
    number BIGINT,
    dateTime TIMESTAMP,
    receiveDateTime TIMESTAMP,
    -- Filled from the Kafka "_timestamp" column by the incremental load, which
    -- also reads max(_kafka_timestamp) back as its watermark.
    _kafka_timestamp TIMESTAMP
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['day(dateTime)', 'source', 'symbol']
);
-- One-minute candle table in Iceberg, stored as Parquet and partitioned by
-- the day of dateTime, then by source and symbol.
-- WARNING: this is a destructive rebuild; any existing candle_m1 data is
-- dropped. IF EXISTS lets the script run on a fresh catalog where the table
-- has not been created yet.
DROP TABLE IF EXISTS iceberg.market_data.candle_m1;
CREATE TABLE iceberg.market_data.candle_m1 (
    source VARCHAR,
    symbol VARCHAR,
    -- Start of the minute the candle covers (the load truncates tick
    -- timestamps with date_trunc('minute', ...)).
    dateTime TIMESTAMP(3),
    open DOUBLE,
    high DOUBLE,
    low DOUBLE,
    close DOUBLE,
    -- Number of ticks in the minute (the load fills this with count(*)).
    volume BIGINT,
    -- Latest Kafka "_timestamp" among the ticks of the candle; the load reads
    -- max(_kafka_timestamp) back as its watermark.
    _kafka_timestamp TIMESTAMP
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['day(dateTime)', 'source', 'symbol']
);
-- Incremental load of raw ticks from Kafka into Iceberg; scheduled every 1 minute.
-- Only rows whose Kafka "_timestamp" is newer than the latest
-- "_kafka_timestamp" already stored are copied. On an empty target the
-- watermark falls back to 1970-01-01 00:00:00, so everything is loaded.
-- The explicit column list keeps the mapping correct even if the target
-- table's column order changes.
-- NOTE(review): the comparison is strict (>), so rows that share the current
-- maximum timestamp but become visible after a run are skipped - confirm this
-- is acceptable.
INSERT INTO iceberg.market_data.stock_ticks (
    source,
    symbol,
    ask,
    bid,
    mid,
    askmarkup,
    bidmarkup,
    istradable,
    number,
    datetime,
    receivedatetime,
    "_kafka_timestamp"
)
SELECT
    src.source,
    src.symbol,
    src.ask,
    src.bid,
    src.mid,
    src.askmarkup,
    src.bidmarkup,
    src.istradable,
    src.number,
    src.datetime,
    src.receivedatetime,
    src."_timestamp"
FROM kafka.market_data.stock_ticks AS src
WHERE src."_timestamp" > (
    -- High-water mark: newest Kafka timestamp already loaded.
    SELECT COALESCE(MAX(loaded."_kafka_timestamp"), TIMESTAMP '1970-01-01 00:00:00')
    FROM iceberg.market_data.stock_ticks AS loaded
);
-- m1: build one-minute OHLC candles from Kafka ticks that are newer than the
-- latest "_kafka_timestamp" already stored in candle_m1 (epoch when empty).
--
-- The ticks are aggregated with GROUP BY so exactly one row is written per
-- (source, symbol, minute). Computing the same values with window functions
-- and no GROUP BY/DISTINCT writes one identical candle row per tick.
--
-- NOTE(review): the watermark is Kafka time while candles are bucketed by
-- tick dateTime, so a minute whose ticks are split across two runs yields two
-- partial candles - confirm whether downstream needs a merge or a
-- "complete minutes only" filter.
INSERT INTO iceberg.market_data.candle_m1 (
    source,
    symbol,
    datetime,
    open,
    high,
    low,
    close,
    volume,
    "_kafka_timestamp"
)
SELECT
    new_ticks.source,
    new_ticks.symbol,
    -- Tick timestamp truncated to the minute.
    new_ticks.candle_minute AS datetime,
    -- Open: mid of the earliest tick in the minute.
    min_by(new_ticks.mid, new_ticks.tick_time) AS open,
    -- High: maximum mid in the minute.
    max(new_ticks.mid) AS high,
    -- Low: minimum mid in the minute.
    min(new_ticks.mid) AS low,
    -- Close: mid of the latest tick in the minute.
    max_by(new_ticks.mid, new_ticks.tick_time) AS close,
    -- Tick count is used as volume.
    count(*) AS volume,
    -- Newest Kafka timestamp in the candle; feeds the next run's watermark.
    max(new_ticks.kafka_ts) AS "_kafka_timestamp"
FROM (
    SELECT
        src.source,
        src.symbol,
        src.mid,
        src.datetime AS tick_time,
        date_trunc('minute', src.datetime) AS candle_minute,
        src."_timestamp" AS kafka_ts
    FROM kafka.market_data.stock_ticks AS src
    WHERE src."_timestamp" > (
        -- High-water mark: newest Kafka timestamp already turned into candles.
        SELECT COALESCE(MAX(loaded."_kafka_timestamp"), TIMESTAMP '1970-01-01 00:00:00')
        FROM iceberg.market_data.candle_m1 AS loaded
    )
) AS new_ticks
GROUP BY
    new_ticks.source,
    new_ticks.symbol,
    new_ticks.candle_minute;