-- tables-types-exploration.sql
-- Exploration of per-vendor tables, a custom enum type, and TimescaleDB
-- hypertables / continuous aggregates.
-- Clean up previous objects so the script can be re-run from scratch.
-- Objects are dropped in reverse dependency order:
--   views -> continuous aggregates -> qualify() overloads -> tables
--   -> trigger functions -> enum type.
DROP VIEW IF EXISTS all_vendors_hourly_qualification_rollup;
DROP VIEW IF EXISTS all_vendor_data;
DROP MATERIALIZED VIEW IF EXISTS vendor_a_hourly_qualification_rollup;
DROP MATERIALIZED VIEW IF EXISTS vendor_b_hourly_qualification_rollup;
DROP MATERIALIZED VIEW IF EXISTS vendor_a_hourly_ok;
DROP MATERIALIZED VIEW IF EXISTS vendor_b_hourly_ok;
-- The qualify() overloads take the tables' row types as their argument, so
-- they depend on the tables. They must be dropped BEFORE the tables;
-- otherwise a plain DROP TABLE (no CASCADE) is rejected on a re-run because
-- "other objects depend on it".
DROP FUNCTION IF EXISTS qualify(vendor_a_data);
DROP FUNCTION IF EXISTS qualify(vendor_b_data);
-- Dropping the tables also removes the triggers defined on them.
DROP TABLE IF EXISTS vendor_a_data;
DROP TABLE IF EXISTS vendor_b_data;
-- The trigger functions can only go once the triggers that call them are gone.
DROP FUNCTION IF EXISTS set_qualification_vendor_a();
DROP FUNCTION IF EXISTS set_qualification_vendor_b();
-- The enum is used by the tables and the qualify() overloads, so it goes last.
DROP TYPE IF EXISTS qualified;
-- Enum holding the qualification verdict stored on every vendor row.
-- Label order is significant for enum comparison: 'Discard' sorts before 'OK'.
CREATE TYPE qualified AS ENUM (
    'Discard',
    'OK'
);
-- Vendor A measurements, each row carrying its qualification verdict.
-- qualification is part of the primary key (and therefore NOT NULL).
CREATE TABLE vendor_a_data (
    time          timestamptz NOT NULL,
    id            serial      NOT NULL,
    value         numeric,
    status        text,
    qualification qualified   NOT NULL,
    CONSTRAINT vendor_a_data_pkey PRIMARY KEY (time, id, qualification)
);
-- Vendor B measurements, each row carrying its qualification verdict.
-- qualification is part of the primary key (and therefore NOT NULL).
CREATE TABLE vendor_b_data (
    time          timestamptz NOT NULL,
    id            serial      NOT NULL,
    reading       numeric,
    flag          boolean,
    qualification qualified   NOT NULL,
    CONSTRAINT vendor_b_data_pkey PRIMARY KEY (time, id, qualification)
);
-- qualify(vendor_a_data): classify one vendor A row.
--   Returns 'Discard' when status is 'bad' or value is negative,
--   otherwise 'OK'.
--   When the test evaluates to NULL (e.g. NULL value with a non-'bad'
--   status) the ELSE branch is taken, so the row is classified 'OK'.
CREATE OR REPLACE FUNCTION qualify(vendor_a_data)
RETURNS qualified AS $$
DECLARE
    rec ALIAS FOR $1;
BEGIN
    RETURN CASE
        WHEN rec.status = 'bad' OR rec.value < 0 THEN 'Discard'::qualified
        ELSE 'OK'::qualified
    END;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- qualify(vendor_b_data): classify one vendor B row.
--   Returns 'OK' only when flag is TRUE and reading is present;
--   every other row is 'Discard'.
--   The flag test uses IS NOT TRUE rather than NOT flag: with a NULL flag,
--   NOT flag evaluates to NULL, the IF condition is not satisfied, and the
--   row would silently be classified 'OK'.
CREATE OR REPLACE FUNCTION qualify(vendor_b_data)
RETURNS qualified AS $$
DECLARE
    row ALIAS FOR $1;
