Skip to content

Commit 3e9f6cd

Browse files
authored
Merge pull request #17 from Kaligo/fix/data-24484-array-to-json
fix: convert array types to JSON in fast-sync flow [DATA-24484]
2 parents 1b21ad3 + fd81061 commit 3e9f6cd

File tree

2 files changed

+181
-6
lines changed

2 files changed

+181
-6
lines changed

tap_postgres/sync_strategies/fast_sync_rds.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,34 @@ def _get_metadata_column_sql(self, column_name: str) -> str:
9494

9595
raise ValueError(f"Unknown metadata column: {column_name}")
9696

97+
def _is_array_column(self, column_name: str, md_map: Dict) -> bool:
98+
"""
99+
Check if a column is an array type based on metadata map.
100+
"""
101+
column_metadata = md_map.get(("properties", column_name), {})
102+
sql_datatype = column_metadata.get("sql-datatype", "")
103+
# Array types in PostgreSQL have '[]' suffix in sql-datatype
104+
return sql_datatype.endswith("[]")
105+
106+
def _convert_array_column_to_json(self, column_name: str) -> str:
    """
    Build a SQL expression that exports an array column as JSON.

    PostgreSQL's array_to_json() rewrites the native array literal form
    (e.g. {fashion}) into JSON (e.g. ["fashion"]) before the CSV export
    runs, so the exported CSV carries JSON arrays instead of PostgreSQL
    array format. NULL arrays are returned as NULL, which is correct.

    Args:
        column_name: Column name to convert

    Returns:
        SQL expression that converts array to JSON format
    """
    identifier = post_db.prepare_columns_sql(column_name)
    # Alias back to the quoted column name so the CSV header is unchanged.
    return "array_to_json({0}) AS {0}".format(identifier)
