-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathevents_merging_poc.sql
More file actions
311 lines (284 loc) · 11 KB
/
events_merging_poc.sql
File metadata and controls
311 lines (284 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
-- TimescaleDB Events Merging POC
-- Production-Ready Events Processing with Manual Overrides and Complex Merging
\set AUTOCOMMIT on
\timing on
-- Clean slate for fresh installation
DROP VIEW IF EXISTS merged_events CASCADE;
DROP VIEW IF EXISTS events_summary CASCADE;
DROP TABLE IF EXISTS events_manual CASCADE;
DROP TABLE IF EXISTS events CASCADE;
-- Main events hypertable - optimized for high-volume time-series data
CREATE TABLE events (
time TIMESTAMPTZ NOT NULL,
foreign_id TEXT NOT NULL,
name TEXT NOT NULL,
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ NULL,
status TEXT NOT NULL DEFAULT 'working',
CONSTRAINT events_pkey PRIMARY KEY (time, foreign_id),
CONSTRAINT valid_status CHECK (status IN ('working', 'paused', 'offline', 'maintenance')),
CONSTRAINT valid_timespan CHECK (end_time IS NULL OR end_time >= start_time)
);
SELECT create_hypertable('events', by_range('time', INTERVAL '7 days'));
-- New hypertable for sample data around foreign_id
CREATE TABLE sample_data (
time TIMESTAMPTZ NOT NULL,
foreign_id TEXT NOT NULL,
value DOUBLE PRECISION NOT NULL,
CONSTRAINT sample_data_pkey PRIMARY KEY (time, foreign_id)
);
SELECT create_hypertable('sample_data', by_range('time', INTERVAL '7 days'));
-- Manual overrides and merging logic - separate hypertable for scalability
CREATE TABLE events_manual (
time TIMESTAMPTZ NOT NULL,
id SERIAL,
original_event_id TEXT NOT NULL,
merged_event_ids TEXT[],
notes TEXT,
override_name TEXT,
override_start_time TIMESTAMPTZ,
override_end_time TIMESTAMPTZ,
override_status TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT events_manual_pkey PRIMARY KEY (time, id),
CONSTRAINT valid_override_status CHECK (override_status IS NULL OR override_status IN ('working', 'paused', 'offline', 'maintenance'))
);
SELECT create_hypertable('events_manual', by_range('time', INTERVAL '1 month'));
-- Essential indexes for performance
CREATE INDEX idx_events_foreign_id ON events (foreign_id, time DESC);
CREATE INDEX idx_events_status_time ON events (status, time DESC);
CREATE INDEX idx_events_manual_original ON events_manual (original_event_id, time DESC);
CREATE INDEX idx_events_manual_merged ON events_manual USING GIN (merged_event_ids);
-- Conditional indexes for high-performance selective queries (82% storage savings)
CREATE INDEX idx_events_active_only ON events (time DESC, foreign_id) WHERE status = 'active';
CREATE INDEX idx_events_high_priority ON events (time DESC, foreign_id) WHERE status IN ('critical', 'priority');
CREATE INDEX idx_events_manual_with_overrides ON events_manual (time DESC, original_event_id) WHERE override_status IS NOT NULL;
-- Realistic sample data generation
INSERT INTO events (time, foreign_id, name, start_time, end_time, status)
WITH event_data AS (
SELECT
i,
NOW() - (random() * INTERVAL '30 days') as event_time,
NOW() - (random() * INTERVAL '30 days') as random_start_time
FROM generate_series(1, 1000000) i
)
SELECT
event_time as time,
'event_' || i as foreign_id,
'Event ' || (random() * 100)::int as name,
random_start_time as start_time,
random_start_time + (random() * INTERVAL '29 days') as end_time,
CASE
WHEN random() < 0.7 THEN 'working'
WHEN random() < 0.85 THEN 'paused'
WHEN random() < 0.97 THEN 'offline'
ELSE 'maintenance'
END as status
FROM event_data;
-- Manual overrides with realistic merge scenarios
INSERT INTO events_manual (time, original_event_id, merged_event_ids, notes, override_name, override_status)
SELECT
e.time,
e.foreign_id,
CASE
WHEN random() < 0.3 THEN ARRAY['event_' || (event_num + 1)::text]
WHEN random() < 0.7 THEN
(SELECT array_agg('event_' || (event_num + i)::text)
FROM generate_series(1, 2 + (random() * 8)::int) i
WHERE event_num + i <= 999000)
ELSE NULL
END as merged_event_ids,
CASE
WHEN random() < 0.7 THEN 'Complex merge operation'
ELSE 'Manual override for business rules'
END as notes,
CASE WHEN random() < 0.5 THEN 'Merged Event ' || (random() * 100)::int ELSE NULL END,
CASE
WHEN random() < 0.1 THEN 'offline'
WHEN random() < 0.2 THEN 'maintenance'
WHEN random() < 0.5 THEN 'paused'
ELSE 'working'
END
FROM events e
CROSS JOIN LATERAL (SELECT replace(e.foreign_id, 'event_', '')::int as event_num) nums
WHERE random() < 0.095
AND event_num <= 950000
LIMIT 95000;
-- Large merge scenarios for stress testing
INSERT INTO events_manual (time, original_event_id, merged_event_ids, notes, override_name, override_status)
SELECT
e.time,
e.foreign_id,
(SELECT array_agg('event_' || (event_num + i)::text)
FROM generate_series(1, 30 + (random() * 3)::int) i
WHERE event_num + i <= 999000) as merged_event_ids,
'MEGA MERGE: Large-scale event consolidation' as notes,
'Ultra Event ' || (random() * 100)::int as override_name,
'maintenance' as override_status
FROM events e
CROSS JOIN LATERAL (SELECT replace(e.foreign_id, 'event_', '')::int as event_num) nums
WHERE event_num BETWEEN 1000 AND 3000
AND event_num % 30 = 0
LIMIT 100;
-- Optimized merged events view - production-ready logic
CREATE OR REPLACE VIEW merged_events AS
SELECT
e.time,
e.foreign_id,
COALESCE(em.override_name, e.name) as name,
COALESCE(em.override_start_time, e.start_time) as start_time,
COALESCE(em.override_end_time, e.end_time) as end_time,
COALESCE(em.override_status, e.status) as status,
em.merged_event_ids,
em.notes as manual_notes,
em.created_at as manual_override_time,
(em.id IS NOT NULL) as has_manual_override,
CASE
WHEN em.id IS NOT NULL AND em.merged_event_ids IS NOT NULL THEN 'merged'
WHEN em.id IS NOT NULL THEN 'manual'
ELSE 'original'
END as event_type
FROM events e
LEFT JOIN events_manual em ON e.foreign_id = em.original_event_id;
-- Performance summary view for monitoring
CREATE OR REPLACE VIEW events_summary AS
SELECT
time_bucket('1 hour', time) as hour,
status,
event_type,
COUNT(*) as event_count,
COUNT(CASE WHEN has_manual_override THEN 1 END) as manual_overrides,
AVG(array_length(merged_event_ids, 1)) FILTER (WHERE merged_event_ids IS NOT NULL) as avg_merge_size
FROM merged_events
GROUP BY hour, status, event_type;
-- Performance validation queries
SELECT 'Total Events' as metric, COUNT(*)::text as value FROM events
UNION ALL
SELECT 'Manual Overrides' as metric, COUNT(*)::text as value FROM events_manual
UNION ALL
SELECT 'Events with Merges' as metric, COUNT(*)::text as value FROM events_manual WHERE merged_event_ids IS NOT NULL
UNION ALL
SELECT 'Avg Merge Size' as metric, ROUND(AVG(array_length(merged_event_ids, 1)), 2)::text as value FROM events_manual WHERE merged_event_ids IS NOT NULL
UNION ALL
SELECT 'Max Merge Size' as metric, MAX(array_length(merged_event_ids, 1))::text as value FROM events_manual WHERE merged_event_ids IS NOT NULL;
-- Critical query performance test
SELECT COUNT(*) as active_events_last_7_days
FROM merged_events
WHERE time >= NOW() - INTERVAL '7 days' AND status = 'active';
-- Complex merge analysis
SELECT
array_length(merged_event_ids, 1) as merge_size,
COUNT(*) as frequency,
ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as percentage
FROM merged_events
WHERE merged_event_ids IS NOT NULL
GROUP BY array_length(merged_event_ids, 1)
ORDER BY merge_size DESC
LIMIT 10;
-- Exclusion periods for each foreign_id (offline/maintenance)
CREATE OR REPLACE VIEW event_timeline_exclusions AS
SELECT
foreign_id,
start_time,
end_time,
status
FROM merged_events
WHERE status IN ('offline', 'maintenance');
-- Function to get valid (non-excluded) periods for a given foreign_id
CREATE OR REPLACE FUNCTION get_valid_periods(foreign_id TEXT)
RETURNS TABLE (start_time TIMESTAMPTZ, end_time TIMESTAMPTZ) AS $$
BEGIN
RETURN QUERY
WITH base_periods AS (
SELECT start_time, end_time
FROM merged_events
WHERE foreign_id = get_valid_periods.foreign_id
AND status NOT IN ('offline', 'maintenance')
),
exclusions AS (
SELECT start_time, end_time
FROM event_timeline_exclusions
WHERE foreign_id = get_valid_periods.foreign_id
),
-- Split base periods by exclusions
split_periods AS (
SELECT b.start_time AS base_start, b.end_time AS base_end, e.start_time AS ex_start, e.end_time AS ex_end
FROM base_periods b
LEFT JOIN exclusions e
ON b.start_time < e.end_time AND b.end_time > e.start_time
)
SELECT
GREATEST(base_start, COALESCE(ex_end, base_start)) AS start_time,
LEAST(base_end, COALESCE(ex_start, base_end)) AS end_time
FROM split_periods
WHERE (ex_start IS NULL OR base_start < ex_start)
AND (ex_end IS NULL OR base_end > ex_end)
AND GREATEST(base_start, COALESCE(ex_end, base_start)) < LEAST(base_end, COALESCE(ex_start, base_end));
END;
$$ LANGUAGE plpgsql STABLE;
-- Function to get exclusion multirange for efficient filtering
CREATE OR REPLACE FUNCTION get_exclusion_multirange(foreign_id TEXT)
RETURNS tstzmultirange AS $$
DECLARE
exclusion_ranges tstzmultirange;
BEGIN
SELECT COALESCE(
range_agg(tstzrange(e.start_time, e.end_time))::tstzmultirange,
'{}'::tstzmultirange
) INTO exclusion_ranges
FROM event_timeline_exclusions e
WHERE e.foreign_id = get_exclusion_multirange.foreign_id;
RETURN exclusion_ranges;
END;
$$ LANGUAGE plpgsql STABLE;
-- Flexible interval function: get all intervals for asset_id and statuses
CREATE OR REPLACE FUNCTION interval_from(asset_id TEXT, statuses TEXT[])
RETURNS TABLE (start_time TIMESTAMPTZ, end_time TIMESTAMPTZ, status TEXT) AS $$
BEGIN
RETURN QUERY
SELECT m.start_time, m.end_time, m.status
FROM merged_events m
WHERE m.foreign_id = asset_id
AND m.status = ANY(statuses);
END;
$$ LANGUAGE plpgsql STABLE;
-- Generate sample data for testing exclusion logic
INSERT INTO sample_data (time, foreign_id, value)
WITH time_series AS (
SELECT
generate_series(
NOW() - INTERVAL '30 days',
NOW(),
INTERVAL '1 hour'
) as time_point
),
foreign_ids AS (
SELECT DISTINCT foreign_id
FROM events
WHERE foreign_id IN ('event_32', 'event_324', 'event_100019', 'event_100031')
LIMIT 10
),
data_points AS (
SELECT
t.time_point as time,
f.foreign_id,
-- Generate realistic sensor-like values with some noise
50 + 20 * sin(extract(epoch from t.time_point) / 86400) +
(random() - 0.5) * 10 as value
FROM time_series t
CROSS JOIN foreign_ids f
)
SELECT time, foreign_id, value
FROM data_points
WHERE random() < 0.8; -- 80% data density for realistic gaps
-- Efficient filtered view that excludes offline/maintenance periods
CREATE OR REPLACE VIEW filtered_sample_data AS
SELECT
s.time,
s.foreign_id,
s.value
FROM sample_data s
WHERE NOT (
get_exclusion_multirange(s.foreign_id) @> s.time
);
\timing off