BEGIN
    IF row.flag IS NOT TRUE OR row.reading IS NULL THEN
        RETURN 'Discard';
    ELSE
        RETURN 'OK';
    END IF;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Row-level trigger function for vendor_a_data.
-- Overwrites NEW.qualification with the verdict computed by qualify() for
-- the incoming row and returns the modified row.
CREATE OR REPLACE FUNCTION set_qualification_vendor_a()
RETURNS TRIGGER AS $$
DECLARE
    verdict qualified;
BEGIN
    verdict := qualify(NEW.*);
    NEW.qualification := verdict;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Row-level trigger function for vendor_b_data.
-- Overwrites NEW.qualification with the verdict computed by qualify() for
-- the incoming row and returns the modified row.
CREATE OR REPLACE FUNCTION set_qualification_vendor_b()
RETURNS TRIGGER AS $$
DECLARE
    verdict qualified;
BEGIN
    verdict := qualify(NEW.*);
    NEW.qualification := verdict;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Attach the qualification triggers: every inserted or updated row has its
-- qualification recomputed before it is written.
CREATE TRIGGER trigger_set_qualification_vendor_a
    BEFORE UPDATE OR INSERT ON vendor_a_data
    FOR EACH ROW
    EXECUTE FUNCTION set_qualification_vendor_a();

CREATE TRIGGER trigger_set_qualification_vendor_b
    BEFORE UPDATE OR INSERT ON vendor_b_data
    FOR EACH ROW
    EXECUTE FUNCTION set_qualification_vendor_b();
-- Insert sample data.
-- Timestamps carry an explicit UTC offset so the stored instants do not
-- depend on the session's TimeZone setting (the column is timestamptz).
-- qualification is not supplied; the BEFORE INSERT trigger on each table
-- fills it in.
INSERT INTO vendor_a_data (time, value, status) VALUES
    ('2024-06-01 10:00:00+00', 10, 'good'),  -- OK
    ('2024-06-01 10:05:00+00', -5, 'good'),  -- Discard: negative value
    ('2024-06-01 10:10:00+00', 20, 'bad');   -- Discard: status 'bad'

INSERT INTO vendor_b_data (time, reading, flag) VALUES
    ('2024-06-01 10:00:00+00', 100,  true),   -- OK
    ('2024-06-01 10:05:00+00', NULL, true),   -- Discard: missing reading
    ('2024-06-01 10:10:00+00', 200,  false);  -- Discard: flag is false
-- Convert both tables to TimescaleDB hypertables: partitioned on time, with
-- an additional space dimension on qualification split into 2 partitions.
-- migrate_data is enabled because the tables already contain the sample rows
-- inserted earlier in this script.
SELECT create_hypertable(
    'vendor_a_data',
    'time',
    partitioning_column => 'qualification',
    number_partitions   => 2,
    migrate_data        => TRUE,
    if_not_exists       => TRUE
);

SELECT create_hypertable(
    'vendor_b_data',
    'time',
    partitioning_column => 'qualification',
    number_partitions   => 2,
    migrate_data        => TRUE,
    if_not_exists       => TRUE
);
-- Continuous aggregate over vendor_a_data: number of 'OK' rows per hour.
-- Created WITH NO DATA, so it holds nothing until it is refreshed.
CREATE MATERIALIZED VIEW vendor_a_hourly_ok
WITH (timescaledb.continuous) AS
    SELECT
        time_bucket(INTERVAL '1 hour', time) AS bucket,
        COUNT(*)                             AS ok_count
    FROM vendor_a_data
    WHERE qualification = 'OK'
    GROUP BY bucket
WITH NO DATA;
-- Continuous aggregate over vendor_b_data: number of 'OK' rows per hour.
-- Created WITH NO DATA, so it holds nothing until it is refreshed.
CREATE MATERIALIZED VIEW vendor_b_hourly_ok
WITH (timescaledb.continuous) AS
    SELECT
        time_bucket(INTERVAL '1 hour', time) AS bucket,
        COUNT(*)                             AS ok_count
    FROM vendor_b_data
    WHERE qualification = 'OK'
    GROUP BY bucket
