Skip to content

Commit 1b21ad3

Browse files
authored
Merge pull request #16 from Kaligo/fix/data-24245-columns-order
fix: columns order in fast-sync CSV file mismatches the target schema [DATA-24245]
2 parents 498c49a + 7a5999a commit 1b21ad3

File tree

3 files changed

+141
-33
lines changed

3 files changed

+141
-33
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,24 @@ Install python dependencies and run python linter
291291
make venv
292292
make pylint
293293
```
294+
295+
### To integrate and run this locally with the Meltano project:
296+
297+
- Use `file://` as the `pip_url` to install the plugin from the local directory.
298+
- Run `meltano install` again to re-install it.
299+
300+
```yaml
301+
# In extract/extractors.meltano.yml
302+
plugins:
303+
extractors:
304+
- name: tap-postgres
305+
variant: transferwise
306+
inherit_from: tap-postgres
307+
pip_url: file:///home/dev/pipelinewise-tap-postgres
308+
```
309+
310+
### To create a GH release:
311+
312+
```
313+
gh release create v2.2.2 --generate-notes --repo Kaligo/pipelinewise-tap-postgres
314+
```

tap_postgres/sync_strategies/fast_sync_rds.py

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import datetime
1212
import time
1313
import uuid
14-
from functools import partial
1514
from typing import Dict, List, Optional
1615

1716
import psycopg2
@@ -26,6 +25,9 @@
2625

2726
LOGGER = singer.get_logger("tap_postgres")
2827

28+
# Use lowercase names here so they sort consistently with the lowercase column names in the target schema.
29+
METADATA_COLUMNS = {"_sdc_batched_at", "_sdc_deleted_at", "_sdc_extracted_at"}
30+
2931

