@@ -1061,10 +1061,18 @@ function Base.notify(db::Doorbell)
1061
1061
end
1062
1062
end
1063
1063
1064
+ struct TaskSpecKey
1065
+ task_id:: Int
1066
+ task_spec:: Vector{Any}
1067
+ TaskSpecKey (task_spec:: Vector{Any} ) = new (task_spec[1 ], task_spec)
1068
+ end
1069
+ Base. getindex (key:: TaskSpecKey ) = key. task_spec
1070
+ Base. hash (key:: TaskSpecKey , h:: UInt ) = hash (key. task_id, hash (TaskSpecKey, h))
1071
+
1064
1072
struct ProcessorInternalState
1065
1073
ctx:: Context
1066
1074
proc:: Processor
1067
- queue:: LockedObject{PriorityQueue{Vector{Any} , UInt32, Base.Order.ForwardOrdering}}
1075
+ queue:: LockedObject{PriorityQueue{TaskSpecKey , UInt32, Base.Order.ForwardOrdering}}
1068
1076
reschedule:: Doorbell
1069
1077
tasks:: Dict{Int,Task}
1070
1078
proc_occupancy:: Base.RefValue{UInt32}
@@ -1175,7 +1183,8 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1175
1183
if length (queue) == 0
1176
1184
return nothing
1177
1185
end
1178
- task, occupancy = peek (queue)
1186
+ task_spec, occupancy = peek (queue)
1187
+ task = task_spec[]
1179
1188
scope = task[5 ]
1180
1189
if ! isa (constrain (scope, Dagger. ExactScope (to_proc)),
1181
1190
Dagger. InvalidScope) &&
@@ -1202,7 +1211,8 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1202
1211
end
1203
1212
1204
1213
@label execute
1205
- task, task_occupancy = task_and_occupancy
1214
+ task_spec, task_occupancy = task_and_occupancy
1215
+ task = task_spec[]
1206
1216
thunk_id = task[1 ]
1207
1217
time_util = task[2 ]
1208
1218
timespan_finish (ctx, :proc_run_fetch , to_proc, (;thunk_id, proc_occupancy= proc_occupancy[], task_occupancy))
@@ -1272,7 +1282,7 @@ function do_tasks(to_proc, return_queue, tasks)
1272
1282
uid = first (tasks)[18 ]
1273
1283
state = proc_states (uid) do states
1274
1284
get! (states, to_proc) do
1275
- queue = PriorityQueue {Vector{Any} , UInt32} ()
1285
+ queue = PriorityQueue {TaskSpecKey , UInt32} ()
1276
1286
queue_locked = LockedObject (queue)
1277
1287
reschedule = Doorbell ()
1278
1288
istate = ProcessorInternalState (ctx, to_proc,
@@ -1302,7 +1312,7 @@ function do_tasks(to_proc, return_queue, tasks)
1302
1312
end
1303
1313
end
1304
1314
should_launch || continue
1305
- enqueue! (queue, task, occupancy)
1315
+ enqueue! (queue, TaskSpecKey ( task) , occupancy)
1306
1316
timespan_finish (ctx, :enqueue , (;to_proc, thunk_id), nothing )
1307
1317
@dagdebug thunk_id :processor " Enqueued task"
1308
1318
end
0 commit comments