Skip to content

Commit 2854546

Browse files
committed
Rename run_replayer, remove need to return self from init
1 parent 602fd42 commit 2854546

File tree

7 files changed

+40
-70
lines changed

7 files changed

+40
-70
lines changed

README.md

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1516,9 +1516,8 @@ class AuthenticationPlugin(Plugin):
15161516
def __init__(self, api_key: str):
15171517
self.api_key = api_key
15181518
1519-
def init_client_plugin(self, next: Plugin) -> Plugin:
1519+
def init_client_plugin(self, next: Plugin) -> None:
15201520
self.next_client_plugin = next
1521-
return self
15221521
15231522
def configure_client(self, config: ClientConfig) -> ClientConfig:
15241523
# Modify client configuration
@@ -1552,16 +1551,15 @@ Here's an example of a worker plugin that adds custom monitoring:
15521551
import temporalio
15531552
from contextlib import asynccontextmanager
15541553
from typing import AsyncIterator
1555-
from temporalio.worker import Plugin, WorkerConfig, Worker, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
1554+
from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
15561555
import logging
15571556

15581557
class MonitoringPlugin(Plugin):
15591558
def __init__(self):
15601559
self.logger = logging.getLogger(__name__)
15611560

1562-
def init_worker_plugin(self, next: Plugin) -> Plugin:
1561+
def init_worker_plugin(self, next: Plugin) -> None:
15631562
self.next_worker_plugin = next
1564-
return self
15651563

15661564
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
15671565
# Modify worker configuration
@@ -1581,14 +1579,14 @@ class MonitoringPlugin(Plugin):
15811579
return self.next_worker_plugin.configure_replayer(config)
15821580

