Skip to content

Commit fcc0fca

Browse files
committed
not use remaing in unit test
1 parent 63d742c commit fcc0fca

File tree

1 file changed

+29
-47
lines changed

1 file changed

+29
-47
lines changed

tests/extra/job_management/test_thread_worker.py

Lines changed: 29 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -222,31 +222,28 @@ def worker_pool(self) -> Iterator[_TaskThreadPool]:
222222
pool.shutdown()
223223

224224
def test_no_tasks(self, worker_pool):
225-
results, remaining = worker_pool.process_futures(timeout=10)
225+
results = worker_pool.process_futures(timeout=10)
226226
assert results == []
227-
assert remaining == 0
228227

229228
def test_submit_and_process(self, worker_pool):
230229
worker_pool.submit_task(DummyTask(job_id="j-123", df_idx=0))
231-
results, remaining = worker_pool.process_futures(timeout=10)
230+
results = worker_pool.process_futures(timeout=10)
232231
assert results == [
233232
_TaskResult(job_id="j-123", df_idx=0, db_update={"status": "dummified"}, stats_update={"dummy": 1}),
234233
]
235-
assert remaining == 0
236234

237235
def test_submit_and_process_zero_timeout(self, worker_pool):
238236
worker_pool.submit_task(DummyTask(job_id="j-123", df_idx=0))
239237
# Trigger context switch
240238
time.sleep(0.1)
241-
results, remaining = worker_pool.process_futures(timeout=0)
239+
results = worker_pool.process_futures(timeout=0)
242240
assert results == [
243241
_TaskResult(job_id="j-123", df_idx=0, db_update={"status": "dummified"}, stats_update={"dummy": 1}),
244242
]
245-
assert remaining == 0
246243

247244
def test_submit_and_process_with_error(self, worker_pool):
248245
worker_pool.submit_task(DummyTask(job_id="j-666", df_idx=0))
249-
results, remaining = worker_pool.process_futures(timeout=10)
246+
results = worker_pool.process_futures(timeout=10)
250247
assert results == [
251248
_TaskResult(
252249
job_id="j-666",
@@ -255,28 +252,26 @@ def test_submit_and_process_with_error(self, worker_pool):
255252
stats_update={"threaded task failed": 1},
256253
),
257254
]
258-
assert remaining == 0
255+
259256

260257
def test_submit_and_process_iterative(self, worker_pool):
261258
worker_pool.submit_task(NopTask(job_id="j-1", df_idx=1))
262-
results, remaining = worker_pool.process_futures(timeout=1)
259+
results = worker_pool.process_futures(timeout=1)
263260
assert results == [_TaskResult(job_id="j-1", df_idx=1)]
264-
assert remaining == 0
265261

266262
# Add some more
267263
worker_pool.submit_task(NopTask(job_id="j-22", df_idx=22))
268264
worker_pool.submit_task(NopTask(job_id="j-222", df_idx=222))
269-
results, remaining = worker_pool.process_futures(timeout=1)
265+
results = worker_pool.process_futures(timeout=1)
270266
assert results == [_TaskResult(job_id="j-22", df_idx=22), _TaskResult(job_id="j-222", df_idx=222)]
271-
assert remaining == 0
272267

273268
def test_submit_multiple_simple(self, worker_pool):
274269
# A bunch of dummy tasks
275270
for j in range(5):
276271
worker_pool.submit_task(NopTask(job_id=f"j-{j}", df_idx=j))
277272

278273
# Process all of them (non-zero timeout, which should be plenty of time for all of them to finish)
279-
results, remaining = worker_pool.process_futures(timeout=1)
274+
results = worker_pool.process_futures(timeout=1)
280275
expected = [_TaskResult(job_id=f"j-{j}", df_idx=j) for j in range(5)]
281276
assert sorted(results, key=lambda r: r.job_id) == expected
282277

@@ -297,25 +292,24 @@ def test_submit_multiple_blocking_and_failing(self, worker_pool):
297292
)
298293

299294
# Initial state: nothing happened yet
300-
results, remaining = worker_pool.process_futures(timeout=0)
301-
assert (results, remaining) == ([], n)
295+
results = worker_pool.process_futures(timeout=0)
296+
assert results == []
302297

303298
# No changes even after timeout
304-
results, remaining = worker_pool.process_futures(timeout=0.1)
305-
assert (results, remaining) == ([], n)
299+
results = worker_pool.process_futures(timeout=0.1)
300+
assert results == []
306301

