@@ -71,33 +71,33 @@ async def initialize(
71
71
)
72
72
self ._engine_state_slice = EngineStateSlice ()
73
73
74
- await self ._publish_runs_advise_refetch_async ()
74
+ await self ._publish_runs_advise_refetch_async (run_id = run_id )
75
75
76
- async def clean_up_current_run (self ) -> None :
77
- """Publish final refetch and unsubscribe flags."""
78
- await self ._publish_runs_advise_refetch_async ()
79
- await self ._publish_runs_advise_unsubscribe_async ()
76
+ async def clean_up_run (self , run_id : str ) -> None :
77
+ """Publish final refetch and unsubscribe flags for the given run ."""
78
+ await self ._publish_runs_advise_refetch_async (run_id = run_id )
79
+ await self ._publish_runs_advise_unsubscribe_async (run_id = run_id )
80
80
81
81
async def _publish_current_command (self ) -> None :
82
82
"""Publishes the equivalent of GET /runs/:runId/commands?cursor=null&pageLength=1."""
83
83
await self ._client .publish_advise_refetch_async (
84
84
topic = Topics .RUNS_CURRENT_COMMAND
85
85
)
86
86
87
- async def _publish_runs_advise_refetch_async (self ) -> None :
87
+ async def _publish_runs_advise_refetch_async (self , run_id : str ) -> None :
88
88
"""Publish a refetch flag for relevant runs topics."""
89
+ await self ._client .publish_advise_refetch_async (topic = Topics .RUNS )
90
+
89
91
if self ._run_hooks is not None :
90
- await self ._client .publish_advise_refetch_async (topic = Topics .RUNS )
91
92
await self ._client .publish_advise_refetch_async (
92
- topic = f"{ Topics .RUNS } /{ self . _run_hooks . run_id } "
93
+ topic = f"{ Topics .RUNS } /{ run_id } "
93
94
)
94
95
95
- async def _publish_runs_advise_unsubscribe_async (self ) -> None :
96
+ async def _publish_runs_advise_unsubscribe_async (self , run_id : str ) -> None :
96
97
"""Publish an unsubscribe flag for relevant runs topics."""
97
- if self ._run_hooks is not None :
98
- await self ._client .publish_advise_unsubscribe_async (
99
- topic = f"{ Topics .RUNS } /{ self ._run_hooks .run_id } "
100
- )
98
+ await self ._client .publish_advise_unsubscribe_async (
99
+ topic = f"{ Topics .RUNS } /{ run_id } "
100
+ )
101
101
102
102
async def _handle_current_command_change (self ) -> None :
103
103
"""Publish a refetch flag if the current command has changed."""
@@ -121,7 +121,9 @@ async def _handle_engine_status_change(self) -> None:
121
121
and self ._engine_state_slice .state_summary_status
122
122
!= current_state_summary .status
123
123
):
124
- await self ._publish_runs_advise_refetch_async ()
124
+ await self ._publish_runs_advise_refetch_async (
125
+ run_id = self ._run_hooks .run_id
126
+ )
125
127
self ._engine_state_slice .state_summary_status = (
126
128
current_state_summary .status
127
129
)
0 commit comments