15831581
@asynccontextmanager
1584-
async def workflow_replay(
1582+
async def run_replayer(
15851583
self,
15861584
replayer: Replayer,
15871585
histories: AsyncIterator[temporalio.client.WorkflowHistory],
15881586
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
15891587
self.logger.info("Starting replay execution")
15901588
try:
1591-
async with self.next_worker_plugin.workflow_replay(replayer, histories) as results:
1589+
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
15921590
yield results
15931591
finally:
15941592
self.logger.info("Replay execution completed")
@@ -1614,13 +1612,11 @@ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConf
16141612

16151613

16161614
class UnifiedPlugin(ClientPlugin, WorkerPlugin):
1617-
def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin:
1615+
def init_client_plugin(self, next: ClientPlugin) -> None:
16181616
self.next_client_plugin = next
1619-
return self
16201617

1621-
def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin:
1618+
def init_worker_plugin(self, next: WorkerPlugin) -> None:
16221619
self.next_worker_plugin = next
1623-
return self
16241620

16251621
def configure_client(self, config: ClientConfig) -> ClientConfig:
16261622
# Client-side customization
@@ -1646,12 +1642,12 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin):
16461642
config["data_converter"] = pydantic_data_converter
16471643
return config
16481644

1649-
async def workflow_replay(
1645+
async def run_replayer(
16501646
self,
16511647
replayer: Replayer,
16521648
histories: AsyncIterator[temporalio.client.WorkflowHistory],
16531649
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1654-
return self.next_worker_plugin.workflow_replay(replayer, histories)
1650+
return self.next_worker_plugin.run_replayer(replayer, histories)
16551651

16561652
# Create client with the unified plugin
16571653
client = await Client.connect(

temporalio/client.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ async def connect(
191191

192192
root_plugin: Plugin = _RootPlugin()
193193
for plugin in reversed(plugins):
194-
root_plugin = plugin.init_client_plugin(root_plugin)
194+
plugin.init_client_plugin(root_plugin)
195+
root_plugin = plugin
195196

196197
service_client = await root_plugin.connect_service_client(connect_config)
197198

@@ -235,7 +236,8 @@ def __init__(
235236

236237
root_plugin: Plugin = _RootPlugin()
237238
for plugin in reversed(plugins):
238-
root_plugin = plugin.init_client_plugin(root_plugin)
239+
plugin.init_client_plugin(root_plugin)
240+
root_plugin = plugin
239241

240242
self._init_from_config(root_plugin.configure_client(config))
241243

@@ -7399,7 +7401,7 @@ def name(self) -> str:
73997401
return type(self).__module__ + "." + type(self).__qualname__
74007402

74017403
@abstractmethod
7402-
def init_client_plugin(self, next: Plugin) -> Plugin:
7404+
def init_client_plugin(self, next: Plugin) -> None:
74037405
"""Initialize this plugin in the plugin chain.
74047406
74057407
This method sets up the chain of responsibility pattern by storing a reference
@@ -7408,9 +7410,6 @@ def init_client_plugin(self, next: Plugin) -> Plugin:
74087410
74097411
Args:
74107412
next: The next plugin in the chain to delegate to.
7411-
7412-
Returns:
7413-
This plugin instance for method chaining.
74147413
"""
74157414

74167415
@abstractmethod
@@ -7447,7 +7446,7 @@ async def connect_service_client(
74477446

74487447

74497448
class _RootPlugin(Plugin):
7450-
def init_client_plugin(self, next: Plugin) -> Plugin:
7449+
def init_client_plugin(self, next: Plugin) -> None:
74517450
raise NotImplementedError()
74527451

74537452
def configure_client(self, config: ClientConfig) -> ClientConfig:

temporalio/contrib/openai_agents/_temporal_openai_agents.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -237,25 +237,19 @@ def __init__(
237237
self._model_params = model_params
238238
self._model_provider = model_provider
239239

240-
def init_client_plugin(
241-
self, next: temporalio.client.Plugin
242-
) -> temporalio.client.Plugin:
240+
def init_client_plugin(self, next: temporalio.client.Plugin) -> None:
243241
"""Set the next client plugin"""
244242
self.next_client_plugin = next
245-
return self
246243

247244
async def connect_service_client(
248245
self, config: temporalio.service.ConnectConfig
249246
) -> temporalio.service.ServiceClient:
250247
"""No modifications to service client"""
251248
return await self.next_client_plugin.connect_service_client(config)
252249

253-
def init_worker_plugin(
254-
self, next: temporalio.worker.Plugin
255-
) -> temporalio.worker.Plugin:
250+
def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None:
256251
"""Set the next worker plugin"""
257252
self.next_worker_plugin = next
258-
return self
259253

260254
def configure_client(self, config: ClientConfig) -> ClientConfig:
261255
"""Configure the Temporal client for OpenAI agents integration.
@@ -320,14 +314,14 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
320314
return config
321315

322316
@asynccontextmanager
323-
async def workflow_replay(
317+
async def run_replayer(
324318
self,
325319
replayer: Replayer,
326320
histories: AsyncIterator[temporalio.client.WorkflowHistory],
327321
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
328322
"""Set the OpenAI Overrides during replay"""
329323
with set_open_ai_agent_temporal_overrides(self._model_params):
330-
async with self.next_worker_plugin.workflow_replay(
324+
async with self.next_worker_plugin.run_replayer(
331325
replayer, histories
332326
) as results:
333327
yield results

temporalio/worker/_plugin.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def name(self) -> str:
3535
return type(self).__module__ + "." + type(self).__qualname__
3636

3737
@abc.abstractmethod
38-
def init_worker_plugin(self, next: Plugin) -> Plugin:
38+
def init_worker_plugin(self, next: Plugin) -> None:
3939
"""Initialize this plugin in the plugin chain.
4040
4141
This method sets up the chain of responsibility pattern by storing a reference
@@ -44,9 +44,6 @@ def init_worker_plugin(self, next: Plugin) -> Plugin:
4444
4545
Args:
4646
next: The next plugin in the chain to delegate to.
47-
48-
Returns:
49-
This plugin instance for method chaining.
5047
"""
5148

5249
@abc.abstractmethod
@@ -92,7 +89,7 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
9289
"""
9390

9491
@abc.abstractmethod
95-
def workflow_replay(
92+
def run_replayer(
9693
self,
9794
replayer: Replayer,
9895
histories: AsyncIterator[WorkflowHistory],
@@ -101,7 +98,7 @@ def workflow_replay(
10198

10299

103100
class _RootPlugin(Plugin):
104-
def init_worker_plugin(self, next: Plugin) -> Plugin:
101+
def init_worker_plugin(self, next: Plugin) -> None:
105102
raise NotImplementedError()
106103

107104
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
@@ -113,7 +110,7 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
113110
async def run_worker(self, worker: Worker) -> None:
114111
await worker._run()
115112

116-
def workflow_replay(
113+
def run_replayer(
117114
self,
118115
replayer: Replayer,
119116
histories: AsyncIterator[WorkflowHistory],

temporalio/worker/_replayer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ def __init__(
8686
# Apply plugin configuration
8787
root_plugin: temporalio.worker.Plugin = _RootPlugin()
8888
for plugin in reversed(plugins):
89-
root_plugin = plugin.init_worker_plugin(root_plugin)
89+
plugin.init_worker_plugin(root_plugin)
90+
root_plugin = plugin
9091
self._config = root_plugin.configure_replayer(self._config)
9192
self._plugin = root_plugin
9293

@@ -175,7 +176,7 @@ def workflow_replay_iterator(
175176
An async iterator that returns replayed workflow results as they are
176177
replayed.
177178
"""
178-
return self._plugin.workflow_replay(self, histories)
179+
return self._plugin.run_replayer(self, histories)
179180

180181
@asynccontextmanager
181182
async def _workflow_replay_iterator(

temporalio/worker/_worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,8 @@ def __init__(
374374

375375
root_plugin: Plugin = _RootPlugin()
376376
for plugin in reversed(plugins):
377-
root_plugin = plugin.init_worker_plugin(root_plugin)
377+
plugin.init_worker_plugin(root_plugin)
378+
root_plugin = plugin
378379
config = root_plugin.configure_worker(config)
379380
self._plugin = root_plugin
380381

tests/test_plugins.py

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ class MyClientPlugin(temporalio.client.Plugin):
3737
def __init__(self):
3838
self.interceptor = TestClientInterceptor()
3939

40-
def init_client_plugin(self, next: Plugin) -> Plugin:
40+
def init_client_plugin(self, next: Plugin) -> None:
4141
self.next_client_plugin = next
42-
return self
4342

4443
def configure_client(self, config: ClientConfig) -> ClientConfig:
4544
config["namespace"] = "replaced_namespace"
@@ -74,17 +73,11 @@ async def test_client_plugin(client: Client, env: WorkflowEnvironment):
7473

7574

7675
class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):
77-
def init_worker_plugin(
78-
self, next: temporalio.worker.Plugin
79-
) -> temporalio.worker.Plugin:
76+
def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None:
8077
self.next_worker_plugin = next
81-
return self
8278

83-
def init_client_plugin(
84-
self, next: temporalio.client.Plugin
85-
) -> temporalio.client.Plugin:
79+
def init_client_plugin(self, next: temporalio.client.Plugin) -> None:
8680
self.next_client_plugin = next
87-
return self
8881

8982
def configure_client(self, config: ClientConfig) -> ClientConfig:
9083
return self.next_client_plugin.configure_client(config)
@@ -104,20 +97,17 @@ async def run_worker(self, worker: Worker) -> None:
10497
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
10598
return self.next_worker_plugin.configure_replayer(config)
10699

107-
def workflow_replay(
100+
def run_replayer(
108101
self,
109102
replayer: Replayer,
110103
histories: AsyncIterator[temporalio.client.WorkflowHistory],
111104
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
112-
return self.next_worker_plugin.workflow_replay(replayer, histories)
105+
return self.next_worker_plugin.run_replayer(replayer, histories)
113106

114107

115108
class MyWorkerPlugin(temporalio.worker.Plugin):
116-
def init_worker_plugin(
117-
self, next: temporalio.worker.Plugin
118-
) -> temporalio.worker.Plugin:
109+
def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None:
119110
self.next_worker_plugin = next
120-
return self
121111

122112
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
123113
config["task_queue"] = "replaced_queue"
@@ -135,12 +125,12 @@ async def run_worker(self, worker: Worker) -> None:
135125
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
136126
return self.next_worker_plugin.configure_replayer(config)
137127

138-
def workflow_replay(
128+
def run_replayer(
139129
self,
140130
replayer: Replayer,
141131
histories: AsyncIterator[temporalio.client.WorkflowHistory],
142132
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
143-
return self.next_worker_plugin.workflow_replay(replayer, histories)
133+
return self.next_worker_plugin.run_replayer(replayer, histories)
144134

145135

146136
async def test_worker_plugin_basic_config(client: Client) -> None:
@@ -203,17 +193,11 @@ async def test_worker_sandbox_restrictions(client: Client) -> None:
203193

204194

205195
class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):
206-
def init_worker_plugin(
207-
self, next: temporalio.worker.Plugin
208-
) -> temporalio.worker.Plugin:
196+
def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None:
209197
self.next_worker_plugin = next
210-
return self
211198

212-
def init_client_plugin(
213-
self, next: temporalio.client.Plugin
214-
) -> temporalio.client.Plugin:
199+
def init_client_plugin(self, next: temporalio.client.Plugin) -> None:
215200
self.next_client_plugin = next
216-
return self
217201

218202
def configure_client(self, config: ClientConfig) -> ClientConfig:
219203
config["data_converter"] = pydantic_data_converter
@@ -237,14 +221,12 @@ async def connect_service_client(
237221
return await self.next_client_plugin.connect_service_client(config)
238222

239223
@asynccontextmanager
240-
async def workflow_replay(
224+
async def run_replayer(
241225
self,
242226
replayer: Replayer,
243227
histories: AsyncIterator[temporalio.client.WorkflowHistory],
244228
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
245-
async with self.next_worker_plugin.workflow_replay(
246-
replayer, histories
247-
) as result:
229+
async with self.next_worker_plugin.run_replayer(replayer, histories) as result:
248230
yield result
249231

250232

0 commit comments

Comments
 (0)