diff --git a/tap_dynamodb/sync_strategies/log_based.py b/tap_dynamodb/sync_strategies/log_based.py index 563a8a2..35c3175 100644 --- a/tap_dynamodb/sync_strategies/log_based.py +++ b/tap_dynamodb/sync_strategies/log_based.py @@ -210,16 +210,16 @@ def sync(config, state, stream): streams_client, stream_arn, projection, deserializer, table_name, stream_version, state) - # Now that we have fully synced the shard, move it from the - # shard_seq_numbers to finished_shards. - finished_shard_bookmarks.append(shard['ShardId']) - state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks) - - if seq_number_bookmarks.get(shard['ShardId']): - seq_number_bookmarks.pop(shard['ShardId']) - state = singer.write_bookmark(state, table_name, 'shard_seq_numbers', seq_number_bookmarks) + # Now that we have fully synced the shard, move it from the + # shard_seq_numbers to finished_shards. + finished_shard_bookmarks.append(shard['ShardId']) + state = singer.write_bookmark(state, table_name, 'finished_shards', finished_shard_bookmarks) + + if seq_number_bookmarks.get(shard['ShardId']): + seq_number_bookmarks.pop(shard['ShardId']) + state = singer.write_bookmark(state, table_name, 'shard_seq_numbers', seq_number_bookmarks) - singer.write_state(state) + singer.write_state(state) for shardId in finished_shard_bookmarks: if shardId not in found_shards: