Skip to content

Commit 5e90124

Browse files
committed
ENH: Extend initdb script with separating raws
1 parent a86a587 commit 5e90124

File tree

2 files changed

+231
-15
lines changed

2 files changed

+231
-15
lines changed

intelmq/bin/intelmq_psql_initdb.py

Lines changed: 116 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
The SQL file is saved in `/tmp/initdb.sql` or a temporary name if the other one
1111
exists.
1212
"""
13+
import argparse
1314
import json
1415
import os
1516
import sys
1617
import tempfile
17-
import argparse
1818

1919
from intelmq import HARMONIZATION_CONF_FILE
2020

@@ -32,8 +32,104 @@
3232
"""
3333

3434

35-
def generate(harmonization_file=HARMONIZATION_CONF_FILE):
35+
def _generate_events_schema(fields: dict) -> list:
36+
sql_lines = []
37+
sql_lines.append("CREATE TABLE events (")
38+
sql_lines.append(' "id" BIGSERIAL UNIQUE PRIMARY KEY,')
39+
40+
for field, field_type in sorted(fields.items()):
41+
sql_lines.append(f' "{field}" {field_type},')
42+
43+
sql_lines[-1] = sql_lines[-1][:-1] # remove last ','
44+
sql_lines.append(");")
45+
46+
for index in INDICES:
47+
sql_lines.append('CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");'.format(index))
48+
return sql_lines
49+
50+
51+
RAW_TABLE = """
52+
CREATE TABLE public.raws (
53+
event_id bigint,
54+
raw text,
55+
PRIMARY KEY(event_id),
56+
CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE
57+
);
58+
"""
59+
60+
RAW_TRIGGER = """
61+
CREATE TRIGGER tr_events
62+
INSTEAD OF INSERT
63+
ON public.v_events
64+
FOR EACH ROW
65+
EXECUTE FUNCTION public.process_v_events_insert();
66+
"""
67+
68+
69+
def _generate_separated_raws_schema(fields: dict) -> list:
70+
sorted_fields = sorted(key for key in fields.keys() if key != "raw")
71+
sql_lines = ['-- Create the table holding only the "raw" values\n', RAW_TABLE]
72+
73+
sql_lines.extend([
74+
'',
75+
'-- Create the v_events view which joins the tables "events" and "raws"\n',
76+
'CREATE VIEW public.v_events AS',
77+
' SELECT',
78+
' events.id,',
79+
])
80+
for field in sorted_fields:
81+
sql_lines.append(f' events."{field}",')
82+
sql_lines.extend([
83+
' raws."event_id",',
84+
' raws."raw"',
85+
' FROM (',
86+
' public.events',
87+
' JOIN public.raws ON ((events.id = raws.event_id)));'
88+
])
89+
90+
sql_lines.extend([
91+
'',
92+
'-- Establish the INSERT trigger for the events table, splitting the data into events and raws',
93+
'',
94+
'CREATE FUNCTION public.process_v_events_insert()',
95+
' RETURNS trigger',
96+
' LANGUAGE plpgsql',
97+
' AS $$',
98+
' DECLARE event_id integer;',
99+
'',
100+
' BEGIN',
101+
' INSERT INTO',
102+
' events (',
103+
])
104+
for field in sorted_fields:
105+
sql_lines.append(f' "{field}"{"," if field != sorted_fields[-1] else ""}')
106+
sql_lines.extend([
107+
' )',
108+
' VALUES',
109+
' (',
110+
])
111+
for field in sorted_fields:
112+
sql_lines.append(f' NEW."{field}"{"," if field != sorted_fields[-1] else ""}')
113+
sql_lines.extend([
114+
' )',
115+
' RETURNING id INTO event_id;',
116+
' INSERT INTO',
117+
' raws ("event_id", "raw")',
118+
' VALUES',
119+
' (event_id, NEW.raw);',
120+
' RETURN NEW;',
121+
' END;',
122+
'$$;'
123+
])
124+
125+
sql_lines.append(RAW_TRIGGER)
126+
127+
return sql_lines
128+
129+
130+
def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, separate_raws=False):
36131
FIELDS = {}
132+
sql_lines = []
37133