3032
class FastSyncRdsStrategy:
3133
"""
@@ -71,21 +73,54 @@ def _generate_s3_path(self, schema_name: str, table_name: str) -> str:
7173
# Remove leading slash if prefix is empty to avoid double slashes
7274
return path.lstrip("/")
7375

74-
def _prepend_metadata_columns(
75-
self, columns: Optional[List[str]] = None
76-
) -> List[str]:
77-
# Metadata columns need to go first in the same order with
78-
# pipelinewise-target-redshift/target_redshift/__init__.py#add_metadata_columns_to_schema
79-
if columns is None:
80-
columns = []
76+
def _get_metadata_column_names(self) -> List[str]:
8177
if self.conn_config.get("fast_sync_rds_add_metadata_columns", True):
82-
columns[:0] = [
83-
"NOW() AT TIME ZONE 'UTC' AS _SDC_BATCHED_AT",
84-
"NULL AS _SDC_DELETED_AT",
85-
"NOW() AT TIME ZONE 'UTC' AS _SDC_EXTRACTED_AT",
86-
]
78+
return METADATA_COLUMNS
79+
return []
80+
81+
def _get_metadata_column_sql(self, column_name: str) -> str:
82+
"""Get SQL expression for a metadata column."""
83+
# Handle both lowercase and uppercase column names
84+
column_name_lower = column_name.lower()
85+
86+
metadata_sql_map = {
87+
"_sdc_batched_at": "current_timestamp at time zone 'UTC' as _sdc_batched_at",
88+
"_sdc_deleted_at": "null as _sdc_deleted_at",
89+
"_sdc_extracted_at": "current_timestamp at time zone 'UTC' as _sdc_extracted_at",
90+
}
91+
92+
if column_name_lower in metadata_sql_map:
93+
return metadata_sql_map[column_name_lower]
94+
95+
raise ValueError(f"Unknown metadata column: {column_name}")
96+
97+
def _build_sorted_column_expressions(
98+
self, desired_columns: List[str], md_map: Dict
99+
) -> List[str]:
100+
"""
101+
Build SQL expressions for all columns (metadata + desired) in sorted order.
87102
88-
return columns
103+
Columns are sorted alphabetically to match target's schema order.
104+
This ensures the exported data column order matches the table column
105+
order exactly.
106+
107+
Args:
108+
desired_columns: List of desired column names from the source table
109+
md_map: Metadata map for column transformations
110+
111+
Returns:
112+
List of SQL expressions for columns in sorted order
113+
"""
114+
metadata_column_names = self._get_metadata_column_names()
115+
all_column_names = [*metadata_column_names, *desired_columns]
116+
# Sort columns to ensure the output CSV headers match target's schema order.
117+
all_column_names.sort()
118+
119+
return [
120+
self._get_metadata_column_sql(name) if name in metadata_column_names
121+
else post_db.prepare_columns_for_select_sql(name, md_map=md_map)
122+
for name in all_column_names
123+
]
89124

90125
def _build_select_query( # pylint: disable=too-many-arguments,too-many-positional-arguments
91126
self,
@@ -97,14 +132,7 @@ def _build_select_query( # pylint: disable=too-many-arguments,too-many-position
97132
replication_key_value: Optional[str] = None,
98133
replication_key_sql_datatype: Optional[str] = None,
99134
) -> str:
100-
columns = self._prepend_metadata_columns([])
101-
escaped_columns = list(
102-
map(
103-
partial(post_db.prepare_columns_for_select_sql, md_map=md_map),
104-
desired_columns,
105-
)
106-
)
107-
columns.extend(escaped_columns)
135+
columns = self._build_sorted_column_expressions(desired_columns, md_map)
108136

109137
return sync_common.get_query_for_replication_data(
110138
{

tests/unit/test_fast_sync_rds.py

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,14 @@ def test_sync_table_full(self, mock_open_conn):
144144
# Verify metadata columns are in the query (tests metadata-column injection indirectly)
145145
export_query = self._extract_export_query(mock_cursor)
146146
self.assertIsNotNone(export_query)
147-
self.assertIn("_SDC_BATCHED_AT", export_query)
148-
self.assertIn("_SDC_DELETED_AT", export_query)
149-
self.assertIn("_SDC_EXTRACTED_AT", export_query)
150-
151-
# Verify metadata columns order
152-
batched_pos = export_query.find("_SDC_BATCHED_AT")
153-
deleted_pos = export_query.find("_SDC_DELETED_AT")
154-
extracted_pos = export_query.find("_SDC_EXTRACTED_AT")
147+
self.assertIn("_sdc_batched_at", export_query)
148+
self.assertIn("_sdc_deleted_at", export_query)
149+
self.assertIn("_sdc_extracted_at", export_query)
150+
151+
# Verify metadata columns are present and in correct order
152+
batched_pos = export_query.find("_sdc_batched_at")
153+
deleted_pos = export_query.find("_sdc_deleted_at")
154+
extracted_pos = export_query.find("_sdc_extracted_at")
155155
id_pos = export_query.find('"id"')
156156

157157
self.assertGreater(batched_pos, 0)
@@ -160,6 +160,65 @@ def test_sync_table_full(self, mock_open_conn):
160160
if id_pos > 0:
161161
self.assertGreater(id_pos, extracted_pos)
162162

163+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
164+
def test_sync_table_full_column_ordering(self, mock_open_conn):
165+
"""Test sync_table_full orders columns alphabetically including metadata columns"""
166+
desired_columns = ["zebra", "_id", "active"]
167+
md_map_with_columns = {
168+
(): {"schema-name": "test_schema"},
169+
("properties", "_id"): {"sql-datatype": "integer"},
170+
("properties", "active"): {"sql-datatype": "boolean"},
171+
("properties", "zebra"): {"sql-datatype": "varchar"},
172+
}
173+
174+
mock_conn, mock_cursor = self._setup_mock_connection()
175+
mock_open_conn.return_value.__enter__.return_value = mock_conn
176+
177+
self.fast_sync_rds_strategy.sync_table_full(
178+
stream=self.stream,
179+
state=self.state,
180+
desired_columns=desired_columns,
181+
md_map=md_map_with_columns,
182+
)
183+
184+
# Verify export query contains all columns
185+
export_query = self._extract_export_query(mock_cursor)
186+
self.assertIsNotNone(export_query)
187+
self.assertIn("_sdc_batched_at", export_query)
188+
self.assertIn("_sdc_deleted_at", export_query)
189+
self.assertIn("_sdc_extracted_at", export_query)
190+
self.assertIn('"_id"', export_query)
191+
self.assertIn('"active"', export_query)
192+
self.assertIn('"zebra"', export_query)
193+
194+
# Verify complete column ordering: all columns should be sorted alphabetically
195+
# Expected order: _id, _sdc_batched_at, _sdc_deleted_at, _sdc_extracted_at, active, zebra
196+
expected_columns = [
197+
'"_id"',
198+
"_sdc_batched_at",
199+
"_sdc_deleted_at",
200+
"_sdc_extracted_at",
201+
'"active"',
202+
'"zebra"',
203+
]
204+
205+
# Find positions of all columns
206+
column_positions = [export_query.find(col) for col in expected_columns]
207+
208+
self.assertEqual(len(column_positions), len(expected_columns))
209+
210+
# Verify all columns are found
211+
for i, (col, pos) in enumerate(zip(expected_columns, column_positions)):
212+
self.assertGreater(pos, 0, f"{col} should be found in query")
213+
214+
# Verify columns are in correct order (positions should be ascending)
215+
self.assertEqual(
216+
column_positions,
217+
sorted(column_positions),
218+
f"Columns should be in alphabetical order. Found positions: {dict(zip(expected_columns, column_positions))}",
219+
)
220+
221+
163222
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
164223
def test_sync_table_full_no_prefix(self, mock_open_conn):
165224
"""Test sync_table_full with empty prefix - verifies S3 path generation"""
@@ -205,9 +264,9 @@ def test_sync_table_full_metadata_columns_disabled(self, mock_open_conn):
205264
# Verify metadata columns are NOT in the query
206265
export_query = self._extract_export_query(mock_cursor)
207266
self.assertIsNotNone(export_query)
208-
self.assertNotIn("_SDC_BATCHED_AT", export_query)
209-
self.assertNotIn("_SDC_DELETED_AT", export_query)
210-
self.assertNotIn("_SDC_EXTRACTED_AT", export_query)
267+
self.assertNotIn("_sdc_batched_at", export_query)
268+
self.assertNotIn("_sdc_deleted_at", export_query)
269+
self.assertNotIn("_sdc_extracted_at", export_query)
211270

212271
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
213272
@patch("tap_postgres.sync_strategies.fast_sync_rds.singer.get_bookmark")

0 commit comments

Comments
 (0)