Skip to content

Commit be28362

Browse files
authored
Add country field to EDLA schema (#98)
* Field country added to the EDLA schema.
1 parent 88edb67 commit be28362

File tree

4 files changed

+39
-10
lines changed

4 files changed

+39
-10
lines changed

conf/topic_dlchange.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
"type": "number",
2626
"description": "Timestamp of the event in epoch milliseconds"
2727
},
28+
"country": {
29+
"type": "string",
30+
"description": "The country the data is related to."
31+
},
2832
"catalog_id": {
2933
"type": "string",
3034
"description": "Identifier for the data definition (Glue/Hive) database and table name for example "

conf/topic_runs.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
"items": {
4040
"type": "object",
4141
"properties": {
42+
"country": {
43+
"type": "string",
44+
"description": "The country the data is related to."
45+
},
4246
"catalog_id": {
4347
"type": "string",
4448
"description": "Identifier for the data definition (Glue/Hive) database and table name for example"

src/writers/writer_postgres.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
9696
source_app_version,
9797
environment,
9898
timestamp_event,
99+
country,
99100
catalog_id,
100101
operation,
101102
"location",
@@ -116,6 +117,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
116117
%s,
117118
%s,
118119
%s,
120+
%s,
119121
%s
120122
)""",
121123
(
@@ -125,6 +127,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
125127
message["source_app_version"],
126128
message["environment"],
127129
message["timestamp_event"],
130+
message.get("country", ""),
128131
message["catalog_id"],
129132
message["operation"],
130133
message.get("location"),
@@ -187,6 +190,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
187190
INSERT INTO {table_jobs}
188191
(
189192
event_id,
193+
country,
190194
catalog_id,
191195
status,
192196
timestamp_start,
@@ -202,10 +206,12 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
202206
%s,
203207
%s,
204208
%s,
209+
%s,
205210
%s
206211
)""",
207212
(
208213
message["event_id"],
214+
job.get("country", ""),
209215
job["catalog_id"],
210216
job["status"],
211217
job["timestamp_start"],

tests/writers/test_writer_postgres.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def test_postgres_edla_write_with_optional_fields():
4747
"source_app_version": "1.0.0",
4848
"environment": "dev",
4949
"timestamp_event": 111,
50+
"country": "za",
5051
"catalog_id": "db.tbl",
5152
"operation": "append",
5253
"location": "s3://bucket/path",
@@ -57,12 +58,13 @@ def test_postgres_edla_write_with_optional_fields():
5758
writer_postgres.postgres_edla_write(cur, "table_a", message)
5859
assert len(cur.executions) == 1
5960
_sql, params = cur.executions[0]
60-
assert len(params) == 12
61+
assert len(params) == 13
6162
assert params[0] == "e1"
62-
assert params[8] == "s3://bucket/path"
63-
assert params[9] == "parquet"
64-
assert json.loads(params[10]) == {"compression": "snappy"}
65-
assert json.loads(params[11]) == {"foo": "bar"}
63+
assert params[6] == "za"
64+
assert params[9] == "s3://bucket/path"
65+
assert params[10] == "parquet"
66+
assert json.loads(params[11]) == {"compression": "snappy"}
67+
assert json.loads(params[12]) == {"foo": "bar"}
6668

6769

6870
def test_postgres_edla_write_missing_optional():
@@ -80,10 +82,11 @@ def test_postgres_edla_write_missing_optional():
8082
}
8183
writer_postgres.postgres_edla_write(cur, "table_a", message)
8284
_sql, params = cur.executions[0]
83-
assert params[8] is None
84-
assert params[9] == "delta"
85-
assert params[10] is None
85+
assert params[6] == ""
86+
assert params[9] is None
87+
assert params[10] == "delta"
8688
assert params[11] is None
89+
assert params[12] is None
8790

8891

8992
def test_postgres_run_write():
@@ -100,6 +103,7 @@ def test_postgres_run_write():
100103
"jobs": [
101104
{"catalog_id": "c1", "status": "succeeded", "timestamp_start": 1100, "timestamp_end": 1200},
102105
{
106+
"country": "bw",
103107
"catalog_id": "c2",
104108
"status": "failed",
105109
"timestamp_start": 1300,
@@ -111,12 +115,23 @@ def test_postgres_run_write():
111115
}
112116
writer_postgres.postgres_run_write(cur, "runs_table", "jobs_table", message)
113117
assert len(cur.executions) == 3
118+
119+
# Check run insert
114120
run_sql, run_params = cur.executions[0]
115121
assert "source_app_version" in run_sql
116122
assert run_params[3] == "runapp"
123+
124+
# Check first job
125+
_job1_sql, job1_params = cur.executions[1]
126+
assert job1_params[1] == ""
127+
assert job1_params[2] == "c1"
128+
129+
# Check second job
117130
_job2_sql, job2_params = cur.executions[2]
118-
assert job2_params[5] == "err"
119-
assert json.loads(job2_params[6]) == {"k": "v"}
131+
assert job2_params[1] == "bw"
132+
assert job2_params[2] == "c2"
133+
assert job2_params[6] == "err"
134+
assert json.loads(job2_params[7]) == {"k": "v"}
120135

121136

122137
def test_postgres_test_write():

0 commit comments

Comments
 (0)