Commit 50d87fd

RostyslavKachan authored and jdobes committed
chore: create new system_tag_set table for Cyndi migration
RHINENG-23538
1 parent 84f6738 commit 50d87fd

File tree

7 files changed: +93 -9 lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ RUN mkdir -p /tmp/prometheus_multiproc && \
 ENV prometheus_multiproc_dir=/tmp/prometheus_multiproc

 # minimal schema required by application, used for waiting in services until DB migration is finished
-ENV MINIMAL_SCHEMA=154
+ENV MINIMAL_SCHEMA=158

 # Baked-in content for FedRAMP
 ARG STATIC_ASSETS=0
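
The MINIMAL_SCHEMA bump means dependent services keep waiting until the migration introducing system_tag_set (schema 158) has been applied. A rough sketch of such a readiness check, assuming a psycopg connection and the db_version table from ve_db_postgresql.sql; the helper name and polling interval are illustrative, not part of this commit:

# Illustrative sketch only: poll db_version until the applied schema is new enough to start.
import os
import time

import psycopg  # assumed driver; the listener in this repo also talks to Postgres via psycopg

MINIMAL_SCHEMA = int(os.environ.get("MINIMAL_SCHEMA", "158"))

def wait_for_minimal_schema(dsn: str, poll_seconds: float = 5.0) -> None:
    """Block until db_version.schema_version is at least MINIMAL_SCHEMA."""
    while True:
        with psycopg.connect(dsn) as conn:
            row = conn.execute(
                "SELECT version FROM db_version WHERE name = 'schema_version'"
            ).fetchone()
            if row is not None and int(row[0]) >= MINIMAL_SCHEMA:
                return
        time.sleep(poll_seconds)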

common/peewee_model.py

Lines changed: 13 additions & 0 deletions
@@ -148,6 +148,18 @@ class Meta:
         table_name = "system_group_set"


+class SystemTagSet(BaseModel):
+    """system_tag_set table"""
+
+    tags = BinaryJSONField(null=False)
+    tags_checksum = TextField(null=False, unique=True)
+
+    class Meta:
+        """system_tag_set table metadata"""
+
+        table_name = "system_tag_set"
+
+
 class SystemPlatform(BaseModelRW):
     """system_platform table"""

@@ -176,6 +188,7 @@ class SystemPlatform(BaseModelRW):
     host_type = TextField(null=True)
     operating_system_id = ForeignKeyField(column_name="operating_system_id", model=OperatingSystem, field="id")
     group_set_id = ForeignKeyField(column_name="group_set_id", model=SystemGroupSet, field="id")
+    tag_set_id = ForeignKeyField(column_name="tag_set_id", model=SystemTagSet, field="id")

     class Meta:
         """system_platform table metadata"""
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+CREATE TABLE IF NOT EXISTS system_tag_set (
+    id BIGSERIAL,
+    tags JSONB NOT NULL,
+    tags_checksum TEXT NOT NULL,
+    PRIMARY KEY (id),
+    UNIQUE (tags_checksum)
+) TABLESPACE pg_default;
+
+CREATE INDEX ON system_tag_set USING gin (tags jsonb_path_ops);
+
+
+ALTER TABLE system_platform ADD COLUMN tag_set_id BIGINT;
+ALTER TABLE system_platform ADD CONSTRAINT tag_set_id FOREIGN KEY (tag_set_id) REFERENCES system_tag_set (id);
+CREATE INDEX ON system_platform(tag_set_id);
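
The GIN index with jsonb_path_ops accelerates JSONB containment (@>) lookups on the stored tag documents; it does not cover key-existence operators. A hypothetical query joining systems to their deduplicated tag sets (the {namespace, key, value} tag shape is an assumption about the inventory payload, not something this migration defines):

# Illustrative only: find systems whose tag set contains a given tag.
import json

async def systems_with_tag(conn, namespace: str, key: str, value: str) -> list[str]:
    """Return inventory_ids of systems whose tag set contains the given tag."""
    tag = json.dumps([{"namespace": namespace, "key": key, "value": value}])
    async with conn.cursor() as cur:
        await cur.execute(
            """
            SELECT sp.inventory_id
            FROM system_platform sp
            JOIN system_tag_set sts ON sts.id = sp.tag_set_id
            WHERE sts.tags @> %s::jsonb  -- served by the GIN (jsonb_path_ops) index
            """,
            (tag,),
        )
        return [row[0] for row in await cur.fetchall()]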

database/schema/ve_db_postgresql.sql

Lines changed: 18 additions & 2 deletions
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS db_version (
 ) TABLESPACE pg_default;

 -- set the schema version directly in the insert statement here!!
-INSERT INTO db_version (name, version) VALUES ('schema_version', 157);
+INSERT INTO db_version (name, version) VALUES ('schema_version', 158);
 -- INSERT INTO db_version (name, version) VALUES ('schema_version', :schema_version);


@@ -191,6 +191,16 @@ CREATE TABLE IF NOT EXISTS system_group_set (

 CREATE INDEX ON system_group_set USING gin (groups jsonb_path_ops);

+CREATE TABLE IF NOT EXISTS system_tag_set (
+    id BIGSERIAL,
+    tags JSONB NOT NULL,
+    tags_checksum TEXT NOT NULL,
+    PRIMARY KEY (id),
+    UNIQUE (tags_checksum)
+) TABLESPACE pg_default;
+
+CREATE INDEX ON system_tag_set USING gin (tags jsonb_path_ops);
+
 -- system_platform
 CREATE TABLE IF NOT EXISTS system_platform (
     id BIGSERIAL,
@@ -220,6 +230,7 @@ CREATE TABLE IF NOT EXISTS system_platform (
     rule_results JSONB,
     operating_system_id INT,
     group_set_id BIGINT,
+    tag_set_id BIGINT,
     UNIQUE (inventory_id),
     CONSTRAINT rh_account_id
         FOREIGN KEY (rh_account_id)
@@ -229,7 +240,10 @@ CREATE TABLE IF NOT EXISTS system_platform (
         REFERENCES operating_system (id),
     CONSTRAINT group_set_id
         FOREIGN KEY (group_set_id)
-        REFERENCES system_group_set (id)
+        REFERENCES system_group_set (id),
+    CONSTRAINT tag_set_id
+        FOREIGN KEY (tag_set_id)
+        REFERENCES system_tag_set (id)
 ) TABLESPACE pg_default;

 CREATE INDEX ON system_platform(rh_account_id);
@@ -248,6 +262,8 @@ CREATE INDEX ON system_platform(operating_system_id);

 CREATE INDEX ON system_platform(group_set_id);

+CREATE INDEX ON system_platform(tag_set_id);
+
 CREATE TRIGGER system_platform_set_first_reported
     BEFORE INSERT ON system_platform
     FOR EACH ROW EXECUTE PROCEDURE set_first_reported();

listener/inventory_processor.py

Lines changed: 45 additions & 4 deletions
@@ -56,6 +56,7 @@ def __init__(self, *args, **kwargs):
         self.grouper = MQWriter(CFG.grouper_inventory_topic, partitioner=Partitioners.org_id_partitioner, loop=self.loop)
         self.repo_id_cache = {}
         self.group_set_cache = {}
+        self.tag_set_cache = {}

     async def init(self):
         """Async constructor"""
@@ -282,11 +283,47 @@ async def _db_system_group_set_lookup(self, conn, groups: list[dict]) -> tuple[s
             result = await cur.fetchone()
             return groups_checksum, result["id"]

+    async def _db_system_tag_set_lookup(self, conn, tags: list[dict]) -> tuple[str, int]:
+        tags_str = json.dumps(tags)
+        tags_checksum = hashlib.sha256(tags_str.encode("utf-8")).hexdigest()
+        if tags_checksum in self.tag_set_cache:
+            return tags_checksum, self.tag_set_cache[tags_checksum]
+
+        async with conn.cursor(row_factory=dict_row) as cur:
+            await cur.execute(
+                "SELECT id FROM system_tag_set WHERE tags_checksum = %s",
+                (tags_checksum,),
+            )
+            result = await cur.fetchone()
+            if result:
+                self.tag_set_cache[tags_checksum] = result["id"]
+                return tags_checksum, result["id"]
+
+            await cur.execute(
+                """
+                INSERT INTO system_tag_set (tags, tags_checksum)
+                VALUES (%s, %s) ON CONFLICT (tags_checksum) DO UPDATE SET
+                    tags = EXCLUDED.tags
+                RETURNING id
+                """,
+                (
+                    tags_str,
+                    tags_checksum,
+                ),
+            )
+            result = await cur.fetchone()
+            return tags_checksum, result["id"]
+
     async def _commit_system_group_set_cache(self, groups_checksum: str, group_set_id: int) -> None:
         """Insert group_set_id to the cache after succesful commit"""
         if groups_checksum not in self.group_set_cache:
             self.group_set_cache[groups_checksum] = group_set_id

+    async def _commit_system_tag_set_cache(self, tags_checksum: str, tag_set_id: int) -> None:
+        """Insert tag_set_id to the cache after succesful commit"""
+        if tags_checksum not in self.tag_set_cache:
+            self.tag_set_cache[tags_checksum] = tag_set_id
+
     async def _db_import_system(self, conn, fields: dict, org_id: str, inventory_id: str) -> (ImportStatus, int):
         """Import system to system_platform table, update if exists"""
         rh_account_id, system_id = await self._db_account_system_lookup(conn, org_id, inventory_id)
@@ -299,16 +336,16 @@ async def _db_import_system(self, conn, fields: dict, org_id: str, inventory_id:
                 INSERT INTO system_platform
                 (rh_account_id, inventory_id, s3_url, vmaas_json, json_checksum, display_name,
                 stale_timestamp, stale_warning_timestamp, culled_timestamp,
-                host_type, last_upload, stale, operating_system_id, group_set_id)
+                host_type, last_upload, stale, operating_system_id, group_set_id, tag_set_id)
                 VALUES (%(rh_account_id)s, %(inventory_id)s, %(s3_url)s, %(vmaas_json)s, %(json_checksum)s, %(display_name)s,
                 %(stale_timestamp)s, %(stale_warning_timestamp)s, %(culled_timestamp)s,
-                %(host_type)s, %(last_upload)s, 'F', %(operating_system_id)s, %(group_set_id)s)
+                %(host_type)s, %(last_upload)s, 'F', %(operating_system_id)s, %(group_set_id)s, %(tag_set_id)s)
                 ON CONFLICT (inventory_id) DO UPDATE SET
                 rh_account_id = EXCLUDED.rh_account_id, inventory_id = EXCLUDED.inventory_id, s3_url = EXCLUDED.s3_url, vmaas_json = EXCLUDED.vmaas_json,
                 json_checksum = EXCLUDED.json_checksum, display_name = EXCLUDED.display_name, stale_timestamp = EXCLUDED.stale_timestamp,
                 stale_warning_timestamp = EXCLUDED.stale_warning_timestamp, culled_timestamp = EXCLUDED.culled_timestamp,
                 host_type = EXCLUDED.host_type, last_upload = EXCLUDED.last_upload, stale = EXCLUDED.stale, operating_system_id = EXCLUDED.operating_system_id,
-                group_set_id = EXCLUDED.group_set_id
+                group_set_id = EXCLUDED.group_set_id, tag_set_id = EXCLUDED.tag_set_id
                 RETURNING (xmax = 0) AS inserted, unchanged_since, last_evaluation, id, when_deleted
                 """
             else:
@@ -325,7 +362,8 @@ async def _db_import_system(self, conn, fields: dict, org_id: str, inventory_id:
                     last_upload = %(last_upload)s,
                     stale = FALSE,
                     operating_system_id = %(operating_system_id)s,
-                    group_set_id = %(group_set_id)s
+                    group_set_id = %(group_set_id)s,
+                    tag_set_id = %(tag_set_id)s
                 WHERE id = %(system_id)s
                 RETURNING (xmax = 0) AS inserted, unchanged_since, last_evaluation, id, when_deleted
                 """
@@ -439,8 +477,10 @@ async def _process_upload(self, msg: InventoryMsg):
             operating_system = msg.msg["host"]["system_profile"]["operating_system"]
             operating_system_id = await self._db_operating_system_lookup(conn, operating_system)
             groups_checksum, group_set_id = await self._db_system_group_set_lookup(conn, msg.msg["host"]["groups"])
+            tags_checksum, tag_set_id = await self._db_system_tag_set_lookup(conn, msg.msg["host"]["tags"])
             insert_fields["operating_system_id"] = operating_system_id
             insert_fields["group_set_id"] = group_set_id
+            insert_fields["tag_set_id"] = tag_set_id
             import_status, system_id = await self._db_import_system(conn, insert_fields, org_id, inventory_id)
             if ImportStatus.FAILED in import_status:
                 return
@@ -454,6 +494,7 @@ async def _process_upload(self, msg: InventoryMsg):
             # Insert new items to the cache only after succesfully commited insert/update
             await self._commit_operating_system_cache(operating_system, operating_system_id)
             await self._commit_system_group_set_cache(groups_checksum, group_set_id)
+            await self._commit_system_tag_set_cache(tags_checksum, tag_set_id)
         except Exception as exc:
             DATABASE_ERROR.inc()
             LOGGER.exception("Error importing system: %s", exc)
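
The new lookup dedupes tag sets by hashing the serialized tag list, and ON CONFLICT ... DO UPDATE (rather than DO NOTHING) means RETURNING id still yields the existing row when another writer inserted the same checksum first. A standalone illustration of the dedup key; the tag content below is made up, only the hashing scheme mirrors the code above:

# Identical tag lists hash to the same tags_checksum, so they map to one system_tag_set row.
import hashlib
import json

tags = [{"namespace": "insights-client", "key": "env", "value": "prod"}]  # illustrative payload

tags_str = json.dumps(tags)
tags_checksum = hashlib.sha256(tags_str.encode("utf-8")).hexdigest()
print(tags_checksum)  # value used as the UNIQUE key in system_tag_set

Note the checksum is computed over the list as received, so tag ordering affects the key.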

listener/listener.py

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ class InventoryMsgValidator:
     """Inventory kafka message validator"""

     _created_updated_msg_fields = {
-        "host": ["id", "org_id", "system_profile", "display_name", "reporter", "per_reporter_staleness", "groups"],
+        "host": ["id", "org_id", "system_profile", "display_name", "reporter", "per_reporter_staleness", "groups", "tags"],
         "timestamp": [],
         "type": [],
     }
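
With "tags" added to the required host fields, a created/updated inventory message must now carry a tag list to pass validation. A hypothetical minimal message shape; only the keys come from the validator above, all values and the tag structure are illustrative:

# Illustrative message only; real inventory payloads carry many more fields.
minimal_msg = {
    "type": "created",
    "timestamp": "2025-01-01T00:00:00Z",
    "host": {
        "id": "00000000-0000-0000-0000-000000000000",
        "org_id": "123456",
        "system_profile": {"operating_system": {"name": "RHEL", "major": 9, "minor": 4}},
        "display_name": "example.host",
        "reporter": "puptoo",
        "per_reporter_staleness": {},
        "groups": [],
        "tags": [{"namespace": "insights-client", "key": "env", "value": "prod"}],
    },
}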

tests/taskomatic_tests/test_db_metrics.py

Lines changed: 1 addition & 1 deletion
@@ -21,4 +21,4 @@ def test_db_metrics(pg_db_conn, monkeypatch): # pylint: disable=unused-argument

     assert dm.METRIC_SYSTEMS.collect()[0].samples[0].value == 33 # there are 33 systems in DB
     assert dm.METRIC_CYNDI_SYSTEMS.collect()[0].samples[0].value == 33 # there are also 33 systems syndicated
-    assert len(dm.METRIC_TABLE_SIZE.collect()[0].samples) == 547
+    assert len(dm.METRIC_TABLE_SIZE.collect()[0].samples) == 548 # number of tables in DB
