Skip to content

Commit 9d56ce4

Browse files
Sample pipethrough (#156)
New service to consume a message from a queue, and forward it to another. The sample producer and sample consumer services allow a message to be sent between them. The sample pipethrough service produces a test service which can be used to both receive and send messages in a recipe. This is useful to test features that need some basic functionality over multiple services, such as distributed tracing.
1 parent 5d06b25 commit 9d56ce4

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ workflows = "workflows"
3939

4040
[project.entry-points."workflows.services"]
4141
SampleConsumer = "workflows.services.sample_consumer:SampleConsumer"
42+
SamplePipethrough = "workflows.services.sample_pipethrough:SamplePipethrough"
4243
SampleProducer = "workflows.services.sample_producer:SampleProducer"
4344
SampleTxn = "workflows.services.sample_transaction:SampleTxn"
4445
SampleTxnProducer = "workflows.services.sample_transaction:SampleTxnProducer"
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import time
5+
6+
import workflows.recipe
7+
from workflows.services.common_service import CommonService
8+
9+
10+
class SamplePipethrough(CommonService):
11+
"""An example services building on top of the workflow.services architecture,
12+
demonstrating how this architecture can be used.
13+
This services consumes messages off a queue, and forwards to another."""
14+
15+
# Human readable service name
16+
_service_name = "Message Pipethrough"
17+
18+
# Logger name
19+
_logger_name = "workflows.service.sample_pipethrough"
20+
21+
def initializing(self):
22+
"""Subscribe to a channel."""
23+
workflows.recipe.wrap_subscribe(
24+
self._transport,
25+
"transient.destination",
26+
self.process,
27+
)
28+
29+
def process(self, rw, header, message):
30+
"""Consume message and send to output pipe."""
31+
t = (time.time() % 1000) * 1000
32+
33+
if header:
34+
header_str = json.dumps(header, indent=2) + "\n" + "----------------" + "\n"
35+
else:
36+
header_str = ""
37+
if isinstance(message, dict):
38+
message = json.dumps(message, indent=2) + "\n" + "----------------" + "\n"
39+
40+
self.log.info(
41+
f"=== Consume ====\n{header_str}{message}\nReceived message @{t:10.3f} ms"
42+
)
43+
44+
rw.send(0)

0 commit comments

Comments
 (0)