Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/topic_dlchange.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
"type": "number",
"description": "Timestamp of the event in epoch milliseconds"
},
"country": {
"type": "string",
"description": "The country the data is related to."
},
"catalog_id": {
"type": "string",
"description": "Identifier for the data definition (Glue/Hive) database and table name for example "
Expand Down
4 changes: 4 additions & 0 deletions conf/topic_runs.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"items": {
"type": "object",
"properties": {
"country": {
"type": "string",
"description": "The country the data is related to."
},
"catalog_id": {
"type": "string",
"description": "Identifier for the data definition (Glue/Hive) database and table name for example"
Expand Down
6 changes: 6 additions & 0 deletions src/writers/writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
source_app_version,
environment,
timestamp_event,
country,
catalog_id,
operation,
"location",
Expand All @@ -116,6 +117,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
%s,
%s,
%s,
%s,
%s
)""",
(
Expand All @@ -125,6 +127,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
message["source_app_version"],
message["environment"],
message["timestamp_event"],
message.get("country", ""),
message["catalog_id"],
message["operation"],
message.get("location"),
Expand Down Expand Up @@ -187,6 +190,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
INSERT INTO {table_jobs}
(
event_id,
country,
catalog_id,
status,
timestamp_start,
Expand All @@ -202,10 +206,12 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
job.get("country", ""),
job["catalog_id"],
job["status"],
job["timestamp_start"],
Expand Down
35 changes: 25 additions & 10 deletions tests/writers/test_writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def test_postgres_edla_write_with_optional_fields():
"source_app_version": "1.0.0",
"environment": "dev",
"timestamp_event": 111,
"country": "za",
"catalog_id": "db.tbl",
"operation": "append",
"location": "s3://bucket/path",
Expand All @@ -57,12 +58,13 @@ def test_postgres_edla_write_with_optional_fields():
writer_postgres.postgres_edla_write(cur, "table_a", message)
assert len(cur.executions) == 1
_sql, params = cur.executions[0]
assert len(params) == 12
assert len(params) == 13
assert params[0] == "e1"
assert params[8] == "s3://bucket/path"
assert params[9] == "parquet"
assert json.loads(params[10]) == {"compression": "snappy"}
assert json.loads(params[11]) == {"foo": "bar"}
assert params[6] == "za"
assert params[9] == "s3://bucket/path"
assert params[10] == "parquet"
assert json.loads(params[11]) == {"compression": "snappy"}
assert json.loads(params[12]) == {"foo": "bar"}


def test_postgres_edla_write_missing_optional():
Expand All @@ -80,10 +82,11 @@ def test_postgres_edla_write_missing_optional():
}
writer_postgres.postgres_edla_write(cur, "table_a", message)
_sql, params = cur.executions[0]
assert params[8] is None
assert params[9] == "delta"
assert params[10] is None
assert params[6] == ""
assert params[9] is None
assert params[10] == "delta"
assert params[11] is None
assert params[12] is None


def test_postgres_run_write():
Expand All @@ -100,6 +103,7 @@ def test_postgres_run_write():
"jobs": [
{"catalog_id": "c1", "status": "succeeded", "timestamp_start": 1100, "timestamp_end": 1200},
{
"country": "bw",
"catalog_id": "c2",
"status": "failed",
"timestamp_start": 1300,
Expand All @@ -111,12 +115,23 @@ def test_postgres_run_write():
}
writer_postgres.postgres_run_write(cur, "runs_table", "jobs_table", message)
assert len(cur.executions) == 3

# Check run insert
run_sql, run_params = cur.executions[0]
assert "source_app_version" in run_sql
assert run_params[3] == "runapp"

# Check first job
_job1_sql, job1_params = cur.executions[1]
assert job1_params[1] == ""
assert job1_params[2] == "c1"

# Check second job
_job2_sql, job2_params = cur.executions[2]
assert job2_params[5] == "err"
assert json.loads(job2_params[6]) == {"k": "v"}
assert job2_params[1] == "bw"
assert job2_params[2] == "c2"
assert job2_params[6] == "err"
assert json.loads(job2_params[7]) == {"k": "v"}


def test_postgres_test_write():
Expand Down