11"""
22Test tap sets a bookmark and respects it for the next sync of a stream
33"""
4+ import math
45import json
56from pathlib import Path
67from random import random
@@ -84,6 +85,7 @@ def test_run(self):
8485 for _ in range (2 ): # create 3 records for each stream but only expect the 3rd
8586 for stream in self .streams_to_create :
8687 self .new_objects [stream ].append (create_object (stream ))
88+ # Why are we only expecting the last one?
8789 for stream in self .streams_to_create :
8890 new_object = create_object (stream )
8991 self .new_objects [stream ].append (stripe_obj_to_dict (new_object ))
@@ -93,6 +95,7 @@ def test_run(self):
9395
9496 # Instantiate connection with default start
9597 conn_id = connections .ensure_connection (self )
98+ self .conn_id = conn_id
9699
97100 # run in check mode
98101 found_catalogs = self .run_and_verify_check_mode (conn_id )
@@ -128,8 +131,11 @@ def test_run(self):
128131
129132 # Update one record from each stream prior to 2nd sync
130133 first_sync_created , _ = self .split_records_into_created_and_updated (first_sync_records )
134+
131135 for stream in self .streams_to_create .difference (cannot_update_streams ):
132136 # There needs to be some test data for each stream, otherwise this will break
137+ # TODO - first sync expected records is only the last record which would be synced no matter what
138+ # we should actually do the first or second created record which should not be synced unless updated
133139 record = expected_records_first_sync [stream ][0 ]
134140 if stream == 'payment_intents' :
135141 # updating the PaymentIntent object may require multiple attempts
@@ -171,6 +177,8 @@ def test_run(self):
171177 len (expected_records_first_sync .get (stream )) + len (created_records [stream ]),
172178 msg = "Expectations are invalid for full table stream {}" .format (stream )
173179 )
180+ else :
181+ raise NotImplementedError ("The replication method has no tests" )
174182
175183 # created_records[stream] = self.records_data_type_conversions(created_records.get(stream))
176184 # updated_records[stream] = self.records_data_type_conversions(updated_records.get(stream))
@@ -185,21 +193,28 @@ def test_run(self):
185193
186194 # Get the set of records from a second sync
187195 second_sync_records = runner .get_records_from_target_output ()
188- second_sync_created , second_sync_updated = self .split_records_into_created_and_updated (second_sync_records )
196+ second_sync_created , second_sync_updated = self .split_records_into_created_and_updated (
197+ second_sync_records )
189198
190199 # Loop first_sync_records and compare against second_sync_records
191200 for stream in self .streams_to_create .difference (untested_streams ):
192201 with self .subTest (stream = stream ):
193-
202+ # TODO - We should assert the bookmark value is correct, i.e. it is the value of
203+ # the latest record to come back from the sync. Add assertions.
194204 second_sync_data = [record .get ("data" ) for record
195205 in second_sync_records .get (stream , {}).get ("messages" , [])]
196- stream_replication_keys = self .expected_replication_keys ()
197- stream_primary_keys = self .expected_primary_keys ()
206+ second_sync_created_data = [record .get ("data" ) for record
207+ in second_sync_created .get (stream , {}).get ("messages" , [])]
208+ second_sync_updated_data = [record .get ("data" ) for record
209+ in second_sync_updated .get (stream , {}).get ("messages" , [])]
210+
211+ tap_replication_keys = self .expected_replication_keys ()
212+ tap_primary_keys = self .expected_primary_keys ()
198213
199214 # TESTING INCREMENTAL STREAMS
200215 if stream in self .expected_incremental_streams ():
201216
202- replication_keys = stream_replication_keys .get (stream )
217+ stream_replication_keys = tap_replication_keys .get (stream )
203218
204219 # Verify both syncs write / keep the same bookmark keys
205220 self .assertEqual (set (first_sync_state .get ('bookmarks' , {}).keys ()),
@@ -216,34 +231,70 @@ def test_run(self):
216231 msg = "first sync didn't have more records, bookmark usage not verified" )
217232
218233 if stream in self .streams_to_create .difference (cannot_update_streams ):
219- for replication_key in replication_keys :
234+ for replication_key in stream_replication_keys :
220235 updates_replication_key = "updates_created"
221236 updates_stream = stream + "_events"
222237
223- # Verify second sync's bookmarks move past the first sync's
224- self .assertGreater (
225- second_sync_state .get ('bookmarks' , {updates_stream : {}}).get (
226- updates_stream , {replication_key : - 1 }).get (updates_replication_key ),
227- first_sync_state .get ('bookmarks' , {updates_stream : {}}).get (
238+ sync_1_created_bookmark = list (first_sync_state .get ('bookmarks' , {stream : {}}).get (stream ).values ())
239+ assert len (sync_1_created_bookmark ) == 1 , sync_1_created_bookmark
240+ sync_1_value = sync_1_created_bookmark [0 ]
241+ sync_1_updated_value = first_sync_state .get ('bookmarks' , {updates_stream : {updates_replication_key : - 1 }}).get (
242+ updates_stream , {updates_replication_key : - 1 }).get (updates_replication_key )
243+ sync_2_created_bookmark = list (second_sync_state .get ('bookmarks' , {stream : {}}).get (stream ).values ())
244+ assert len (sync_2_created_bookmark ) == 1 , sync_2_created_bookmark
245+ sync_2_value = sync_2_created_bookmark [0 ]
246+ sync_2_updated_value = second_sync_state .get ('bookmarks' , {updates_stream : {updates_replication_key : - 1 }}).get (
228247 updates_stream , {updates_replication_key : - 1 }).get (updates_replication_key )
229- )
248+
249+ # Verify second sync's bookmarks move past the first sync's for update events
250+ self .assertGreater (sync_2_updated_value , sync_1_updated_value )
251+
252+ # Verify second sync's bookmarks move past the first sync's for create data
253+ self .assertGreater (sync_2_value , sync_1_value )
254+
230255
231256 # Verify that all data of the 2nd sync is >= the bookmark from the first sync
232- first_sync_bookmark = dt .fromtimestamp (
233- first_sync_state .get ('bookmarks' ).get (updates_stream ).get (updates_replication_key )
234- )
257+ first_sync_bookmark_created = dt .fromtimestamp (sync_1_value )
258+ print (f"*** TEST - 1st sync created: { first_sync_bookmark_created } " )
259+
260+ first_sync_bookmark_updated = dt .fromtimestamp (sync_1_updated_value )
261+ print (f"*** TEST - 1st sync updated: { first_sync_bookmark_updated } " )
262+
263+ # BUG - Remove following 2 code lines after bug fix
264+ # https://jira.talendforge.org/browse/TDL-21007
265+ first_sync_bookmark_created = min (first_sync_bookmark_created , first_sync_bookmark_updated )
266+ first_sync_bookmark_updated = min (first_sync_bookmark_created , first_sync_bookmark_updated )
267+
235268 # This assertion would fail for the child streams as it is replicated based on the parent i.e. it would fetch the parents based on
236269 # the bookmark and retrieve all the child records for the parent.
237270 # Hence skipping this assertion for child streams.
238271 if stream not in self .child_streams ().union ({'payout_transactions' }):
239- for record in second_sync_data :
272+ for record in second_sync_created_data :
273+ print ("2nd Sync Created Record Data" )
274+ print (f" updated: { record ['updated' ]} \n { replication_key } : { record [replication_key ]} " )
240275 date_value = record ["updated" ]
241276 self .assertGreaterEqual (date_value ,
242- dt .strftime (first_sync_bookmark , self .TS_COMPARISON_FORMAT ),
277+ dt .strftime (first_sync_bookmark_created , self .TS_COMPARISON_FORMAT ),
243278 msg = "A 2nd sync record has a replication-key that is less than or equal to the 1st sync bookmark." )
244279
280+ if stream not in self .child_streams ().union ({'payout_transactions' }):
281+ for record in second_sync_updated_data :
282+ print ("2nd Sync Updated Record Data" )
283+ print (f" updated: { record ['updated' ]} \n { replication_key } : { record [replication_key ]} " )
284+ date_value = record ["updated" ]
285+ self .assertGreaterEqual (date_value ,
286+ dt .strftime (first_sync_bookmark_updated , self .TS_COMPARISON_FORMAT ),
287+ msg = "A 2nd sync record has a replication-key that is less than or equal to the 1st sync bookmark." )
288+
289+ else :
290+ # TODO: created streams that cannot be updated are tested here.
291+ pass
292+
293+
245294 elif stream in self .expected_full_table_streams ():
246295 raise Exception ("Expectations changed, but this test was not updated to reflect them." )
296+ else :
297+ raise Exception ("Replication method changed, but this test was not updated to reflect." )
247298
248299 # TESTING APPLICABLE TO ALL STREAMS
249300
@@ -253,9 +304,9 @@ def test_run(self):
253304 # dependencies between streams.
254305 # For full table streams we should see 1 more record than the first sync
255306 expected_records = expected_records_second_sync .get (stream )
256- primary_keys = stream_primary_keys .get (stream )
307+ stream_primary_keys = tap_primary_keys .get (stream )
257308
258- updated_pk_values = {tuple ([record .get (pk ) for pk in primary_keys ])
309+ updated_pk_values = {tuple ([record .get (pk ) for pk in stream_primary_keys ])
259310 for record in updated_records [stream ]}
260311 self .assertLessEqual (
261312 len (expected_records ), len (second_sync_data ),
@@ -266,7 +317,7 @@ def test_run(self):
266317 LOGGER .warn ('Second sync replicated %s records more than our create and update for %s' ,
267318 len (second_sync_data ), stream )
268319
269- if not primary_keys :
320+ if not stream_primary_keys :
270321 raise NotImplementedError ("PKs are needed for comparing records" )
271322
272323 # Verify that the inserted and updated records are replicated by the 2nd sync
0 commit comments