Skip to content

Commit e940657

Browse files
Merge pull request #19 from Kaligo/DATA-27839/define-on-fly-mapper-cols
feat: featuring adding new on-fly col if configured [DATA-27839]
2 parents fe9e55b + 57763b5 commit e940657

File tree

4 files changed

+135
-29
lines changed

4 files changed

+135
-29
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
setup(
99
name="pipelinewise-tap-postgres",
10-
version="2.2.1",
10+
version="2.2.6",
1111
description="Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible",
1212
long_description=long_description,
1313
long_description_content_type="text/markdown",

tap_postgres/__init__.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,14 @@ def do_fast_sync_rds(conn_config, stream, state, desired_columns, md_map, sync_m
150150
state = singer.write_bookmark(
151151
state, stream["tap_stream_id"], "replication_key", replication_key
152152
)
153+
154+
desired_columns = fast_sync_rds_strategy.prepare_stream_with_extra_columns(
155+
desired_columns, stream
156+
)
157+
153158
sync_common.send_schema_message(
154-
stream, [replication_key] if replication_key else []
159+
stream,
160+
[replication_key] if replication_key else [],
155161
)
156162

157163
LOGGER.info("Performing %s", sync_method)
@@ -598,15 +604,15 @@ def main_impl():
598604
),
599605
"use_secondary": args.config.get("use_secondary", False),
600606
"limit": int(limit) if limit else None,
601-
"skip_last_n_seconds": int(skip_last_n_seconds)
602-
if skip_last_n_seconds
603-
else None,
604-
"look_back_n_seconds": int(look_back_n_seconds)
605-
if look_back_n_seconds
606-
else None,
607-
"recover_mappings": ast.literal_eval(recover_mappings)
608-
if recover_mappings
609-
else {},
607+
"skip_last_n_seconds": (
608+
int(skip_last_n_seconds) if skip_last_n_seconds else None
609+
),
610+
"look_back_n_seconds": (
611+
int(look_back_n_seconds) if look_back_n_seconds else None
612+
),
613+
"recover_mappings": (
614+
ast.literal_eval(recover_mappings) if recover_mappings else {}
615+
),
610616
# Fast sync RDS config
611617
"fast_sync_rds": args.config.get("fast_sync_rds", False),
612618
"fast_sync_rds_s3_bucket": args.config.get("fast_sync_rds_s3_bucket"),

tap_postgres/sync_strategies/fast_sync_rds.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ def _get_metadata_column_names(self) -> List[str]:
7777
if self.conn_config.get("fast_sync_rds_add_metadata_columns", True):
7878
return METADATA_COLUMNS
7979
return []
80+
81+
def _get_fast_sync_rds_transformation(self, tap_stream_id: str) -> Dict[str, str]:
82+
return self.conn_config.get("fast_sync_rds_transformations", {}).get(
83+
tap_stream_id, {}
84+
)
85+
86+
def _get_extra_column_properties(
87+
self, desired_columns: List[str], stream: Dict
88+
) -> List[str]:
89+
transformation_columns = self._get_fast_sync_rds_transformation(
90+
stream["tap_stream_id"]
91+
)
92+
all_columns = {*self._get_metadata_column_names(), *desired_columns}
93+
extra_columns = [
94+
col for col in transformation_columns if col not in all_columns
95+
]
96+
97+
# On-the-fly columns use string type; other types could be added if needed.
98+
return {col: {"type": ["string", "null"]} for col in extra_columns}
8099

81100
def _get_metadata_column_sql(self, column_name: str) -> str:
82101
"""Get SQL expression for a metadata column."""
@@ -155,12 +174,7 @@ def _build_sorted_column_expressions(
155174
all_column_names.sort()
156175

157176
# Get transformations for this stream if available
158-
transformations = {}
159-
if tap_stream_id:
160-
all_transformations = self.conn_config.get(
161-
"fast_sync_rds_transformations", {}
162-
)
163-
transformations = all_transformations.get(tap_stream_id, {})
177+
transformations = self._get_fast_sync_rds_transformation(tap_stream_id)
164178

165179
column_expressions = []
166180
for name in all_column_names:
@@ -380,6 +394,30 @@ def _track_latest_replication_key_value( # pylint: disable=invalid-name,too-man
380394

381395
return state
382396

397+
def prepare_stream_with_extra_columns(
398+
self, desired_columns: List[str], stream: Dict
399+
):
400+
"""
401+
Prepare stream with extra columns for fast sync RDS strategy.
402+
Extra columns are columns that are created on the fly (not in orginal catalog)
403+
provided by fast_sync_rds_transformations configuration.
404+
To make it simple, their type is always ["string", "null"].
405+
406+
Args:
407+
desired_columns: List of desired column names from the source table
408+
stream: Stream dictionary
409+
410+
Returns:
411+
List of desired column names with extra columns
412+
"""
413+
extra_columns_properties = self._get_extra_column_properties(
414+
desired_columns, stream
415+
)
416+
stream["schema"]["properties"].update(extra_columns_properties)
417+
desired_columns.extend(extra_columns_properties.keys())
418+
desired_columns.sort()
419+
return desired_columns
420+
383421
def sync_table( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
384422
self,
385423
stream: Dict,

0 commit comments

Comments
 (0)