diff --git a/conf/topic_dlchange.json b/conf/topic_dlchange.json index 7cd7c22..dbf75f7 100644 --- a/conf/topic_dlchange.json +++ b/conf/topic_dlchange.json @@ -25,6 +25,10 @@ "type": "number", "description": "Timestamp of the event in epoch milliseconds" }, + "country": { + "type": "string", + "description": "The country the data is related to." + }, "catalog_id": { "type": "string", "description": "Identifier for the data definition (Glue/Hive) database and table name for example " diff --git a/conf/topic_runs.json b/conf/topic_runs.json index 6239cd8..8197c02 100644 --- a/conf/topic_runs.json +++ b/conf/topic_runs.json @@ -39,6 +39,10 @@ "items": { "type": "object", "properties": { + "country": { + "type": "string", + "description": "The country the data is related to." + }, "catalog_id": { "type": "string", "description": "Identifier for the data definition (Glue/Hive) database and table name for example" diff --git a/src/writers/writer_postgres.py b/src/writers/writer_postgres.py index ecce377..e7979c1 100644 --- a/src/writers/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -96,6 +96,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: source_app_version, environment, timestamp_event, + country, catalog_id, operation, "location", @@ -116,6 +117,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: %s, %s, %s, + %s, %s )""", ( @@ -125,6 +127,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: message["source_app_version"], message["environment"], message["timestamp_event"], + message.get("country", ""), message["catalog_id"], message["operation"], message.get("location"), @@ -187,6 +190,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s INSERT INTO {table_jobs} ( event_id, + country, catalog_id, status, timestamp_start, @@ -202,10 +206,12 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s %s, %s, %s, + %s, %s )""", ( message["event_id"], + job.get("country", ""), job["catalog_id"], job["status"], job["timestamp_start"], diff --git a/tests/writers/test_writer_postgres.py b/tests/writers/test_writer_postgres.py index 678b664..bdb8d6d 100644 --- a/tests/writers/test_writer_postgres.py +++ b/tests/writers/test_writer_postgres.py @@ -47,6 +47,7 @@ def test_postgres_edla_write_with_optional_fields(): "source_app_version": "1.0.0", "environment": "dev", "timestamp_event": 111, + "country": "za", "catalog_id": "db.tbl", "operation": "append", "location": "s3://bucket/path", @@ -57,12 +58,13 @@ def test_postgres_edla_write_with_optional_fields(): writer_postgres.postgres_edla_write(cur, "table_a", message) assert len(cur.executions) == 1 _sql, params = cur.executions[0] - assert len(params) == 12 + assert len(params) == 13 assert params[0] == "e1" - assert params[8] == "s3://bucket/path" - assert params[9] == "parquet" - assert json.loads(params[10]) == {"compression": "snappy"} - assert json.loads(params[11]) == {"foo": "bar"} + assert params[6] == "za" + assert params[9] == "s3://bucket/path" + assert params[10] == "parquet" + assert json.loads(params[11]) == {"compression": "snappy"} + assert json.loads(params[12]) == {"foo": "bar"} def test_postgres_edla_write_missing_optional(): @@ -80,10 +82,11 @@ def test_postgres_edla_write_missing_optional(): } writer_postgres.postgres_edla_write(cur, "table_a", message) _sql, params = cur.executions[0] - assert params[8] is None - assert params[9] == "delta" - assert params[10] is None + assert params[6] == "" + assert params[9] is None + assert params[10] == "delta" assert params[11] is None + assert params[12] is None def test_postgres_run_write(): @@ -100,6 +103,7 @@ def test_postgres_run_write(): "jobs": [ {"catalog_id": "c1", "status": "succeeded", "timestamp_start": 1100, "timestamp_end": 1200}, { + "country": "bw", "catalog_id": "c2", "status": "failed", "timestamp_start": 1300, @@ -111,12 +115,23 @@ def test_postgres_run_write(): } writer_postgres.postgres_run_write(cur, "runs_table", "jobs_table", message) assert len(cur.executions) == 3 + + # Check run insert run_sql, run_params = cur.executions[0] assert "source_app_version" in run_sql assert run_params[3] == "runapp" + + # Check first job + _job1_sql, job1_params = cur.executions[1] + assert job1_params[1] == "" + assert job1_params[2] == "c1" + + # Check second job _job2_sql, job2_params = cur.executions[2] - assert job2_params[5] == "err" - assert json.loads(job2_params[6]) == {"k": "v"} + assert job2_params[1] == "bw" + assert job2_params[2] == "c2" + assert job2_params[6] == "err" + assert json.loads(job2_params[7]) == {"k": "v"} def test_postgres_test_write():