Skip to content

Commit 4d79b87

Browse files
committed
fix unit tests
1 parent fcc0fca commit 4d79b87

File tree

1 file changed

+41
-42
lines changed

1 file changed

+41
-42
lines changed

tests/extra/job_management/test_thread_worker.py

Lines changed: 41 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def test_job_download_success(self, requests_mock: Mocker, tmp_path: Path):
123123
assert result.df_idx == df_idx
124124

125125
# Verify stats update for the MultiBackendJobManager
126-
assert result.stats_update == {"job download": 1}
126+
assert result.stats_update == {'files downloaded': 1, "job download": 1}
127127

128128
# Verify download content (crucial part of the unit test)
129129
downloaded_file = download_dir / "result.data"
@@ -172,7 +172,7 @@ def test_job_download_failure(self, requests_mock: Mocker, tmp_path: Path):
172172
assert result.df_idx == df_idx
173173

174174
# Verify stats update for the MultiBackendJobManager
175-
assert result.stats_update == {"job download error": 1}
175+
assert result.stats_update == {'files downloaded': 0, "job download error": 1}
176176

177177
# Verify no file was created (or only empty/failed files)
178178
assert not any(p.is_file() for p in download_dir.glob("*"))
@@ -430,8 +430,8 @@ def test_submit_task_creates_pool(self, thread_pool):
430430
thread_pool.submit_task(task)
431431

432432
# Pool should be created
433-
assert thread_pool.list_pools() == ["NopTask"]
434-
assert "NopTask" in thread_pool._pools
433+
assert thread_pool.list_pools() == ["default"]
434+
assert "default" in thread_pool._pools
435435

436436
# Process to complete the task
437437
results = thread_pool.process_futures(timeout=0.1)
@@ -443,31 +443,33 @@ def test_submit_task_uses_config(self, configured_pool):
443443
task = NopTask(job_id="j-1", df_idx=1)
444444

445445
# Submit task - should create pool with configured workers
446-
configured_pool.submit_task(task)
446+
configured_pool.submit_task(task, "NopTask")
447+
448+
447449

448450
assert "NopTask" in configured_pool._pools
449451
assert "NopTask" in configured_pool.list_pools()
452+
assert "DummyTask" not in configured_pool.list_pools()
450453

451454
def test_submit_multiple_task_types(self, thread_pool):
452455
"""Test submitting different task types to different pools."""
453456
# Submit different task types
454457
task1 = NopTask(job_id="j-1", df_idx=1)
455458
task2 = DummyTask(job_id="j-2", df_idx=2)
456-
task3 = NopTask(job_id="j-3", df_idx=3)
459+
task3 = DummyTask(job_id="j-3", df_idx=3)
457460

458461
thread_pool.submit_task(task1) # Goes to "NopTask" pool
459462
thread_pool.submit_task(task2) # Goes to "DummyTask" pool
460-
thread_pool.submit_task(task3) # Goes to "NopTask" pool (existing)
463+
thread_pool.submit_task(task3, "seperate") # Goes to "DummyTask" pool
461464

462465
# Should have 2 pools
463466
pools = sorted(thread_pool.list_pools())
464-
assert pools == ["DummyTask", "NopTask"]
467+
assert pools == ["default", "seperate"]
465468

466469
# Check pending tasks
467-
assert thread_pool.num_pending_tasks() == 3
468-
assert thread_pool.num_pending_tasks("NopTask") == 2
469-
assert thread_pool.num_pending_tasks("DummyTask") == 1
470-
assert thread_pool.num_pending_tasks("NonExistent") == 0
470+
assert thread_pool.number_pending_tasks() == 3
471+
assert thread_pool.number_pending_tasks("default") == 2
472+
assert thread_pool.number_pending_tasks("seperate") == 1
471473

472474
def test_process_futures_updates_empty(self, thread_pool):
473475
"""Test process futures with no pools."""
@@ -491,8 +493,6 @@ def test_process_futures_updates_multiple_pools(self, thread_pool):
491493
assert len(dummy_results) == 1
492494

493495
# All tasks should be completed
494-
assert remaining == {"NopTask": 0, "DummyTask": 0}
495-
496496
def test_process_futures_updates_partial_completion(self):
497497
"""Test processing when some tasks are still running."""
498498
# Use a pool with blocking tasks
@@ -505,8 +505,8 @@ def test_process_futures_updates_partial_completion(self):
505505
# Create a quick task
506506
quick_task = NopTask(job_id="j-quick", df_idx=1)
507507

508-
pool.submit_task(blocking_task) # BlockingTask pool
509-
pool.submit_task(quick_task) # NopTask pool
508+
pool.submit_task(blocking_task, "blocking") # BlockingTask pool
509+
pool.submit_task(quick_task, "quick") # NopTask pool
510510

511511
# Process with timeout=0 - only quick task should complete
512512
results = pool.process_futures(timeout=0)
@@ -516,8 +516,8 @@ def test_process_futures_updates_partial_completion(self):
516516
assert results[0].job_id == "j-quick"
517517

518518
# Blocking task still pending
519-
assert pool.num_pending_tasks() == 1
520-
assert pool.num_pending_tasks("BlockingTask") == 1
519+
assert pool.number_pending_tasks() == 1
520+
assert pool.number_pending_tasks("blocking") == 1
521521

