Skip to content

Commit dd710f8

Browse files
authored
Merge pull request #665 from NVIDIA/xutongr/time
Skip stale resource messages in multi-worker processing
2 parents a88f6b8 + 342dda4 commit dd710f8

File tree

4 files changed

+99
-36
lines changed

4 files changed

+99
-36
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"operations": [
3+
{
4+
"add_column": {
5+
"table": "resources",
6+
"column": {
7+
"name": "last_updated",
8+
"type": "TIMESTAMP",
9+
"nullable": true
10+
}
11+
}
12+
},
13+
{
14+
"add_column": {
15+
"table": "resources",
16+
"column": {
17+
"name": "last_usage_updated",
18+
"type": "TIMESTAMP",
19+
"nullable": true
20+
}
21+
}
22+
}
23+
]
24+
}

src/service/agent/helpers.py

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -206,25 +206,30 @@ def queue_update_group_job(message: backend_messages.UpdatePodBody):
206206
update_task.send_job_to_queue()
207207

208208

209-
def update_resource(backend: str, message: backend_messages.UpdateNodeBody):
209+
def update_resource(backend: str, message: backend_messages.UpdateNodeBody,
210+
message_timestamp: datetime.datetime | None = None):
210211
context = objects.WorkflowServiceContext.get()
211212
postgres = context.database
212213
# If delete flag is set, delegate to delete_resource and ignore all other fields
213214
if message.delete:
214215
delete_resource(backend, message)
215216
return
216217

218+
timestamp = message_timestamp if message_timestamp else common.current_time()
219+
217220
commit_cmd = '''
218221
INSERT INTO resources
219222
(name, backend, available, allocatable_fields, label_fields, usage_fields,
220-
non_workflow_usage_fields, taints, conditions)
221-
VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb[], %s::text[])
223+
non_workflow_usage_fields, taints, conditions, last_updated)
224+
VALUES (%s, %s, %s, %s, %s, %s, %s, %s::jsonb[], %s::text[], %s)
222225
ON CONFLICT (name, backend) DO UPDATE SET
223226
available = %s,
224227
allocatable_fields = %s,
225228
label_fields = %s,
226229
taints = %s::jsonb[],
227-
conditions = %s::text[]
230+
conditions = %s::text[],
231+
last_updated = %s
232+
WHERE resources.last_updated IS NULL OR resources.last_updated < %s
228233
'''
229234

230235
resource = {
@@ -249,48 +254,62 @@ def update_resource(backend: str, message: backend_messages.UpdateNodeBody):
249254
resource['non_workflow_usage_fields'],
250255
resource['taints'],
251256
resource['conditions'],
257+
timestamp,
252258
resource['available'],
253259
resource['allocatable_fields'],
254260
resource['label_fields'],
255261
resource['taints'],
256-
resource['conditions']
262+
resource['conditions'],
263+
timestamp,
264+
timestamp,
257265
)
258266

259-
postgres.execute_commit_command(commit_cmd, columns)
260-
pool_config = connectors.fetch_verbose_pool_config(postgres, backend)
261-
resource_entry = workflow.ResourcesEntry(hostname=message.hostname,
262-
label_fields=message.label_fields,
263-
backend=backend, taints=message.taints,
264-
# Dummy placeholder values below
265-
exposed_fields={},
266-
usage_fields={},
267-
non_workflow_usage_fields={},
268-
allocatable_fields={},
269-
pool_platform_labels={},
270-
resource_type=connectors.BackendResourceType.SHARED,
271-
conditions=message.conditions)
272-
config_helpers.update_node_pool_platform(resource_entry, backend, pool_config)
273-
274-
275-
def update_resource_usage(backend: str, message: backend_messages.UpdateNodeUsageBody):
267+
rowcount = postgres.execute_commit_command(commit_cmd, columns)
268+
if rowcount > 0:
269+
pool_config = connectors.fetch_verbose_pool_config(postgres, backend)
270+
resource_type = connectors.BackendResourceType.SHARED
271+
resource_entry = workflow.ResourcesEntry(
272+
hostname=message.hostname,
273+
label_fields=message.label_fields,
274+
backend=backend, taints=message.taints,
275+
# Dummy placeholder values below
276+
exposed_fields={},
277+
usage_fields={},
278+
non_workflow_usage_fields={},
279+
allocatable_fields={},
280+
pool_platform_labels={},
281+
resource_type=resource_type,
282+
conditions=message.conditions)
283+
config_helpers.update_node_pool_platform(resource_entry, backend, pool_config)
284+
285+
286+
def update_resource_usage(backend: str, message: backend_messages.UpdateNodeUsageBody,
287+
message_timestamp: datetime.datetime | None = None):
276288
context = objects.WorkflowServiceContext.get()
277289
postgres = context.database
290+
timestamp = message_timestamp if message_timestamp else common.current_time()
291+
278292
commit_cmd = '''
279293
INSERT INTO resources
280-
(name, backend, usage_fields, non_workflow_usage_fields)
281-
VALUES (%s, %s, %s, %s)
294+
(name, backend, usage_fields, non_workflow_usage_fields, last_usage_updated)
295+
VALUES (%s, %s, %s, %s, %s)
282296
ON CONFLICT (name, backend) DO UPDATE SET
283297
usage_fields = %s,
284-
non_workflow_usage_fields = %s
298+
non_workflow_usage_fields = %s,
299+
last_usage_updated = %s
300+
WHERE resources.last_usage_updated IS NULL OR resources.last_usage_updated < %s
285301
'''
286302