307302
# Set one event and wait for corresponding result
308303
events[0].set()
309-
results, remaining = worker_pool.process_futures(timeout=0.1)
304+
results = worker_pool.process_futures(timeout=0.1)
310305
assert results == [
311306
_TaskResult(job_id="j-0", df_idx=0, db_update={"status": "all fine"}),
312307
]
313-
assert remaining == n - 1
314308

315309
# Release all but one event
316310
for j in range(n - 1):
317311
events[j].set()
318-
results, remaining = worker_pool.process_futures(timeout=0.1)
312+
results = worker_pool.process_futures(timeout=0.1)
319313
assert results == [
320314
_TaskResult(job_id="j-1", df_idx=1, db_update={"status": "all fine"}),
321315
_TaskResult(job_id="j-2", df_idx=2, db_update={"status": "all fine"}),
@@ -326,22 +320,20 @@ def test_submit_multiple_blocking_and_failing(self, worker_pool):
326320
stats_update={"threaded task failed": 1},
327321
),
328322
]
329-
assert remaining == 1
330323

331324
# Release all events
332325
for j in range(n):
333326
events[j].set()
334-
results, remaining = worker_pool.process_futures(timeout=0.1)
327+
results = worker_pool.process_futures(timeout=0.1)
335328
assert results == [
336329
_TaskResult(job_id="j-4", df_idx=4, db_update={"status": "all fine"}),
337330
]
338-
assert remaining == 0
339331

340332
def test_shutdown(self, worker_pool):
341333
# Before shutdown
342334
worker_pool.submit_task(NopTask(job_id="j-123", df_idx=0))
343-
results, remaining = worker_pool.process_futures(timeout=0.1)
344-
assert (results, remaining) == ([_TaskResult(job_id="j-123", df_idx=0)], 0)
335+
results = worker_pool.process_futures(timeout=0.1)
336+
assert results == [_TaskResult(job_id="j-123", df_idx=0)]
345337

346338
worker_pool.shutdown()
347339

@@ -355,7 +347,7 @@ def test_job_start_task(self, worker_pool, dummy_backend, caplog):
355347
task = _JobStartTask(job_id=job.job_id, df_idx=0, root_url=dummy_backend.connection.root_url, bearer_token=None)
356348
worker_pool.submit_task(task)
357349

358-
results, remaining = worker_pool.process_futures(timeout=1)
350+
results = worker_pool.process_futures(timeout=1)
359351
assert results == [
360352
_TaskResult(
361353
job_id="job-000",
@@ -364,7 +356,6 @@ def test_job_start_task(self, worker_pool, dummy_backend, caplog):
364356
stats_update={"job start": 1},
365357
)
366358
]
367-
assert remaining == 0
368359
assert caplog.messages == []
369360

370361
def test_job_start_task_failure(self, worker_pool, dummy_backend, caplog):
@@ -375,13 +366,12 @@ def test_job_start_task_failure(self, worker_pool, dummy_backend, caplog):
375366
task = _JobStartTask(job_id=job.job_id, df_idx=0, root_url=dummy_backend.connection.root_url, bearer_token=None)
376367
worker_pool.submit_task(task)
377368

378-
results, remaining = worker_pool.process_futures(timeout=1)
369+
results = worker_pool.process_futures(timeout=1)
379370
assert results == [
380371
_TaskResult(
381372
job_id="job-000", df_idx=0, db_update={"status": "start_failed"}, stats_update={"start_job error": 1}
382373
)
383374
]
384-
assert remaining == 0
385375
assert caplog.messages == [
386376
"Failed to start job 'job-000': OpenEoApiError('[500] Internal: No job starting for you, buddy')"
387377
]
@@ -444,10 +434,9 @@ def test_submit_task_creates_pool(self, thread_pool):
444434
assert "NopTask" in thread_pool._pools
445435

446436
# Process to complete the task
447-
results, remaining = thread_pool.process_futures(timeout=0.1)
437+
results = thread_pool.process_futures(timeout=0.1)
448438
assert len(results) == 1
449439
assert results[0].job_id == "j-1"
450-
assert remaining == {"NopTask": 0}
451440

452441
def test_submit_task_uses_config(self, configured_pool):
453442
"""Test that pool creation uses configuration."""
@@ -482,9 +471,8 @@ def test_submit_multiple_task_types(self, thread_pool):
482471

483472
def test_process_futures_updates_empty(self, thread_pool):
484473
"""Test process futures with no pools."""
485-
results, remaining = thread_pool.process_futures(timeout=0)
474+
results = thread_pool.process_futures(timeout=0)
486475
assert results == []
487-
assert remaining == {}
488476