522522
# Release blocking task and process again
523523
event.set()
@@ -531,48 +531,47 @@ def test_process_futures_updates_partial_completion(self):
531531
def test_num_pending_tasks(self, thread_pool):
532532
"""Test counting pending tasks."""
533533
# Initially empty
534-
assert thread_pool.num_pending_tasks() == 0
535-
assert thread_pool.num_pending_tasks("NopTask") == 0
534+
assert thread_pool.number_pending_tasks() == 0
535+
assert thread_pool.number_pending_tasks("default") == 0
536536

537537
# Add some tasks
538538
thread_pool.submit_task(NopTask(job_id="j-1", df_idx=1))
539539
thread_pool.submit_task(NopTask(job_id="j-2", df_idx=2))
540-
thread_pool.submit_task(DummyTask(job_id="j-3", df_idx=3))
540+
thread_pool.submit_task(DummyTask(job_id="j-3", df_idx=3), "dummy")
541541

542542
# Check totals
543-
assert thread_pool.num_pending_tasks() == 3
544-
assert thread_pool.num_pending_tasks("NopTask") == 2
545-
assert thread_pool.num_pending_tasks("DummyTask") == 1
546-
assert thread_pool.num_pending_tasks("NonExistentPool") == 0
547-
543+
assert thread_pool.number_pending_tasks() == 3
544+
assert thread_pool.number_pending_tasks("default") == 2
545+
assert thread_pool.number_pending_tasks("dummy") == 1
546+
548547
# Process all
549548
thread_pool.process_futures(timeout=0.1)
550549

551550
# Should be empty
552-
assert thread_pool.num_pending_tasks() == 0
553-
assert thread_pool.num_pending_tasks("NopTask") == 0
551+
assert thread_pool.number_pending_tasks() == 0
552+
assert thread_pool.number_pending_tasks("default") == 0
554553

555554
def test_shutdown_specific_pool(self):
556555
"""Test shutting down a specific pool."""
557556
# Create fresh pool for destructive test
558557
pool = _JobManagerWorkerThreadPool()
559558

560559
# Create two pools
561-
pool.submit_task(NopTask(job_id="j-1", df_idx=1)) # NopTask pool
562-
pool.submit_task(DummyTask(job_id="j-2", df_idx=2)) # DummyTask pool
560+
pool.submit_task(NopTask(job_id="j-1", df_idx=1), "notask") # NopTask pool
561+
pool.submit_task(DummyTask(job_id="j-2", df_idx=2), "dummy") # DummyTask pool
563562

564-
assert sorted(pool.list_pools()) == ["DummyTask", "NopTask"]
563+
assert sorted(pool.list_pools()) == ["dummy", "notask"]
565564

566565
# Shutdown NopTask pool only
567-
pool.shutdown("NopTask")
566+
pool.shutdown("notask")
568567

569568
# Only DummyTask pool should remain
570-
assert pool.list_pools() == ["DummyTask"]
569+
assert pool.list_pools() == ["dummy"]
571570

572571
# Can't submit to shutdown pool
573572
# Actually, it will create a new pool since we deleted it
574573
pool.submit_task(NopTask(job_id="j-3", df_idx=3)) # Creates new NopTask pool
575-
assert sorted(pool.list_pools()) == ["DummyTask", "NopTask"]
574+
assert sorted(pool.list_pools()) == [ "default", "dummy"]
576575

577576
pool.shutdown()
578577

@@ -582,8 +581,8 @@ def test_shutdown_all(self):
582581
pool = _JobManagerWorkerThreadPool()
583582

584583
# Create multiple pools
585-
pool.submit_task(NopTask(job_id="j-1", df_idx=1))
586-
pool.submit_task(DummyTask(job_id="j-2", df_idx=2))
584+
pool.submit_task(NopTask(job_id="j-1", df_idx=1), "notask") # NopTask pool
585+
pool.submit_task(DummyTask(job_id="j-2", df_idx=2), "dummy")
587586

588587
assert len(pool.list_pools()) == 2
589588

@@ -604,11 +603,11 @@ def execute(self) -> _TaskResult:
604603
pool = _JobManagerWorkerThreadPool()
605604

606605
task = CustomTask(job_id="j-1", df_idx=1)
607-
pool.submit_task(task)
606+
pool.submit_task(task, "custom_pool")
608607

609608
# Pool should be named after class
610-
assert pool.list_pools() == ["CustomTask"]
611-
assert pool.num_pending_tasks() == 1
609+
assert pool.list_pools() == ["custom_pool"]
610+
assert pool.number_pending_tasks() == 1
612611

613612
# Process it
614613
results = pool.process_futures(timeout=0.1)
@@ -631,8 +630,8 @@ def submit_tasks(start_idx: int):
631630
concurrent.futures.wait(futures)
632631

633632
# Should have all tasks in one pool
634-
assert thread_pool.list_pools() == ["NopTask"]
635-
assert thread_pool.num_pending_tasks() == 15
633+
assert thread_pool.list_pools() == ["default"]
634+
assert thread_pool.number_pending_tasks() == 15
636635

637636
# Process them all
638637
results = thread_pool.process_futures(timeout=0.5)
@@ -657,7 +656,7 @@ def test_pool_parallelism_with_blocking_tasks(self):
657656
))
658657

659658
# Initially all pending
660-
assert pool.num_pending_tasks() == 5
659+
assert pool.number_pending_tasks() == 5
661660

662661
# Release all events at once
663662
for event in events:

0 commit comments

Comments
 (0)