124+
97125
def _build_sorted_column_expressions(
98126
self, desired_columns: List[str], md_map: Dict
99127
) -> List[str]:
@@ -104,6 +132,9 @@ def _build_sorted_column_expressions(
104132
This ensures the exported data column order matches the table column
105133
order exactly.
106134
135+
Array columns are automatically converted from PostgreSQL array format
136+
(e.g., {fashion}) to JSON format (e.g., ["fashion"]) using array_to_json().
137+
107138
Args:
108139
desired_columns: List of desired column names from the source table
109140
md_map: Metadata map for column transformations
@@ -116,11 +147,18 @@ def _build_sorted_column_expressions(
116147
# Sort columns to ensure the output CSV headers match target's schema order.
117148
all_column_names.sort()
118149

119-
return [
120-
self._get_metadata_column_sql(name) if name in metadata_column_names
121-
else post_db.prepare_columns_for_select_sql(name, md_map=md_map)
122-
for name in all_column_names
123-
]
150+
column_expressions = []
151+
for name in all_column_names:
152+
if name in metadata_column_names:
153+
column_expressions.append(self._get_metadata_column_sql(name))
154+
elif self._is_array_column(name, md_map):
155+
column_expressions.append(self._convert_array_column_to_json(name))
156+
else:
157+
column_expressions.append(
158+
post_db.prepare_columns_for_select_sql(name, md_map=md_map)
159+
)
160+
161+
return column_expressions
124162

125163
def _build_select_query( # pylint: disable=too-many-arguments,too-many-positional-arguments
126164
self,

tests/unit/test_fast_sync_rds.py

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ def test_sync_table_full_column_ordering(self, mock_open_conn):
218218
f"Columns should be in alphabetical order. Found positions: {dict(zip(expected_columns, column_positions))}",
219219
)
220220

221-
222221
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
223222
def test_sync_table_full_no_prefix(self, mock_open_conn):
224223
"""Test sync_table_full with empty prefix - verifies S3 path generation"""
@@ -533,3 +532,141 @@ def test_do_sync_incremental_backward_compatibility_with_fast_sync_s3_info(
533532
self.assertIn(
534533
"fast_sync_s3_info", result["bookmarks"]["test_schema-test_table"]
535534
)
535+
536+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
def test_sync_table_full_with_array_columns(self, mock_open_conn):
    """Test sync_table_full converts array columns to JSON format in export query"""
    # Mix of array types (text, integer, date, timestamp) and scalar columns.
    desired_columns = ["id", "_text", "name", "_date", "_int", "_timestamp"]
    array_md_map = {
        (): {"schema-name": "test_schema"},
        ("properties", "id"): {"sql-datatype": "integer"},
        ("properties", "_text"): {"sql-datatype": "text[]"},
        ("properties", "name"): {"sql-datatype": "varchar"},
        ("properties", "_date"): {"sql-datatype": "date[]"},
        ("properties", "_int"): {"sql-datatype": "integer[]"},
        ("properties", "_timestamp"): {
            "sql-datatype": "timestamp without time zone[]"
        },
    }

    mock_conn, mock_cursor = self._setup_mock_connection()
    mock_open_conn.return_value.__enter__.return_value = mock_conn

    self.fast_sync_rds_strategy.sync_table_full(
        stream=self.stream,
        state=self.state,
        desired_columns=desired_columns,
        md_map=array_md_map,
    )

    # The generated export query must apply array_to_json to every
    # array-typed column and leave scalar columns untouched.
    export_query = self._extract_export_query(mock_cursor)
    self.assertIsNotNone(export_query)
    self.assertIn("array_to_json", export_query)

    array_columns = ["_text", "_date", "_int", "_timestamp"]
    # One array_to_json call per array column, no more and no fewer.
    self.assertEqual(
        export_query.count("array_to_json"),
        len(array_columns),
        "All array columns should use array_to_json",
    )

    # Each array column appears wrapped with array_to_json; the regex
    # tolerates whitespace/quote variations in the SQL formatting.
    for column in array_columns:
        self.assertIn(column, export_query)
        self.assertRegex(
            export_query,
            rf'array_to_json\s*\(\s*["\s]*{column}["\s]*\)',
            f"Array column '{column}' should be wrapped with array_to_json",
        )

    # Scalar columns stay as plain quoted identifiers.
    for column in ["id", "name"]:
        self.assertIn(f'"{column}"', export_query)
        self.assertNotRegex(
            export_query,
            rf'array_to_json\s*\(\s*["\s]*{column}["\s]*\)',
            f"Non-array column '{column}' should not use array_to_json",
        )
599+
600+
@patch("tap_postgres.sync_strategies.fast_sync_rds.post_db.open_connection")
@patch("tap_postgres.sync_strategies.fast_sync_rds.singer.get_bookmark")
def test_sync_table_incremental_with_array_columns(
    self, mock_get_bookmark, mock_open_conn
):
    """Test sync_table_incremental converts array columns to JSON format"""
    # Bookmark lookups return a fixed version and replication key value.
    bookmarks = {
        "version": 1234567890,
        "replication_key_value": "100",
    }
    mock_get_bookmark.side_effect = lambda state, stream_id, key: bookmarks.get(key)

    desired_columns = ["id", "_text", "name", "_int"]
    array_md_map = {
        (): {"schema-name": "test_schema", "replication-key": "id"},
        ("properties", "id"): {"sql-datatype": "integer"},
        ("properties", "_text"): {"sql-datatype": "text[]"},
        ("properties", "name"): {"sql-datatype": "varchar"},
        ("properties", "_int"): {"sql-datatype": "integer[]"},
    }

    # The max-replication-key query result yields "200" for any index.
    mock_replication_key_result = MagicMock()
    mock_replication_key_result.__getitem__.return_value = "200"

    mock_conn, mock_cursor = self._setup_mock_connection(
        export_result=self._create_mock_export_result(rows=25, bytes_uploaded=1250),
        replication_key_result=mock_replication_key_result,
    )
    mock_open_conn.return_value.__enter__.return_value = mock_conn

    self.fast_sync_rds_strategy.sync_table_incremental(
        stream=self.stream,
        state=self.state,
        desired_columns=desired_columns,
        md_map=array_md_map,
        replication_key="id",
        replication_key_value="100",
    )

    # The export query must wrap the array columns with array_to_json.
    export_query = self._extract_export_query(mock_cursor)
    self.assertIsNotNone(export_query)
    self.assertIn("array_to_json", export_query)
    self.assertIn("_text", export_query)
    self.assertIn("_int", export_query)

    # Wrapped correctly, allowing for whitespace/quote variations.
    self.assertRegex(
        export_query,
        r'array_to_json\s*\(\s*["\s]*_text["\s]*\)',
        "Array column '_text' should be wrapped with array_to_json",
    )
    self.assertRegex(
        export_query,
        r'array_to_json\s*\(\s*["\s]*_int["\s]*\)',
        "Array column '_int' should be wrapped with array_to_json",
    )

    # Scalar columns appear as plain quoted identifiers, unwrapped.
    self.assertIn('"id"', export_query)
    self.assertIn('"name"', export_query)
    self.assertNotRegex(
        export_query,
        r'array_to_json\s*\(\s*["\s]*id["\s]*\)',
        "Non-array column 'id' should not use array_to_json",
    )
    self.assertNotRegex(
        export_query,
        r'array_to_json\s*\(\s*["\s]*name["\s]*\)',
        "Non-array column 'name' should not use array_to_json",
    )

0 commit comments

Comments
 (0)