287303
columns = (
288304
message.hostname,
289305
backend,
290306
postgres.encode_hstore(message.usage_fields),
291307
postgres.encode_hstore(message.non_workflow_usage_fields),
308+
timestamp,
292309
postgres.encode_hstore(message.usage_fields),
293-
postgres.encode_hstore(message.non_workflow_usage_fields)
310+
postgres.encode_hstore(message.non_workflow_usage_fields),
311+
timestamp,
312+
timestamp,
294313
)
295314

296315
postgres.execute_commit_command(commit_cmd, columns)
@@ -331,9 +350,11 @@ def delete_resource(backend: str,
331350

332351

333352
def clean_resources(backend: str,
334-
message: backend_messages.NodeInventoryBody):
353+
message: backend_messages.NodeInventoryBody,
354+
message_timestamp: datetime.datetime | None = None):
335355
context = objects.WorkflowServiceContext.get()
336356
postgres = context.database
357+
timestamp = message_timestamp if message_timestamp else common.current_time()
337358

338359
# Track all resources from resources table
339360
cmd = 'SELECT name FROM resources where backend = %s'
@@ -343,8 +364,13 @@ def clean_resources(backend: str,
343364
# Find nodes that exist in the database but not in the message
344365
stale_nodes = db_node_names - set(message.hostnames)
345366
if stale_nodes:
346-
commit_cmd = 'DELETE FROM resources WHERE name IN %s and backend = %s'
347-
postgres.execute_commit_command(commit_cmd, (tuple(stale_nodes), backend))
367+
# Only delete nodes whose last_updated is older than this inventory message,
368+
# preventing a stale inventory from removing nodes added after it was created
369+
commit_cmd = '''
370+
DELETE FROM resources WHERE name IN %s AND backend = %s
371+
AND (last_updated IS NULL OR last_updated < %s)
372+
'''
373+
postgres.execute_commit_command(commit_cmd, (tuple(stale_nodes), backend, timestamp))
348374

349375

350376
def clean_tasks(backend: str,
@@ -622,11 +648,14 @@ async def get_messages():
622648
elif message_body.monitor_pod:
623649
create_monitor_job(message_body.monitor_pod)
624650
elif message_body.update_node:
625-
update_resource(name, message_body.update_node)
651+
update_resource(name, message_body.update_node,
652+
message_timestamp=message.timestamp)
626653
elif message_body.update_node_usage:
627-
update_resource_usage(name, message_body.update_node_usage)
654+
update_resource_usage(name, message_body.update_node_usage,
655+
message_timestamp=message.timestamp)
628656
elif message_body.node_inventory:
629-
clean_resources(name, message_body.node_inventory)
657+
clean_resources(name, message_body.node_inventory,
658+
message_timestamp=message.timestamp)
630659
elif message_body.task_list:
631660
clean_tasks(name, message_body.task_list)
632661
elif message_body.heartbeat:

src/service/agent/message_worker.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,15 @@ def process_message(self, message_id: str, message_json: str, backend_name: str)
227227
helpers.send_pod_conditions(message_body.update_pod,
228228
self.workflow_config.max_event_log_lines)
229229
elif message_body.update_node:
230-
helpers.update_resource(backend_name, message_body.update_node)
230+
helpers.update_resource(backend_name, message_body.update_node,
231+
message_timestamp=message.timestamp)
231232
elif message_body.update_node_usage:
232233
helpers.update_resource_usage(
233-
backend_name, message_body.update_node_usage)
234+
backend_name, message_body.update_node_usage,
235+
message_timestamp=message.timestamp)
234236
elif message_body.node_inventory:
235-
helpers.clean_resources(backend_name, message_body.node_inventory)
237+
helpers.clean_resources(backend_name, message_body.node_inventory,
238+
message_timestamp=message.timestamp)
236239
elif message_body.pod_event:
237240
helpers.send_pod_event(
238241
message_body.pod_event, self.workflow_config.max_event_log_lines)

src/utils/connectors/postgres.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,14 +475,17 @@ def execute_fetch_command(self, command: str,
475475
cur.close()
476476

477477
@retry
478-
def execute_commit_command(self, command: str, args: Tuple):
478+
def execute_commit_command(self, command: str, args: Tuple) -> int:
479479
"""
480480
Connects and executes a command that updates the database.
481481
482482
Args:
483483
command (str): The command to execute.
484484
args (Tuple): Any args for the command.
485485
486+
Returns:
487+
int: The number of rows affected by the command.
488+
486489
Raises:
487490
OSMODatabaseError: Error while executing the database command.
488491
"""
@@ -491,8 +494,10 @@ def execute_commit_command(self, command: str, args: Tuple):
491494
try:
492495
cur = conn.cursor()
493496
cur.execute(command, args)
497+
rowcount = cur.rowcount
494498
cur.close()
495499
conn.commit()
500+
return rowcount
496501
except (psycopg2.DatabaseError, psycopg2.InterfaceError) as error:
497502
try:
498503
if cur is not None:
@@ -994,6 +999,8 @@ def _init_tables(self):
994999
usage_fields HSTORE,
9951000
non_workflow_usage_fields HSTORE,
9961001
conditions TEXT[],
1002+
last_updated TIMESTAMP,
1003+
last_usage_updated TIMESTAMP,
9971004
PRIMARY KEY (name, backend)
9981005
);
9991006
'''

0 commit comments

Comments
 (0)