Skip to content

Commit 8b06880

Browse files
committed
ENH: Partitioning and rerun in schema generation
These changes improve the capabilities of the database schema creation. It's now easier to switch to TimescaleDB or use partitioning in PostgreSQL. Creation scripts can be generated from the harmonisation, whereas previously only the most basic version was available. If requested, creation commands get IF NOT EXISTS or OR REPLACE directives to allow re-running the script.
1 parent 5e90124 commit 8b06880

File tree

3 files changed

+76
-26
lines changed

3 files changed

+76
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ CHANGELOG
3434
### Tests
3535

3636
### Tools
37+
- `intelmq_psql_initdb` got support for providing a custom harmonization file, generating a view for storing `raw` fields separately, and adding `IF NOT EXISTS`/`OR REPLACE` clauses (PR by Kamil Mankowski).
3738

3839
### Contrib
3940

intelmq/bin/intelmq_psql_initdb.py

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# SPDX-FileCopyrightText: 2015 Sebastian Wagner
1+
# SPDX-FileCopyrightText: 2015 Sebastian Wagner, 2023 CERT.at GmbH
22
#
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

@@ -32,48 +32,52 @@
3232
"""
3333

3434

35-
def _generate_events_schema(fields: dict) -> list:
35+
def _generate_events_schema(fields: dict, partition_key: str = None) -> list:
3636
sql_lines = []
37-
sql_lines.append("CREATE TABLE events (")
38-
sql_lines.append(' "id" BIGSERIAL UNIQUE PRIMARY KEY,')
37+
sql_lines.append("CREATE TABLE{if_not_exist} events (")
38+
sql_lines.append(f' "id" BIGSERIAL{" UNIQUE PRIMARY KEY" if not partition_key else ""},')
3939

4040
for field, field_type in sorted(fields.items()):
4141
sql_lines.append(f' "{field}" {field_type},')
4242

43-
sql_lines[-1] = sql_lines[-1][:-1] # remove last ','
43+
if not partition_key:
44+
sql_lines[-1] = sql_lines[-1][:-1] # remove last ','
45+
else:
46+
sql_lines.append(f' PRIMARY KEY ("id", "{partition_key}")')
4447
sql_lines.append(");")
4548

4649
for index in INDICES:
47-
sql_lines.append('CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");'.format(index))
50+
sql_lines.append(f'CREATE INDEX{{if_not_exist}} "idx_events_{index}" ON events USING btree ("{index}");')
4851
return sql_lines
4952

5053

51-
RAW_TABLE = """
52-
CREATE TABLE public.raws (
54+
RAW_TABLE_PART = """
55+
CREATE TABLE{if_not_exist} public.raws (
5356
event_id bigint,
5457
raw text,
55-
PRIMARY KEY(event_id),
56-
CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE
57-
);
58-
"""
58+
PRIMARY KEY(event_id)"""
5959

6060
RAW_TRIGGER = """
61-
CREATE TRIGGER tr_events
61+
CREATE{or_replace} TRIGGER tr_events
6262
INSTEAD OF INSERT
6363
ON public.v_events
6464
FOR EACH ROW
6565
EXECUTE FUNCTION public.process_v_events_insert();
6666
"""
6767

6868

69-
def _generate_separated_raws_schema(fields: dict) -> list:
69+
def _generate_separated_raws_schema(fields: dict, partition_key: str) -> list:
7070
sorted_fields = sorted(key for key in fields.keys() if key != "raw")
71-
sql_lines = ['-- Create the table holding only the "raw" values\n', RAW_TABLE]
71+
sql_lines = ['', '-- Create the table holding only the "raw" values', RAW_TABLE_PART]
72+
if not partition_key:
73+
sql_lines[-1] += ","
74+
sql_lines.append(" CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE")
75+
sql_lines.append(");")
7276

7377
sql_lines.extend([
7478
'',
7579
'-- Create the v_events view which joins the tables "events" and "raws"\n',
76-
'CREATE VIEW public.v_events AS',
80+
'CREATE{or_replace} VIEW public.v_events AS',
7781
' SELECT',
7882
' events.id,',
7983
])
@@ -91,7 +95,7 @@ def _generate_separated_raws_schema(fields: dict) -> list:
9195
'',
9296
'-- Establish the INSERT trigger for the events table, splitting the data into events and raws',
9397
'',
94-
'CREATE FUNCTION public.process_v_events_insert()',
98+
'CREATE{or_replace} FUNCTION public.process_v_events_insert()',
9599
' RETURNS trigger',
96100
' LANGUAGE plpgsql',
97101
' AS $$',
@@ -127,7 +131,8 @@ def _generate_separated_raws_schema(fields: dict) -> list:
127131
return sql_lines
128132

129133

130-
def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, separate_raws=False):
134+
def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False,
135+
separate_raws=False, partition_key=None, skip_or_replace=False):
131136
FIELDS = {}
132137
sql_lines = []
133138

@@ -172,12 +177,17 @@ def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, sepa
172177
FIELDS[field] = dbtype
173178

174179
if not skip_events:
175-
sql_lines.extend(_generate_events_schema(FIELDS))
180+
sql_lines.extend(_generate_events_schema(FIELDS, partition_key))
176181

177182
if separate_raws:
178-
sql_lines.extend(_generate_separated_raws_schema(FIELDS))
183+
sql_lines.extend(_generate_separated_raws_schema(FIELDS, partition_key))
179184

180-
return "\n".join(sql_lines)
185+
existing_clause = " IF NOT EXISTS" if skip_or_replace else ""
186+
replace_clause = " OR REPLACE" if skip_or_replace else ""
187+
188+
return "\n".join(
189+
line.format(if_not_exist=existing_clause, or_replace=replace_clause) for line in sql_lines
190+
)
181191

182192

183193
def main():
@@ -193,12 +203,14 @@ def main():
193203
help="Skip generating the events table schema")
194204
parser.add_argument("--separate-raws", action="store_true", default=False,
195205
help="Generate v_events view to separate raw field from the rest of the data on insert")
196-
parser.add_argument("--partition-field", default=None,
197-
help="Add given field to all generated indexes. Useful when utilizing partitioning for TimescaleDB")
206+
parser.add_argument("--partition-key", default=None,
207+
help=("Add given field to the primary key of the events table. "
208+
"Useful when utilizing partitioning for TimescaleDB. "
209+
"If combined with --separate-raws, the v_events does not get foreign key"))
198210
parser.add_argument("--harmonization", default=HARMONIZATION_CONF_FILE,
199211
help="Path to the harmonization file")
200-
parser.add_argument("--if-not-exists", default=False,
201-
help="Add IF NOT EXISTS directive to created schemas")
212+
parser.add_argument("--skip-or-replace", default=False, action="store_true",
213+
help="Add IF NOT EXISTS or REPLACE directive to created schemas")
202214
args = parser.parse_args()
203215

204216
OUTPUTFILE = args.outputfile
@@ -213,7 +225,10 @@ def main():
213225
fp = open(OUTPUTFILE, 'w')
214226
psql = generate(args.harmonization,
215227
skip_events=args.no_events,
216-
separate_raws=args.separate_raws)
228+
separate_raws=args.separate_raws,
229+
partition_key=args.partition_key,
230+
skip_or_replace=args.skip_or_replace,
231+
)
217232
print("INFO - Writing %s file" % OUTPUTFILE)
218233
fp.write(psql)
219234
finally:

intelmq/tests/bin/test_psql_initdb.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def test_separated_raws_view_schema(self):
101101
generated = psql_initdb.generate(self.harmonization_path, separate_raws=True)
102102
generated = self._normalize_leading_whitespaces(generated)
103103
self.assertIn("CREATE TABLE public.raws", generated) # static schema, check if added
104+
self.assertIn("CONSTRAINT raws_event_id_fkey", generated)
104105
self.assertIn(self._normalize_leading_whitespaces(expected_view), generated)
105106

106107
def test_separated_raws_trigger(self):
@@ -137,6 +138,39 @@ def test_separated_raws_trigger(self):
137138
self.assertIn(self._normalize_leading_whitespaces(expected_function), generated)
138139
self.assertIn("CREATE TRIGGER tr_events", generated) # Static, check if added
139140

141+
def test_partition_field(self):
142+
"""For paritioned table """
143+
expected_table = """
144+
CREATE TABLE events (
145+
"id" BIGSERIAL,
146+
"classification.identifier" text,
147+
"raw" text,
148+
"time.source" timestamp with time zone,
149+
PRIMARY KEY ("id", "time.source")
150+
);
151+
"""
152+
generated = psql_initdb.generate(self.harmonization_path, partition_key="time.source",
153+
separate_raws=True)
154+
generated = self._normalize_leading_whitespaces(generated)
155+
self.assertIn(self._normalize_leading_whitespaces(expected_table), generated)
156+
# Foreign key may not be supported on partitioned or Timescale tables, skipping
157+
self.assertNotIn("CONSTRAINT raws_event_id_fkey", generated)
158+
159+
def test_skip_or_replace(self):
160+
expected_creates = [
161+
"CREATE TABLE IF NOT EXISTS public.raws",
162+
"CREATE TABLE IF NOT EXISTS events",
163+
'CREATE INDEX IF NOT EXISTS "idx_events_classification.identifier"',
164+
# Do not support IF NOT EXISTS:
165+
"CREATE OR REPLACE TRIGGER tr_events",
166+
"CREATE OR REPLACE FUNCTION public.process_v_events_insert()",
167+
"CREATE OR REPLACE VIEW public.v_events",
168+
]
169+
generated = psql_initdb.generate(self.harmonization_path, separate_raws=True,
170+
skip_or_replace=True)
171+
for create in expected_creates:
172+
self.assertIn(create, generated)
173+
140174
@staticmethod
141175
def _normalize_leading_whitespaces(data: str) -> str:
142176
return re.sub(r"^(\s)*", " ", data.strip(), flags=re.MULTILINE)

0 commit comments

Comments
 (0)