@@ -95,76 +95,77 @@ async def callback(message: AbstractIncomingMessage):
9595
9696 if "event_type" in msg and msg ["event_type" ] == "file_indexed" :
9797 logger .info (f"This is an event type file indexed!" )
98-
99- job_id = msg ["job_id" ]
100- message_str = msg ["status" ]
101- timestamp = datetime .strptime (
102- msg ["start" ], "%Y-%m-%dT%H:%M:%S%z"
103- ) # incoming format: '2023-01-20T08:30:27-05:00'
104- timestamp = timestamp .replace (tzinfo = datetime .utcnow ().tzinfo )
105-
106- # TODO: Updating an event message could go in rabbitmq/listeners
107-
108- # Check if the job exists, and update if so
109- job = await EventListenerJobDB .find_one (
110- EventListenerJobDB .id == PydanticObjectId (job_id )
111- )
112- if job :
113- # Update existing job with new info
114- job .updated = timestamp
115- parsed = parse_message_status (message_str )
116- cleaned_msg = parsed ["cleaned_msg" ]
117- incoming_status = parsed ["status" ]
118-
119- # Don't override a finished status if a message comes in late
120- if job .status in [
121- EventListenerJobStatus .SUCCEEDED ,
122- EventListenerJobStatus .ERROR ,
123- EventListenerJobStatus .SKIPPED ,
124- ]:
125- cleaned_status = job .status
126- else :
127- cleaned_status = incoming_status
128-
129- # Prepare fields to update based on status (don't overwrite whole object to avoid async issues)
130- field_updates = {
131- EventListenerJobDB .status : cleaned_status ,
132- EventListenerJobDB .latest_message : cleaned_msg ,
133- EventListenerJobDB .updated : timestamp ,
134- }
135-
136- if job .started is not None :
137- field_updates [EventListenerJobDB .duration ] = (
138- timestamp - job .started
139- ).total_seconds ()
140- elif incoming_status == EventListenerJobStatus .STARTED :
141- field_updates [EventListenerJobDB .duration ] = 0
142-
143- logger .info (f"[{ job_id } ] { timestamp } { incoming_status .value } { cleaned_msg } " )
144-
145- # Update the job timestamps/duration depending on what status we received
146- if incoming_status == EventListenerJobStatus .STARTED :
147- field_updates [EventListenerJobDB .started ] = timestamp
148- elif incoming_status in [
149- EventListenerJobStatus .SUCCEEDED ,
150- EventListenerJobStatus .ERROR ,
151- EventListenerJobStatus .SKIPPED ,
152- ]:
153- # job.finished = timestamp
154- field_updates [EventListenerJobDB .finished ] = timestamp
155-
156- await job .set (field_updates )
157-
158- # Add latest message to the job updates
159- event_msg = EventListenerJobUpdateDB (
160- job_id = job_id , status = cleaned_msg , timestamp = timestamp
161- )
162- await event_msg .insert ()
163- return True
98+ # TODO - process file indexed event here
16499 else :
165- # We don't know what this job is. Reject the message.
166- logger .error ("Job ID %s not found in database, skipping message." % job_id )
167- return False
100+ job_id = msg ["job_id" ]
101+ message_str = msg ["status" ]
102+ timestamp = datetime .strptime (
103+ msg ["start" ], "%Y-%m-%dT%H:%M:%S%z"
104+ ) # incoming format: '2023-01-20T08:30:27-05:00'
105+ timestamp = timestamp .replace (tzinfo = datetime .utcnow ().tzinfo )
106+
107+ # TODO: Updating an event message could go in rabbitmq/listeners
108+
109+ # Check if the job exists, and update if so
110+ job = await EventListenerJobDB .find_one (
111+ EventListenerJobDB .id == PydanticObjectId (job_id )
112+ )
113+ if job :
114+ # Update existing job with new info
115+ job .updated = timestamp
116+ parsed = parse_message_status (message_str )
117+ cleaned_msg = parsed ["cleaned_msg" ]
118+ incoming_status = parsed ["status" ]
119+
120+ # Don't override a finished status if a message comes in late
121+ if job .status in [
122+ EventListenerJobStatus .SUCCEEDED ,
123+ EventListenerJobStatus .ERROR ,
124+ EventListenerJobStatus .SKIPPED ,
125+ ]:
126+ cleaned_status = job .status
127+ else :
128+ cleaned_status = incoming_status
129+
130+ # Prepare fields to update based on status (don't overwrite whole object to avoid async issues)
131+ field_updates = {
132+ EventListenerJobDB .status : cleaned_status ,
133+ EventListenerJobDB .latest_message : cleaned_msg ,
134+ EventListenerJobDB .updated : timestamp ,
135+ }
136+
137+ if job .started is not None :
138+ field_updates [EventListenerJobDB .duration ] = (
139+ timestamp - job .started
140+ ).total_seconds ()
141+ elif incoming_status == EventListenerJobStatus .STARTED :
142+ field_updates [EventListenerJobDB .duration ] = 0
143+
144+ logger .info (f"[{ job_id } ] { timestamp } { incoming_status .value } { cleaned_msg } " )
145+
146+ # Update the job timestamps/duration depending on what status we received
147+ if incoming_status == EventListenerJobStatus .STARTED :
148+ field_updates [EventListenerJobDB .started ] = timestamp
149+ elif incoming_status in [
150+ EventListenerJobStatus .SUCCEEDED ,
151+ EventListenerJobStatus .ERROR ,
152+ EventListenerJobStatus .SKIPPED ,
153+ ]:
154+ # job.finished = timestamp
155+ field_updates [EventListenerJobDB .finished ] = timestamp
156+
157+ await job .set (field_updates )
158+
159+ # Add latest message to the job updates
160+ event_msg = EventListenerJobUpdateDB (
161+ job_id = job_id , status = cleaned_msg , timestamp = timestamp
162+ )
163+ await event_msg .insert ()
164+ return True
165+ else :
166+ # We don't know what this job is. Reject the message.
167+ logger .error ("Job ID %s not found in database, skipping message." % job_id )
168+ return False
168169
169170
170171async def listen_for_messages ():
0 commit comments