File tree Expand file tree Collapse file tree 1 file changed +9
-2
lines changed
airbyte/cli/universal_connector Expand file tree Collapse file tree 1 file changed +9
-2
lines changed Original file line number Diff line number Diff line change @@ -270,9 +270,16 @@ def write(
270270
271271 # Add to buffer
272272 buffer [stream_name ]["_airbyte_ab_id" ].append (str (uuid .uuid4 ()))
273- buffer [stream_name ]["_airbyte_emitted_at" ].append (
274- datetime .datetime .now (datetime .timezone .utc ).isoformat ()
273+ # Use the record's original emitted_at timestamp if available,
274+ # otherwise fall back to current time
275+ emitted_at = (
276+ datetime .datetime .fromtimestamp (
277+ record .emitted_at / 1000 , tz = datetime .timezone .utc
278+ )
279+ if record .emitted_at is not None
280+ else datetime .datetime .now (datetime .timezone .utc )
275281 )
282+ buffer [stream_name ]["_airbyte_emitted_at" ].append (emitted_at .isoformat ())
276283 buffer [stream_name ]["_airbyte_data" ].append (json .dumps (record .data ))
277284
278285 else :
You can’t perform that action at this time.
0 commit comments