|
40 | 40 | def test_publish_event( |
41 | 41 | dask_client: distributed.Client, job_id: str, task_owner: TaskOwner |
42 | 42 | ): |
43 | | - dask_pub = distributed.Pub("some_topic", client=dask_client) |
44 | | - dask_sub = distributed.Sub("some_topic", client=dask_client) |
45 | 43 | event_to_publish = TaskProgressEvent( |
46 | 44 | job_id=job_id, |
47 | 45 | msg="the log", |
48 | 46 | progress=1, |
49 | 47 | task_owner=task_owner, |
50 | 48 | ) |
51 | | - publish_event(dask_pub=dask_pub, event=event_to_publish) |
| 49 | + dask_client.log_event("some_topic", event_to_publish.model_dump_json()) |
| 50 | + # publish_event(dask_pub=dask_pub, event=event_to_publish) |
52 | 51 |
|
53 | 52 | # NOTE: this tests runs a sync dask client, |
54 | 53 | # and the CI seems to have sometimes difficulties having this run in a reasonable time |
55 | 54 | # hence the long time out |
56 | | - message = dask_sub.get(timeout=DASK_TESTING_TIMEOUT_S) |
57 | | - assert message is not None |
58 | | - assert isinstance(message, str) |
59 | | - received_task_log_event = TaskProgressEvent.model_validate_json(message) |
| 55 | + events = dask_client.get_events("some_topic") |
| 56 | + assert events is not None |
| 57 | + assert isinstance(events, tuple) |
| 58 | + assert len(events) == 1 |
| 59 | + assert isinstance(events[0], tuple) |
| 60 | + received_task_log_event = TaskProgressEvent.model_validate_json(events[0][1]) |
60 | 61 | assert received_task_log_event == event_to_publish |
61 | 62 |
|
| 63 | + # message = dask_sub.get(timeout=DASK_TESTING_TIMEOUT_S) |
| 64 | + # assert message is not None |
| 65 | + # assert isinstance(message, str) |
| 66 | + # received_task_log_event = TaskProgressEvent.model_validate_json(message) |
| 67 | + # assert received_task_log_event == event_to_publish |
| 68 | + |
62 | 69 |
|
63 | 70 | async def test_publish_event_async( |
64 | 71 | async_dask_client: distributed.Client, job_id: str, task_owner: TaskOwner |
|
0 commit comments