489477
def test_process_futures_updates_multiple_pools(self, thread_pool):
490478
"""Test processing updates across multiple pools."""
@@ -493,7 +481,7 @@ def test_process_futures_updates_multiple_pools(self, thread_pool):
493481
thread_pool.submit_task(NopTask(job_id="j-2", df_idx=2)) # NopTask pool
494482
thread_pool.submit_task(DummyTask(job_id="j-3", df_idx=3)) # DummyTask pool
495483

496-
results, remaining = thread_pool.process_futures(timeout=0.1)
484+
results = thread_pool.process_futures(timeout=0.1)
497485

498486
assert len(results) == 3
499487

@@ -521,24 +509,22 @@ def test_process_futures_updates_partial_completion(self):
521509
pool.submit_task(quick_task) # NopTask pool
522510

523511
# Process with timeout=0 - only quick task should complete
524-
results, remaining = pool.process_futures(timeout=0)
512+
results = pool.process_futures(timeout=0)
525513

526514
# Only quick task completed
527515
assert len(results) == 1
528516
assert results[0].job_id == "j-quick"
529517

530518
# Blocking task still pending
531-
assert remaining == {"BlockingTask": 1, "NopTask": 0}
532519
assert pool.num_pending_tasks() == 1
533520
assert pool.num_pending_tasks("BlockingTask") == 1
534521

535522
# Release blocking task and process again
536523
event.set()
537-
results2, remaining2 = pool.process_futures(timeout=0.1)
524+
results2 = pool.process_futures(timeout=0.1)
538525

539526
assert len(results2) == 1
540527
assert results2[0].job_id == "j-block"
541-
assert remaining2 == {"BlockingTask": 0, "NopTask": 0}
542528

543529
pool.shutdown()
544530

@@ -625,7 +611,7 @@ def execute(self) -> _TaskResult:
625611
assert pool.num_pending_tasks() == 1
626612

627613
# Process it
628-
results, _ = pool.process_futures(timeout=0.1)
614+
results = pool.process_futures(timeout=0.1)
629615
assert len(results) == 1
630616
assert results[0].job_id == "j-1"
631617

@@ -649,10 +635,9 @@ def submit_tasks(start_idx: int):
649635
assert thread_pool.num_pending_tasks() == 15
650636

651637
# Process them all
652-
results, remaining = thread_pool.process_futures(timeout=0.5)
638+
results = thread_pool.process_futures(timeout=0.5)
653639

654640
assert len(results) == 15
655-
assert remaining == {"NopTask": 0}
656641

657642
def test_pool_parallelism_with_blocking_tasks(self):
658643
"""Test that multiple workers allow parallel execution."""
@@ -678,9 +663,8 @@ def test_pool_parallelism_with_blocking_tasks(self):
678663
for event in events:
679664
event.set()
680665

681-
results, remaining = pool.process_futures(timeout=0.5)
666+
results = pool.process_futures(timeout=0.5)
682667
assert len(results) == 5
683-
assert remaining == {"BlockingTask": 0}
684668

685669
for result in results:
686670
assert result.job_id.startswith("j-block-")
@@ -693,15 +677,14 @@ def test_task_with_error_handling(self, thread_pool):
693677
thread_pool.submit_task(DummyTask(job_id="j-666", df_idx=0))
694678

695679
# Process it
696-
results, remaining = thread_pool.process_futures(timeout=0.1)
680+
results = thread_pool.process_futures(timeout=0.1)
697681

698682
# Should get error result
699683
assert len(results) == 1
700684
result = results[0]
701685
assert result.job_id == "j-666"
702686
assert result.db_update == {"status": "threaded task failed"}
703687
assert result.stats_update == {"threaded task failed": 1}
704-
assert remaining == {"DummyTask": 0}
705688

706689
def test_mixed_success_and_error_tasks(self, thread_pool):
707690
"""Test mix of successful and failing tasks."""
@@ -711,11 +694,10 @@ def test_mixed_success_and_error_tasks(self, thread_pool):
711694
thread_pool.submit_task(DummyTask(job_id="j-3", df_idx=3)) # Success
712695

713696
# Process all
714-
results, remaining = thread_pool.process_futures(timeout=0.1)
697+
results = thread_pool.process_futures(timeout=0.1)
715698

716699
# Should get 3 results
717700
assert len(results) == 3
718-
assert remaining == {"DummyTask": 0}
719701

720702
# Check results
721703
success_results = [r for r in results if r.job_id != "j-666"]

0 commit comments

Comments
 (0)