@@ -53,6 +53,7 @@ async def persist_messages(
5353 kv_client : KeyValue ,
5454 key_prefix : str ,
5555 refresh_mode : str ,
56+ validate_records : bool = True ,
5657) -> dict | None :
5758 """
5859 Process Singer messages.
@@ -114,10 +115,7 @@ async def persist_messages(
114115 continue
115116 if primary_key_value [0 ] == "$" :
116117 logger .warning (
117- (
118- "Ignoring record for stream %s with "
119- "primary key starting with $"
120- ),
118+ "Ignoring record for stream %s with primary key starting with $" ,
121119 stream ,
122120 )
123121 continue
@@ -133,20 +131,21 @@ async def persist_messages(
133131 )
134132 continue
135133
136- try :
137- validators [stream ].validate (o .record )
138- except jsonschema .ValidationError as e :
139- logger .warning (
140- (
141- "Ignoring record %s for stream %s that fails "
142- "schema validation: %s at .%s"
143- ),
144- primary_key_value ,
145- stream ,
146- e .message ,
147- "." .join (str (i ) for i in e .relative_path ),
148- )
149- continue
134+ if validate_records :
135+ try :
136+ validators [stream ].validate (o .record )
137+ except jsonschema .ValidationError as e :
138+ logger .warning (
139+ (
140+ "Ignoring record %s for stream %s that fails "
141+ "schema validation: %s at .%s"
142+ ),
143+ primary_key_value ,
144+ stream ,
145+ e .message ,
146+ "." .join (str (i ) for i in e .relative_path ),
147+ )
148+ continue
150149
151150 key = f"{ key_prefix } { stream } .{ primary_key_value } "
152151
@@ -163,13 +162,9 @@ async def persist_messages(
163162 bookmark = bookmarks [stream ]
164163 if bookmark is not None and len (bookmark ) > 0 :
165164 bookmark_attr = bookmark [0 ]
166- if bookmark_attr is not None and refresh_mode != "full" :
167- if bookmark_attr in o .record :
168- bookmark_source = o .record [bookmark_attr ]
169- else :
170- bookmark_source = ""
171-
172- # Compare data freshness using "bookmark" attribute.
165+ if refresh_mode != "full" :
166+ # Compare data using either the "bookmark" or a data
167+ # comparison.
173168 try :
174169 # Try to fetch current value.
175170 current = await kv_client .get (key )
@@ -197,20 +192,48 @@ async def persist_messages(
197192 )
198193 continue
199194
200- if bookmark_attr in current_record :
201- bookmark_target = current_record [bookmark_attr ]
195+ should_update = False
196+ if bookmark_attr is not None :
197+ # Compare bookmark values as strings to determine if the
198+ # new record should be refreshed based on the refresh mode.
199+ # Note, "newer" (which is the default) is implicit in the
200+ # "or" clause of this condition (if it's not "full" or
201+ # "same", it must be "newer").
202+ if bookmark_attr in o .record :
203+ bookmark_source = o .record [bookmark_attr ]
204+ else :
205+ bookmark_source = ""
206+
207+ if bookmark_attr in current_record :
208+ bookmark_target = current_record [bookmark_attr ]
209+ else :
210+ bookmark_target = ""
211+
212+ if (
213+ refresh_mode == "same"
214+ and str (bookmark_source ) >= str (bookmark_target )
215+ ) or str (bookmark_source ) > str (bookmark_target ):
216+ should_update = True
217+ elif refresh_mode == "same" :
218+ # If there is no bookmark, "same" means always update.
219+ should_update = True
202220 else :
203- bookmark_target = ""
204-
205- # Compare bookmark values as strings to determine if the
206- # new record should be refreshed based on the refresh mode.
207- # Note, "newer" (which is the default) is implicit in the
208- # "or" clause of this condition (if it's not "full" or
209- # "same", it must be "newer").
210- if (
211- refresh_mode == "same"
212- and str (bookmark_source ) >= str (bookmark_target )
213- ) or str (bookmark_source ) > str (bookmark_target ):
221+ # No bookmark, and refresh_mode=="newer" (by process of
222+ # elimination), so, compare the data itself.
223+ target_record_filtered = {
224+ k : v
225+ for k , v in current_record .items ()
226+ if not k .startswith ("_sdc_" )
227+ }
228+ source_record_filtered = {
229+ k : v
230+ for k , v in o .record .items ()
231+ if not k .startswith ("_sdc_" )
232+ }
233+ if target_record_filtered != source_record_filtered :
234+ should_update = True
235+
236+ if should_update :
214237 # Update with revision
215238 await kv_client .update (
216239 key = key ,
@@ -222,7 +245,7 @@ async def persist_messages(
222245 logger .debug (
223246 (
224247 "Skipping record for stream %s with key %s due to "
225- "not being newer ."
248+ "not needing update ."
226249 ),
227250 stream ,
228251 primary_key_value ,
@@ -236,8 +259,8 @@ async def persist_messages(
236259 value = json .dumps (o .record ).encode ("utf-8" ),
237260 )
238261 else :
239- # No bookmarks for this stream, or, user has requested "full"
240- # sync, so use regular put .
262+ # User has requested "full" sync, so use "put" without
263+ # data checks .
241264 await kv_client .put (
242265 key = key ,
243266 value = json .dumps (o .record ).encode ("utf-8" ),
@@ -278,6 +301,7 @@ async def run(config: dict) -> dict | None:
278301 bucket = config .get ("bucket" , "singer" )
279302 key_prefix = config .get ("key_prefix" , "" )
280303 refresh_mode = config .get ("refresh_mode" , "" )
304+ validate_records = config .get ("validate_records" , True )
281305 if user_credentials is not None :
282306 user_credentials = Path (user_credentials )
283307
@@ -298,6 +322,7 @@ async def run(config: dict) -> dict | None:
298322 kv_client ,
299323 key_prefix ,
300324 refresh_mode ,
325+ validate_records ,
301326 )
302327 return state
303328
0 commit comments