38134
try:
39135
print("INFO - Reading %s file" % harmonization_file)
@@ -75,17 +171,13 @@ def generate(harmonization_file=HARMONIZATION_CONF_FILE):
75171

76172
FIELDS[field] = dbtype
77173

78-
initdb = """CREATE TABLE events (
79-
"id" BIGSERIAL UNIQUE PRIMARY KEY,"""
80-
for field, field_type in sorted(FIELDS.items()):
81-
initdb += f'\n "{field}" {field_type},'
174+
if not skip_events:
175+
sql_lines.extend(_generate_events_schema(FIELDS))
82176

83-
initdb = initdb[:-1] # remove last ','
84-
initdb += "\n);\n"
177+
if separate_raws:
178+
sql_lines.extend(_generate_separated_raws_schema(FIELDS))
85179

86-
for index in INDICES:
87-
initdb += 'CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");\n'.format(index)
88-
return initdb
180+
return "\n".join(sql_lines)
89181

90182

91183
def main():
@@ -97,6 +189,16 @@ def main():
97189
help='Defines the Ouputfile',
98190
default='/tmp/initdb.sql'
99191
)
192+
parser.add_argument("--no-events", action="store_true", default=False,
193+
help="Skip generating the events table schema")
194+
parser.add_argument("--separate-raws", action="store_true", default=False,
195+
help="Generate v_events view to separate raw field from the rest of the data on insert")
196+
parser.add_argument("--partition-field", default=None,
197+
help="Add given field to all generated indexes. Useful when utilizing partitioning for TimescaleDB")
198+
parser.add_argument("--harmonization", default=HARMONIZATION_CONF_FILE,
199+
help="Path to the harmonization file")
200+
parser.add_argument("--if-not-exists", default=False,
201+
help="Add IF NOT EXISTS directive to created schemas")
100202
args = parser.parse_args()
101203

102204
OUTPUTFILE = args.outputfile
@@ -109,7 +211,9 @@ def main():
109211
fp = os.fdopen(os_fp, 'wt')
110212
else:
111213
fp = open(OUTPUTFILE, 'w')
112-
psql = generate()
214+
psql = generate(args.harmonization,
215+
skip_events=args.no_events,
216+
separate_raws=args.separate_raws)
113217
print("INFO - Writing %s file" % OUTPUTFILE)
114218
fp.write(psql)
115219
finally:

intelmq/tests/bin/test_psql_initdb.py

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# SPDX-FileCopyrightText: 2016 Sebastian Wagner
1+
# SPDX-FileCopyrightText: 2016 Sebastian Wagner, 2023 CERT.at GmbH
22
#
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