WITH NO DATA;
-- Populate the hourly OK aggregates. NULL for both window bounds requests a
-- refresh over the aggregate's entire time range.
CALL refresh_continuous_aggregate('vendor_a_hourly_ok', NULL, NULL);
CALL refresh_continuous_aggregate('vendor_b_hourly_ok', NULL, NULL);

-- Query the aggregates with explicit columns and a stable ordering so the
-- output is deterministic.
SELECT
    bucket,
    ok_count
FROM vendor_a_hourly_ok
ORDER BY bucket;

SELECT
    bucket,
    ok_count
FROM vendor_b_hourly_ok
ORDER BY bucket;
-- Unified view over both vendor tables (optional).
-- Vendor A's value column is exposed as reading so both branches share one
-- column list; the vendor column tags each row with its source ('A' or 'B').
CREATE OR REPLACE VIEW all_vendor_data AS
    SELECT
        time,
        id,
        value AS reading,
        qualification,
        'A' AS vendor
    FROM vendor_a_data
UNION ALL
    SELECT
        time,
        id,
        reading,
        qualification,
        'B' AS vendor
    FROM vendor_b_data;
-- Query the unified view for OK rows, with explicit columns and a stable
-- ordering so the output is deterministic.
SELECT
    time,
    id,
    reading,
    qualification,
    vendor
FROM all_vendor_data
WHERE qualification = 'OK'
ORDER BY vendor, time, id;
-- Continuous aggregate over vendor_a_data: per hour and per qualification,
-- the row count and the average value.
-- Created WITH NO DATA, so it holds nothing until it is refreshed.
CREATE MATERIALIZED VIEW vendor_a_hourly_qualification_rollup
WITH (timescaledb.continuous) AS
    SELECT
        time_bucket(INTERVAL '1 hour', time) AS bucket,
        qualification,
        COUNT(*)                             AS count,
        AVG(value)                           AS avg_value
    FROM vendor_a_data
    GROUP BY bucket, qualification
WITH NO DATA;
-- Continuous aggregate over vendor_b_data: per hour and per qualification,
-- the row count and the average reading.
-- Created WITH NO DATA, so it holds nothing until it is refreshed.
CREATE MATERIALIZED VIEW vendor_b_hourly_qualification_rollup
WITH (timescaledb.continuous) AS
    SELECT
        time_bucket(INTERVAL '1 hour', time) AS bucket,
        qualification,
        COUNT(*)                             AS count,
        AVG(reading)                         AS avg_reading
    FROM vendor_b_data
    GROUP BY bucket, qualification
WITH NO DATA;
-- Populate the rollup aggregates. NULL for both window bounds requests a
-- refresh over the aggregate's entire time range.
CALL refresh_continuous_aggregate('vendor_a_hourly_qualification_rollup', NULL, NULL);
CALL refresh_continuous_aggregate('vendor_b_hourly_qualification_rollup', NULL, NULL);

-- Query the rollups with explicit columns and a stable ordering so the
-- output is deterministic.
SELECT
    bucket,
    qualification,
    count,
    avg_value
FROM vendor_a_hourly_qualification_rollup
ORDER BY bucket, qualification;

SELECT
    bucket,
    qualification,
    count,
    avg_reading
FROM vendor_b_hourly_qualification_rollup
ORDER BY bucket, qualification;
-- Unified rollup view over both vendors' hourly aggregates.
-- Each vendor's average column (avg_value / avg_reading) is exposed under the
-- common name avg; the vendor column tags each row with its source.
CREATE OR REPLACE VIEW all_vendors_hourly_qualification_rollup AS
    SELECT
        bucket,
        qualification,
        count,
        avg_value AS avg,
        'A' AS vendor
    FROM vendor_a_hourly_qualification_rollup
UNION ALL
    SELECT
        bucket,
        qualification,
        count,
        avg_reading AS avg,
        'B' AS vendor
    FROM vendor_b_hourly_qualification_rollup;
-- Query the unified rollup view with explicit columns and a stable ordering
-- so the output is deterministic.
SELECT
    bucket,
    qualification,
    count,
    avg,
    vendor
FROM all_vendors_hourly_qualification_rollup
ORDER BY vendor, bucket, qualification;