Skip to content

Commit fe9e55b

Browse files
authored
Merge pull request #18 from Kaligo/feat/data-24770-fast-sync-transformation
feat(poc): support on the fly transformation in fast-sync mode [DATA-24770]
2 parents 3e9f6cd + dedbb11 commit fe9e55b

File tree

3 files changed: +306 additions, -2 deletions

tap_postgres/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,9 @@ def main_impl():
612612
"fast_sync_rds_s3_bucket": args.config.get("fast_sync_rds_s3_bucket"),
613613
"fast_sync_rds_s3_prefix": args.config.get("fast_sync_rds_s3_prefix", ""),
614614
"fast_sync_rds_s3_region": args.config.get("fast_sync_rds_s3_region"),
615+
"fast_sync_rds_transformations": args.config.get(
616+
"fast_sync_rds_transformations", {}
617+
),
615618
}
616619

617620
if conn_config["use_secondary"]:

tap_postgres/sync_strategies/fast_sync_rds.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ def _convert_array_column_to_json(self, column_name: str) -> str:
123123
return f"array_to_json({column_identifier}) AS {column_identifier}"
124124

125125
def _build_sorted_column_expressions(
126-
self, desired_columns: List[str], md_map: Dict
126+
self,
127+
desired_columns: List[str],
128+
md_map: Dict,
129+
tap_stream_id: Optional[str] = None,
127130
) -> List[str]:
128131
"""
129132
Build SQL expressions for all columns (metadata + desired) in sorted order.
@@ -135,9 +138,13 @@ def _build_sorted_column_expressions(
135138
Array columns are automatically converted from PostgreSQL array format
136139
(e.g., {fashion}) to JSON format (e.g., ["fashion"]) using array_to_json().
137140
141+
If transformations are configured for the stream, they will be applied
142+
to the specified columns instead of the default column expressions.
143+
138144
Args:
139145
desired_columns: List of desired column names from the source table
140146
md_map: Metadata map for column transformations
147+
tap_stream_id: Optional stream ID to look up transformations
141148
142149
Returns:
143150
List of SQL expressions for columns in sorted order
@@ -147,10 +154,24 @@ def _build_sorted_column_expressions(
147154
# Sort columns to ensure the output CSV headers match target's schema order.
148155
all_column_names.sort()
149156

157+
# Get transformations for this stream if available
158+
transformations = {}
159+
if tap_stream_id:
160+
all_transformations = self.conn_config.get(
161+
"fast_sync_rds_transformations", {}
162+
)
163+
transformations = all_transformations.get(tap_stream_id, {})
164+
150165
column_expressions = []
151166
for name in all_column_names:
152167
if name in metadata_column_names:
153168
column_expressions.append(self._get_metadata_column_sql(name))
169+
elif name in transformations:
170+
transformation_sql = transformations[name]
171+
column_identifier = post_db.prepare_columns_sql(name)
172+
column_expressions.append(
173+
f"({transformation_sql}) AS {column_identifier}"
174+
)
154175
elif self._is_array_column(name, md_map):
155176
column_expressions.append(self._convert_array_column_to_json(name))
156177
else:
@@ -169,8 +190,11 @@ def _build_select_query( # pylint: disable=too-many-arguments,too-many-position
169190
replication_key: Optional[str] = None,
170191
replication_key_value: Optional[str] = None,
171192
replication_key_sql_datatype: Optional[str] = None,
193+
tap_stream_id: Optional[str] = None,
172194
) -> str:
173-
columns = self._build_sorted_column_expressions(desired_columns, md_map)
195+
columns = self._build_sorted_column_expressions(
196+
desired_columns, md_map, tap_stream_id
197+
)
174198

175199
return sync_common.get_query_for_replication_data(
176200
{
@@ -413,6 +437,7 @@ def sync_table( # pylint: disable=too-many-arguments,too-many-positional-argume
413437
replication_key=replication_key,
414438
replication_key_value=replication_key_value,
415439
replication_key_sql_datatype=replication_key_sql_datatype,
440+
tap_stream_id=stream["tap_stream_id"],
416441
)
417442

418443
s3_path = self._generate_s3_path(schema_name, table_name)

tests/unit/test_fast_sync_rds.py

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,3 +670,279 @@ def test_sync_table_incremental_with_array_columns(
670670
r'array_to_json\s*\(\s*["\s]*name["\s]*\)',
671671
"Non-array column 'name' should not use array_to_json",
672672
)
673+
674+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
675+
def test_sync_table_full_with_transformations(self, mock_open_conn):
676+
"""Test sync_table_full applies transformations when configured"""
677+
# Configure transformations for the stream
678+
conn_config_with_transforms = self.conn_config.copy()
679+
conn_config_with_transforms["fast_sync_rds_transformations"] = {
680+
"test_schema-test_table": {
681+
"name": "UPPER(name)",
682+
"id": "id * 2",
683+
}
684+
}
685+
strategy = fast_sync_rds.FastSyncRdsStrategy(
686+
conn_config_with_transforms, self.s3_bucket, self.s3_prefix, self.s3_region
687+
)
688+
689+
mock_conn, mock_cursor = self._setup_mock_connection()
690+
mock_open_conn.return_value.__enter__.return_value = mock_conn
691+
692+
strategy.sync_table_full(
693+
stream=self.stream,
694+
state=self.state,
695+
desired_columns=self.desired_columns,
696+
md_map=self.md_map,
697+
)
698+
699+
# Verify transformation SQL is in the export query
700+
export_query = self._extract_export_query(mock_cursor)
701+
self.assertIsNotNone(export_query)
702+
self.assertIn("UPPER(name)", export_query)
703+
self.assertIn("id * 2", export_query)
704+
# Verify the transformations are wrapped with column aliases
705+
self.assertIn('(UPPER(name)) AS "name"', export_query)
706+
self.assertIn('(id * 2) AS "id"', export_query)
707+
708+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
709+
def test_sync_table_full_without_transformations(self, mock_open_conn):
710+
"""Test sync_table_full does not apply transformations when not configured"""
711+
mock_conn, mock_cursor = self._setup_mock_connection()
712+
mock_open_conn.return_value.__enter__.return_value = mock_conn
713+
714+
self.fast_sync_rds_strategy.sync_table_full(
715+
stream=self.stream,
716+
state=self.state,
717+
desired_columns=self.desired_columns,
718+
md_map=self.md_map,
719+
)
720+
721+
# Verify normal column references are used (not transformations)
722+
export_query = self._extract_export_query(mock_cursor)
723+
self.assertIsNotNone(export_query)
724+
# Should use normal column references, not transformation expressions
725+
self.assertIn('"id"', export_query)
726+
self.assertIn('"name"', export_query)
727+
# Should not have transformation-like patterns
728+
self.assertNotIn("UPPER", export_query)
729+
self.assertNotIn(" * 2", export_query)
730+
731+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
732+
def test_sync_table_full_transformations_different_stream(self, mock_open_conn):
733+
"""Test transformations are only applied to the configured stream"""
734+
# Configure transformations for a different stream
735+
conn_config_with_transforms = self.conn_config.copy()
736+
conn_config_with_transforms["fast_sync_rds_transformations"] = {
737+
"other_schema-other_table": {
738+
"name": "UPPER(name)",
739+
}
740+
}
741+
strategy = fast_sync_rds.FastSyncRdsStrategy(
742+
conn_config_with_transforms, self.s3_bucket, self.s3_prefix, self.s3_region
743+
)
744+
745+
mock_conn, mock_cursor = self._setup_mock_connection()
746+
mock_open_conn.return_value.__enter__.return_value = mock_conn
747+
748+
strategy.sync_table_full(
749+
stream=self.stream,
750+
state=self.state,
751+
desired_columns=self.desired_columns,
752+
md_map=self.md_map,
753+
)
754+
755+
# Verify transformation is NOT applied (different stream)
756+
export_query = self._extract_export_query(mock_cursor)
757+
self.assertIsNotNone(export_query)
758+
self.assertNotIn("UPPER(name)", export_query)
759+
# Should use normal column references
760+
self.assertIn('"name"', export_query)
761+
762+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
763+
@patch("tap_postgres.sync_strategies.fast_sync_rds.singer.get_bookmark")
764+
def test_sync_table_incremental_with_transformations(
765+
self, mock_get_bookmark, mock_open_conn
766+
):
767+
"""Test sync_table_incremental applies transformations when configured"""
768+
mock_get_bookmark.side_effect = lambda state, stream_id, key: {
769+
"version": 1234567890,
770+
"replication_key_value": "100",
771+
}.get(key)
772+
773+
# Configure transformations
774+
conn_config_with_transforms = self.conn_config.copy()
775+
conn_config_with_transforms["fast_sync_rds_transformations"] = {
776+
"test_schema-test_table": {
777+
"name": "LOWER(name)",
778+
}
779+
}
780+
strategy = fast_sync_rds.FastSyncRdsStrategy(
781+
conn_config_with_transforms, self.s3_bucket, self.s3_prefix, self.s3_region
782+
)
783+
784+
md_map_with_key = self.md_map.copy()
785+
md_map_with_key[()]["replication-key"] = "id"
786+
md_map_with_key[("properties", "id")] = {"sql-datatype": "integer"}
787+
788+
mock_replication_key_result = MagicMock()
789+
mock_replication_key_result.__getitem__.return_value = "200"
790+
791+
mock_conn, mock_cursor = self._setup_mock_connection(
792+
export_result=self._create_mock_export_result(rows=25, bytes_uploaded=1250),
793+
replication_key_result=mock_replication_key_result,
794+
)
795+
mock_open_conn.return_value.__enter__.return_value = mock_conn
796+
797+
strategy.sync_table_incremental(
798+
stream=self.stream,
799+
state=self.state,
800+
desired_columns=self.desired_columns,
801+
md_map=md_map_with_key,
802+
replication_key="id",
803+
replication_key_value="100",
804+
)
805+
806+
# Verify transformation SQL is in the export query
807+
export_query = self._extract_export_query(mock_cursor)
808+
self.assertIsNotNone(export_query)
809+
self.assertIn("LOWER(name)", export_query)
810+
self.assertIn('(LOWER(name)) AS "name"', export_query)
811+
812+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
813+
def test_sync_table_full_transformations_with_array_columns(self, mock_open_conn):
814+
"""Test transformations work correctly with array columns"""
815+
# Configure transformations for a non-array column
816+
conn_config_with_transforms = self.conn_config.copy()
817+
conn_config_with_transforms["fast_sync_rds_transformations"] = {
818+
"test_schema-test_table": {
819+
"name": "UPPER(name)",
820+
}
821+
}
822+
strategy = fast_sync_rds.FastSyncRdsStrategy(
823+
conn_config_with_transforms, self.s3_bucket, self.s3_prefix, self.s3_region
824+
)
825+
826+
desired_columns = ["id", "name", "_text"]
827+
md_map_with_arrays = {
828+
(): {"schema-name": "test_schema"},
829+
("properties", "id"): {"sql-datatype": "integer"},
830+
("properties", "name"): {"sql-datatype": "varchar"},
831+
("properties", "_text"): {"sql-datatype": "text[]"},
832+
}
833+
834+
mock_conn, mock_cursor = self._setup_mock_connection()
835+
mock_open_conn.return_value.__enter__.return_value = mock_conn
836+
837+
strategy.sync_table_full(
838+
stream=self.stream,
839+
state=self.state,
840+
desired_columns=desired_columns,
841+
md_map=md_map_with_arrays,
842+
)
843+
844+
# Verify transformation is applied to non-array column
845+
export_query = self._extract_export_query(mock_cursor)
846+
self.assertIsNotNone(export_query)
847+
self.assertIn("UPPER(name)", export_query)
848+
self.assertIn('(UPPER(name)) AS "name"', export_query)
849+
850+
# Verify array column still uses array_to_json (not transformation)
851+
self.assertIn("array_to_json", export_query)
852+
self.assertIn("_text", export_query)
853+
self.assertRegex(
854+
export_query,
855+
r'array_to_json\s*\(\s*["\s]*_text["\s]*\)',
856+
"Array column '_text' should use array_to_json, not transformation",
857+
)
858+
859+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
860+
def test_sync_table_full_transformations_with_metadata_columns(
861+
self, mock_open_conn
862+
):
863+
"""Test transformations work correctly with metadata columns present"""
864+
# Configure transformations
865+
conn_config_with_transforms = self.conn_config.copy()
866+
conn_config_with_transforms["fast_sync_rds_transformations"] = {
867+
"test_schema-test_table": {
868+
"name": "COALESCE(name, 'N/A')",
869+
}
870+
}
871+
strategy = fast_sync_rds.FastSyncRdsStrategy(
872+
conn_config_with_transforms, self.s3_bucket, self.s3_prefix, self.s3_region
873+
)
874+
875+
mock_conn, mock_cursor = self._setup_mock_connection()
876+
mock_open_conn.return_value.__enter__.return_value = mock_conn
877+
878+
strategy.sync_table_full(
879+
stream=self.stream,
880+
state=self.state,
881+
desired_columns=self.desired_columns,
882+
md_map=self.md_map,
883+
)
884+
885+
# Verify transformation is applied
886+
export_query = self._extract_export_query(mock_cursor)
887+
self.assertIsNotNone(export_query)
888+
# SQL escaping converts single quotes to double single quotes
889+
self.assertIn("COALESCE(name, ''N/A'')", export_query)
890+
self.assertRegex(
891+
export_query,
892+
r'\(COALESCE\(name,\s+\'\'N/A\'\'\)\)\s+AS\s+"name"',
893+
"Transformation for 'name' should be wrapped with alias",
894+
)
895+
896+
# Verify metadata columns are still present and not transformed
897+
self.assertIn("_sdc_batched_at", export_query)
898+
self.assertIn("_sdc_deleted_at", export_query)
899+
self.assertIn("_sdc_extracted_at", export_query)
900+
# Metadata columns should use their standard SQL, not transformations
901+
# SQL escaping converts single quotes to double single quotes
902+
self.assertIn(
903+
"current_timestamp at time zone ''UTC'' as _sdc_batched_at", export_query
904+
)
905+
906+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
907+
def test_sync_table_full_transformations_partial_columns(self, mock_open_conn):
908+
"""Test transformations can be applied to only some columns"""
909+
# Configure transformation for only one column
910+
conn_config_with_transforms = self.conn_config.copy()
911+
conn_config_with_transforms["fast_sync_rds_transformations"] = {
912+
"test_schema-test_table": {
913+
"name": "UPPER(name)",
914+
# No transformation for "id"
915+
}
916+
}
917+
strategy = fast_sync_rds.FastSyncRdsStrategy(
918+
conn_config_with_transforms, self.s3_bucket, self.s3_prefix, self.s3_region
919+
)
920+
921+
mock_conn, mock_cursor = self._setup_mock_connection()
922+
mock_open_conn.return_value.__enter__.return_value = mock_conn
923+
924+
strategy.sync_table_full(
925+
stream=self.stream,
926+
state=self.state,
927+
desired_columns=self.desired_columns,
928+
md_map=self.md_map,
929+
)
930+
931+
# Verify transformation is applied to "name"
932+
export_query = self._extract_export_query(mock_cursor)
933+
self.assertIsNotNone(export_query)
934+
self.assertIn("UPPER(name)", export_query)
935+
self.assertRegex(
936+
export_query,
937+
r'\(UPPER\(name\)\)\s+AS\s+"name"',
938+
"Transformation for 'name' should be wrapped with alias",
939+
)
940+
941+
# Verify "id" uses normal column reference (no transformation)
942+
self.assertIn('"id"', export_query)
943+
# Should not have transformation for id
944+
self.assertNotRegex(
945+
export_query,
946+
r'\(.*\)\s+AS\s+"id"',
947+
"Column 'id' should not have transformation",
948+
)

0 commit comments

Comments (0)