From e175e67283831054dc665a03de10f33922be825b Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Thu, 12 Mar 2026 09:33:16 -0600 Subject: [PATCH 01/10] add warning logs for debugging --- tap_s3_csv/sync.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tap_s3_csv/sync.py b/tap_s3_csv/sync.py index 4968120..da536cf 100644 --- a/tap_s3_csv/sync.py +++ b/tap_s3_csv/sync.py @@ -31,8 +31,10 @@ def sync_stream(config, state, table_spec, stream, sync_start_time): LOGGER.info('Syncing table "%s".', table_name) LOGGER.info('Getting files modified since %s.', modified_since) - s3_files = s3.get_input_files_for_table( - config, table_spec, modified_since) + s3_files = [x for x in s3.get_input_files_for_table(config, table_spec, modified_since)] + LOGGER.warning('XXXXX sync_stream config: %s', config) + LOGGER.warning('XXXXX sync_stream table_spec: %s', table_spec) + LOGGER.warning('XXXXX s3_files include %s files', len(s3_files)) records_streamed = 0 @@ -41,8 +43,8 @@ def sync_stream(config, state, table_spec, stream, sync_start_time): # we can sort in memory which is suboptimal. If we could bookmark # based on anything else then we could just sync files as we see them. for s3_file in sorted(s3_files, key=lambda item: item['last_modified']): - records_streamed += sync_table_file( - config, s3_file['key'], table_spec, stream) + LOGGER.warning('XXXXX attempting to sync %s', s3_file['key']) + records_streamed += sync_table_file(config, s3_file['key'], table_spec, stream) if s3_file['last_modified'] < sync_start_time: state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat()) else: @@ -50,7 +52,7 @@ def sync_stream(config, state, table_spec, stream, sync_start_time): singer.write_state(state) if s3.skipped_files_count: - LOGGER.warn("%s files got skipped during the last sync.",s3.skipped_files_count) + LOGGER.warning("%s files got skipped during the last sync.",s3.skipped_files_count) LOGGER.info('Wrote %s records for table "%s".', records_streamed, table_name) @@ -202,7 +204,9 @@ def sync_csv_file(config, file_handle, s3_path, table_spec, stream): # memory consumption but that's acceptable as well. csv.field_size_limit(sys.maxsize) + LOGGER.warning('XXXXX syncing csv file') if "properties" in stream["schema"]: + LOGGER.warning('XXXXX properties keys: %s', stream["schema"]["properties"].keys()) iterator = csv_helper.get_row_iterator( file_handle, table_spec, stream["schema"]["properties"].keys(), True) else: From 7175d5fcec9a5d9acad62a0c98c0456bfcabe80e Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Thu, 12 Mar 2026 11:39:48 -0600 Subject: [PATCH 02/10] more debuggery --- tap_s3_csv/s3.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tap_s3_csv/s3.py b/tap_s3_csv/s3.py index e339f0b..14a9955 100644 --- a/tap_s3_csv/s3.py +++ b/tap_s3_csv/s3.py @@ -616,7 +616,9 @@ def get_input_files_for_table(config, table_spec, modified_since=None): matched_files_count = 0 unmatched_files_count = 0 max_files_before_log = 30000 - for s3_object in list_files_in_bucket(config, table_spec.get('search_prefix')): + s3_files = [x for x in list_files_in_bucket(config, table_spec.get('search_prefix'))] + LOGGER.warning('XXXXX s3_files %s', len(s3_files)) + for s3_object in s3_files: key = s3_object['Key'] last_modified = s3_object['LastModified'] @@ -626,6 +628,9 @@ def get_input_files_for_table(config, table_spec, modified_since=None): unmatched_files_count += 1 continue + LOGGER.warning('XXXXX modified_since %s and last_modified %s', modified_since, last_modified) + if modified_since is not None and last_modified is not None: + LOGGER.warning('XXXXX modified_since < last_modified: %s', modified_since < last_modified) if matcher.search(key): matched_files_count += 1 if modified_since is None or modified_since < last_modified: From 69f2313d077181937bcc8a655e4321d61598c845 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Thu, 12 Mar 2026 11:46:42 -0600 Subject: [PATCH 03/10] even more debuggery --- tap_s3_csv/s3.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tap_s3_csv/s3.py b/tap_s3_csv/s3.py index 14a9955..85df1d5 100644 --- a/tap_s3_csv/s3.py +++ b/tap_s3_csv/s3.py @@ -278,8 +278,12 @@ def setup_s3fs_client_with_proxy(config): def get_sampled_schema_for_table(config, table_spec): LOGGER.info('Sampling records to determine table schema.') - s3_files_gen = get_input_files_for_table(config, table_spec) + s3_files_gen = [x for x in get_input_files_for_table(config, table_spec)] + LOGGER.warning('XXXXX sync_stream config: %s', config) + LOGGER.warning('XXXXX sync_stream table_spec: %s', table_spec) + LOGGER.warning('XXXXX s3_files include %s files', len(s3_files_gen)) samples = [sample for sample in sample_files(config, table_spec, s3_files_gen)] + LOGGER.warning('XXXXX samples include %s files', len(samples)) if skipped_files_count: LOGGER.warning("%s files got skipped during the last sampling.",skipped_files_count) From 270d77c8900f69dc3ef00832eca69b56395e2c95 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Thu, 12 Mar 2026 11:54:15 -0600 Subject: [PATCH 04/10] moar --- tap_s3_csv/s3.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tap_s3_csv/s3.py b/tap_s3_csv/s3.py index 85df1d5..6bbdc8e 100644 --- a/tap_s3_csv/s3.py +++ b/tap_s3_csv/s3.py @@ -632,10 +632,10 @@ def get_input_files_for_table(config, table_spec, modified_since=None): unmatched_files_count += 1 continue - LOGGER.warning('XXXXX modified_since %s and last_modified %s', modified_since, last_modified) - if modified_since is not None and last_modified is not None: - LOGGER.warning('XXXXX modified_since < last_modified: %s', modified_since < last_modified) if matcher.search(key): + LOGGER.warning('XXXXX key %s: modified_since %s and last_modified %s', key, modified_since, last_modified) + if modified_since is not None and last_modified is not None: + LOGGER.warning('XXXXX modified_since < last_modified: %s', modified_since < last_modified) matched_files_count += 1 if modified_since is None or modified_since < last_modified: LOGGER.info('Will download key "%s" as it was last modified %s', From 2bc49cc76f14536cfdbbf5ef9cbef1b40ca23675 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Thu, 12 Mar 2026 13:47:40 -0600 Subject: [PATCH 05/10] ensure bookmark for stream --- tap_s3_csv/__init__.py | 2 ++ tap_s3_csv/s3.py | 13 ++----------- tap_s3_csv/sync.py | 14 +++++--------- 3 files changed, 9 insertions(+), 20 deletions(-) diff --git a/tap_s3_csv/__init__.py b/tap_s3_csv/__init__.py index e9a7a23..6d4c448 100644 --- a/tap_s3_csv/__init__.py +++ b/tap_s3_csv/__init__.py @@ -40,6 +40,8 @@ def do_sync(config, catalog, state, sync_start_time): LOGGER.info("%s: Skipping - not selected", stream_name) continue + bookmark = singer.get_bookmark(state, stream_name, 'modified_since') or config['start_date'] + state = singer.set_bookmark(state, stream_name, 'modified_since', bookmark) singer.write_state(state) key_properties = metadata.get(mdata, (), 'table-key-properties') singer.write_schema(stream_name, stream['schema'], key_properties) diff --git a/tap_s3_csv/s3.py b/tap_s3_csv/s3.py index 6bbdc8e..e339f0b 100644 --- a/tap_s3_csv/s3.py +++ b/tap_s3_csv/s3.py @@ -278,12 +278,8 @@ def setup_s3fs_client_with_proxy(config): def get_sampled_schema_for_table(config, table_spec): LOGGER.info('Sampling records to determine table schema.') - s3_files_gen = [x for x in get_input_files_for_table(config, table_spec)] - LOGGER.warning('XXXXX sync_stream config: %s', config) - LOGGER.warning('XXXXX sync_stream table_spec: %s', table_spec) - LOGGER.warning('XXXXX s3_files include %s files', len(s3_files_gen)) + s3_files_gen = get_input_files_for_table(config, table_spec) samples = [sample for sample in sample_files(config, table_spec, s3_files_gen)] - LOGGER.warning('XXXXX samples include %s files', len(samples)) if skipped_files_count: LOGGER.warning("%s files got skipped during the last sampling.",skipped_files_count) @@ -620,9 +616,7 @@ def get_input_files_for_table(config, table_spec, modified_since=None): matched_files_count = 0 unmatched_files_count = 0 max_files_before_log = 30000 - s3_files = [x for x in list_files_in_bucket(config, table_spec.get('search_prefix'))] - LOGGER.warning('XXXXX s3_files %s', len(s3_files)) - for s3_object in s3_files: + for s3_object in list_files_in_bucket(config, table_spec.get('search_prefix')): key = s3_object['Key'] last_modified = s3_object['LastModified'] @@ -633,9 +627,6 @@ def get_input_files_for_table(config, table_spec, modified_since=None): continue if matcher.search(key): - LOGGER.warning('XXXXX key %s: modified_since %s and last_modified %s', key, modified_since, last_modified) - if modified_since is not None and last_modified is not None: - LOGGER.warning('XXXXX modified_since < last_modified: %s', modified_since < last_modified) matched_files_count += 1 if modified_since is None or modified_since < last_modified: LOGGER.info('Will download key "%s" as it was last modified %s', diff --git a/tap_s3_csv/sync.py b/tap_s3_csv/sync.py index da536cf..4968120 100644 --- a/tap_s3_csv/sync.py +++ b/tap_s3_csv/sync.py @@ -31,10 +31,8 @@ def sync_stream(config, state, table_spec, stream, sync_start_time): LOGGER.info('Syncing table "%s".', table_name) LOGGER.info('Getting files modified since %s.', modified_since) - s3_files = [x for x in s3.get_input_files_for_table(config, table_spec, modified_since)] - LOGGER.warning('XXXXX sync_stream config: %s', config) - LOGGER.warning('XXXXX sync_stream table_spec: %s', table_spec) - LOGGER.warning('XXXXX s3_files include %s files', len(s3_files)) + s3_files = s3.get_input_files_for_table( + config, table_spec, modified_since) records_streamed = 0 @@ -43,8 +41,8 @@ def sync_stream(config, state, table_spec, stream, sync_start_time): # we can sort in memory which is suboptimal. If we could bookmark # based on anything else then we could just sync files as we see them. for s3_file in sorted(s3_files, key=lambda item: item['last_modified']): - LOGGER.warning('XXXXX attempting to sync %s', s3_file['key']) - records_streamed += sync_table_file(config, s3_file['key'], table_spec, stream) + records_streamed += sync_table_file( + config, s3_file['key'], table_spec, stream) if s3_file['last_modified'] < sync_start_time: state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat()) else: @@ -52,7 +50,7 @@ def sync_stream(config, state, table_spec, stream, sync_start_time): singer.write_state(state) if s3.skipped_files_count: - LOGGER.warning("%s files got skipped during the last sync.",s3.skipped_files_count) + LOGGER.warn("%s files got skipped during the last sync.",s3.skipped_files_count) LOGGER.info('Wrote %s records for table "%s".', records_streamed, table_name) @@ -204,9 +202,7 @@ def sync_csv_file(config, file_handle, s3_path, table_spec, stream): # memory consumption but that's acceptable as well. csv.field_size_limit(sys.maxsize) - LOGGER.warning('XXXXX syncing csv file') if "properties" in stream["schema"]: - LOGGER.warning('XXXXX properties keys: %s', stream["schema"]["properties"].keys()) iterator = csv_helper.get_row_iterator( file_handle, table_spec, stream["schema"]["properties"].keys(), True) else: From 2123eec453a11d5ebf255616fb1276331d44a87e Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Thu, 12 Mar 2026 13:49:53 -0600 Subject: [PATCH 06/10] bump singer-python --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3667a3e..3a7c5d8 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ 'boto3==1.39.8', 'urllib3==2.6.3', 'singer-encodings==0.3.0', - 'singer-python==5.14.3', + 'singer-python==5.19.0', 'voluptuous==0.15.2', 's3fs==2025.9.0' ], From 25c730954bad888eb1fcd3dcd122f805924dcff5 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Fri, 13 Mar 2026 17:35:14 +0000 Subject: [PATCH 07/10] add test --- tests/test_bookmarks.py | 50 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 3e81f7c..a19be74 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -1,3 +1,4 @@ +from datetime import datetime from tap_tester import connections, menagerie, runner from functools import reduce from singer import metadata @@ -102,3 +103,52 @@ def test_run(self): records = runner.get_records_from_target_output() messages = records.get('chickens', {}).get('messages', []) self.assertEqual(len(messages), 0, msg="Sync'd incorrect count of messages: {}".format(len(messages))) + +class S3BookmarksStartDateSucceedsModifiedDate(S3CSVBaseTest): + + table_entry = [{'table_name': 'skipped', 'search_prefix': 'tap-s3-csv', 'search_pattern': 'bookmarks_small\\.csv'}] + start_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") + + def setUp(self): + self.conn_id = connections.ensure_connection(self) + + def resource_name(self): + return "small.csv" + + def name(self): + return "tap_tester_s3_csv_bookmarks_skipped" + + def expected_check_streams(self): + return { + 'skipped' + } + + def expected_sync_streams(self): + return { + 'skipped' + } + + def expected_pks(self): + return { + 'skipped': set() + } + + def get_properties(self, original: bool = True): + return super().get_properties(original) | {'start_date': self.start_date} + + def test_run(self): + found_catalogs = self.run_and_verify_check_mode(self.conn_id) + + # Select our catalogs + our_catalogs = [c for c in found_catalogs if c.get('tap_stream_id') in self.expected_sync_streams()] + + self.perform_and_verify_table_and_field_selection(self.conn_id, our_catalogs) + + # Clear state before our run + menagerie.set_state(self.conn_id, {}) + + # Sync 0 records because start date is after file modified_date + self.run_and_verify_sync(self.conn_id, True) + + expected_state = {'bookmarks': {'skipped': {'modified_since': self.start_date}}} + self.assertEqual(menagerie.get_state(self.conn_id), expected_state) From a102a8d60b2083393230de4f936cb6efd55928a7 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Fri, 13 Mar 2026 17:57:09 +0000 Subject: [PATCH 08/10] use same bookmarks csv in both tests --- tests/test_bookmarks.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index a19be74..aef60c7 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -106,17 +106,16 @@ def test_run(self): class S3BookmarksStartDateSucceedsModifiedDate(S3CSVBaseTest): - table_entry = [{'table_name': 'skipped', 'search_prefix': 'tap-s3-csv', 'search_pattern': 'bookmarks_small\\.csv'}] - start_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") + table_entry = [{'table_name': 'skipped', 'search_prefix': 'tap_tester', 'search_pattern': 'tap_tester/bookmarks.*', 'key_properties': 'name'}] def setUp(self): self.conn_id = connections.ensure_connection(self) def resource_name(self): - return "small.csv" + return "bookmarks.csv" def name(self): - return "tap_tester_s3_csv_bookmarks_skipped" + return "tap_tester_s3_csv_bookmarks" def expected_check_streams(self): return { @@ -130,10 +129,11 @@ def expected_sync_streams(self): def expected_pks(self): return { - 'skipped': set() + 'skipped': {"name"} } def get_properties(self, original: bool = True): + self.start_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") return super().get_properties(original) | {'start_date': self.start_date} def test_run(self): From 0e6fc053d94effd5a37169cd7c1d456604c0f382 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Fri, 13 Mar 2026 18:37:06 +0000 Subject: [PATCH 09/10] correctly formatted start_date --- tests/test_bookmarks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index aef60c7..c2b3d67 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, date, timedelta, time from tap_tester import connections, menagerie, runner from functools import reduce from singer import metadata @@ -133,7 +133,9 @@ def expected_pks(self): } def get_properties(self, original: bool = True): - self.start_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") + tomorrow = date.today() + timedelta(days=1) + self.start_date = datetime.combine(tomorrow, time.min).strftime("%Y-%m-%dT%H:%M:%SZ") + return super().get_properties(original) | {'start_date': self.start_date} def test_run(self): From 84f1c46f8b0604c6dd20266267157f46583ffad5 Mon Sep 17 00:00:00 2001 From: Ben Allred Date: Mon, 16 Mar 2026 07:44:52 -0600 Subject: [PATCH 10/10] bump version and add changelog entry --- CHANGELOG.md | 3 +++ setup.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe025c9..2e0bfeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 2.2.4 + * Write bookmarks for streams that sync no records [#87](https://github.com/singer-io/tap-s3-csv/pull/87) + ## 2.2.3 * Bumps urllib3 dependency for twistlock compliance [#86](https://github.com/singer-io/tap-s3-csv/pull/86) diff --git a/setup.py b/setup.py index 3a7c5d8..bca5cc8 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-s3-csv', - version='2.2.3', + version='2.2.4', description='Singer.io tap for extracting CSV files from S3', author='Stitch', url='https://singer.io',