@@ -8,26 +8,138 @@
88
99
@author: sebastian
1010
"""
11+
import json
1112
import os
13+
import re
14+
import tempfile
1215
import unittest
1316

1417
import pkg_resources
1518

16-
import intelmq.bin.intelmq_psql_initdb as psql_initd
19+
import intelmq.bin.intelmq_psql_initdb as psql_initdb
1720

1821

1922
class TestPsqlInit(unittest.TestCase):
2023
"""
2124
A TestCase for intelmq_psql_initdb.
2225
"""
2326

27+
def setUp(self) -> None:
28+
super().setUp()
29+
30+
self.tempdir = tempfile.TemporaryDirectory()
31+
self.addCleanup(self.tempdir.cleanup)
32+
33+
self.harmonization_path = f"{self.tempdir.name}/harmonization.conf"
34+
self._create_simple_harmonization()
35+
36+
def _create_simple_harmonization(self):
37+
simple_harmonization = {
38+
"event": {
39+
"classification.identifier": {
40+
"type": "String"
41+
},
42+
"time.source": {
43+
"type": "DateTime"
44+
},
45+
"raw": {
46+
"type": "Base64"
47+
}
48+
}
49+
}
50+
with open(self.harmonization_path, "w+") as f:
51+
json.dump(simple_harmonization, f)
52+
2453
def test_output(self):
2554
""" Compare output to cached one. """
2655
with open(os.path.join(os.path.dirname(__file__),
2756
'initdb.sql')) as handle:
2857
expected = handle.read()
2958
fname = pkg_resources.resource_filename('intelmq', 'etc/harmonization.conf')
30-
self.assertEqual(psql_initd.generate(fname).strip(), expected.strip())
59+
self.assertEqual(psql_initdb.generate(fname).strip(), expected.strip())
60+
61+
def test_generating_events_schema(self):
62+
expected_table = """
63+
CREATE TABLE events (
64+
"id" BIGSERIAL UNIQUE PRIMARY KEY,
65+
"classification.identifier" text,
66+
"raw" text,
67+
"time.source" timestamp with time zone
68+
);
69+
"""
70+
expected_table = self._normalize_leading_whitespaces(expected_table)
71+
expected_indexes = [
72+
"""CREATE INDEX "idx_events_classification.identifier" ON events USING btree ("classification.identifier");""",
73+
"""CREATE INDEX "idx_events_time.source" ON events USING btree ("time.source");"""
74+
]
75+
generated = psql_initdb.generate(self.harmonization_path)
76+
77+
self.assertTrue(self._normalize_leading_whitespaces(generated).startswith(expected_table))
78+
79+
for index in expected_indexes:
80+
self.assertIn(index, generated)
81+
82+
def test_skip_generating_events_table_schema(self):
83+
generated = psql_initdb.generate(self.harmonization_path, skip_events=True)
84+
85+
self.assertNotIn("CREATE TABLE events", generated)
86+
self.assertNotIn("CREATE INDEX", generated)
87+
88+
def test_separated_raws_view_schema(self):
89+
expected_view = """
90+
CREATE VIEW public.v_events AS
91+
SELECT
92+
events.id,
93+
events."classification.identifier",
94+
events."time.source",
95+
raws."event_id",
96+
raws."raw"
97+
FROM (
98+
public.events
99+
JOIN public.raws ON ((events.id = raws.event_id)));
100+
"""
101+
generated = psql_initdb.generate(self.harmonization_path, separate_raws=True)
102+
generated = self._normalize_leading_whitespaces(generated)
103+
self.assertIn("CREATE TABLE public.raws", generated) # static schema, check if added
104+
self.assertIn(self._normalize_leading_whitespaces(expected_view), generated)
105+
106+
def test_separated_raws_trigger(self):
107+
expected_function = """
108+
CREATE FUNCTION public.process_v_events_insert()
109+
RETURNS trigger
110+
LANGUAGE plpgsql
111+
AS $$
112+
DECLARE event_id integer;
113+
114+
BEGIN
115+
INSERT INTO
116+
events (
117+
"classification.identifier",
118+
"time.source"
119+
)
120+
VALUES
121+
(
122+
NEW."classification.identifier",
123+
NEW."time.source"
124+
)
125+
RETURNING id INTO event_id;
126+
INSERT INTO
127+
raws ("event_id", "raw")
128+
VALUES
129+
(event_id, NEW.raw);
130+
RETURN NEW;
131+
END;
132+
$$;
133+
"""
134+
135+
generated = psql_initdb.generate(self.harmonization_path, separate_raws=True)
136+
generated = self._normalize_leading_whitespaces(generated)
137+
self.assertIn(self._normalize_leading_whitespaces(expected_function), generated)
138+
self.assertIn("CREATE TRIGGER tr_events", generated) # Static, check if added
139+
140+
@staticmethod
141+
def _normalize_leading_whitespaces(data: str) -> str:
142+
return re.sub(r"^(\s)*", " ", data.strip(), flags=re.MULTILINE)
31143

32144

33145
if __name__ == '__main__': # pragma: no cover

0 commit comments

Comments
 (0)