Skip to content

Commit aec479f

Browse files
committed
Use own kafka callback
1 parent 5de624a commit aec479f

File tree

8 files changed

+127
-43
lines changed

8 files changed

+127
-43
lines changed

doc/architectural_decisions/009-kafka-streaming.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ central Kafka instance is not currently considered "reliable" in an experiment c
2323
streaming the documents will allow testing to be done. Kafka will eventually be deployed in a "reliable"
2424
way accessible to each instrument.
2525
- We will encode messages from bluesky using `msgpack` (with the `msgpack-numpy` extension), because:
26-
- It is the default encoder used by the upstream `bluesky-kafka` integration
26+
- It is the default encoder used by the upstream `bluesky-kafka` integration (though we are not using
27+
that integration directly, as it has been soft-deprecated upstream)
2728
- It is a schema-less encoder, meaning we do not have to write/maintain fixed schemas for all the
2829
documents allowed by `event-model`
2930
- It has reasonable performance in terms of encoding speed and message size
@@ -41,7 +42,7 @@ Encoding bluesky documents into JSON and then wrapping them in the
4142
was considered.
4243

4344
We chose `msgpack` instead of json strings + flatbuffers because:
44-
- It is more standard in the bluesky community (e.g. it is the default used in `bluesky-kafka`)
45+
- It is more standard in the bluesky community
4546
- Bluesky events will be streamed to a dedicated topic, which is unlikely to be confused with data
4647
using any other schema.
4748

doc/dev/kafka.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Kafka
22

3-
`ibex_bluesky_core` uses [the `bluesky-kafka` library](https://github.com/bluesky/bluesky-kafka) to send documents
3+
`ibex_bluesky_core` uses {py:obj}`~ibex_bluesky_core.callbacks.KafkaCallback` to send documents
44
emitted by the {py:obj}`~bluesky.run_engine.RunEngine` to Kafka. The Kafka callback is automatically added by
55
{py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, and so no user configuration is required - the callback is always
66
enabled.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ classifiers = [
4141

4242
dependencies = [
4343
"bluesky", # Bluesky framework
44-
"bluesky-kafka", # Bluesky-kafka integration
44+
"confluent-kafka", # Kafka producer
4545
"ophyd-async[ca] == 0.14.0", # Device abstraction. When changing, also change in doc/conf.py
4646
"lmfit", # Fitting
4747
"matplotlib", # Plotting

src/ibex_bluesky_core/callbacks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
LiveFit,
3434
LiveFitLogger,
3535
)
36+
from ibex_bluesky_core.callbacks._kafka import KafkaCallback
3637
from ibex_bluesky_core.callbacks._plotting import LivePColorMesh, LivePlot, PlotPNGSaver, show_plot
3738
from ibex_bluesky_core.callbacks._utils import get_default_output_path
3839
from ibex_bluesky_core.fitting import FitMethod
@@ -49,6 +50,7 @@
4950
"DocLoggingCallback",
5051
"HumanReadableFileCallback",
5152
"ISISCallbacks",
53+
"KafkaCallback",
5254
"LiveFit",
5355
"LiveFitLogger",
5456
"LivePColorMesh",
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import logging
2+
import os
3+
import socket
4+
from typing import Any
5+
6+
import msgpack_numpy
7+
from bluesky.callbacks import CallbackBase
8+
from confluent_kafka import Producer
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092"
14+
15+
16+
def get_kafka_topic_name() -> str:
17+
"""Get the name of the bluesky Kafka topic for this machine."""
18+
computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper()
19+
computer_name = computer_name.upper()
20+
if computer_name.startswith(("NDX", "NDH")):
21+
name = computer_name[3:]
22+
else:
23+
name = computer_name
24+
25+
return f"{name}_bluesky"
26+
27+
28+
class KafkaCallback(CallbackBase):
29+
"""Forward all bluesky documents to Kafka.
30+
31+
Documents are sent to kafka encoded using the MsgPack format with
32+
the ``msgpack_numpy`` extension to allow efficiently encoding arrays.
33+
34+
.. note::
35+
36+
This callback is automatically configured by
37+
:py:obj:`ibex_bluesky_core.run_engine.get_run_engine`, and does not need
38+
to be configured manually.
39+
"""
40+
41+
def __init__(
42+
self,
43+
*,
44+
bootstrap_servers: list[str] | None = None,
45+
topic: str | None = None,
46+
key: str,
47+
kafka_config: dict[str, Any],
48+
) -> None:
49+
super().__init__()
50+
51+
self._topic = topic or get_kafka_topic_name()
52+
self._key = key
53+
54+
if "bootstrap.servers" in kafka_config:
55+
raise ValueError(
56+
"Do not specify bootstrap.servers in kafka config, use bootstrap_servers argument."
57+
)
58+
59+
if bootstrap_servers is None:
60+
bootstrap_servers = [
61+
os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER)
62+
]
63+
64+
kafka_config["bootstrap.servers"] = ",".join(bootstrap_servers)
65+
66+
self._producer = Producer(kafka_config)
67+
68+
def __call__(
69+
self, name: str, doc: dict[str, Any], validate: bool = False
70+
) -> tuple[str, dict[str, Any]]:
71+
try:
72+
data = msgpack_numpy.dumps([name, doc])
73+
self._producer.produce(topic=self._topic, key=self._key, value=data)
74+
except Exception:
75+
# If we can't produce to kafka, log and carry on. We don't want
76+
# kafka failures to kill a scan - kafka is currently considered
77+
# 'non-critical'.
78+
logger.exception("Failed to publish Kafka message")
79+
80+
return name, doc

src/ibex_bluesky_core/run_engine/__init__.py

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,27 @@
33
import asyncio
44
import functools
55
import logging
6-
import os
7-
import socket
86
from collections.abc import Generator
97
from functools import cache
108
from threading import Event, Lock
119
from typing import Any, cast
1210

1311
import bluesky.preprocessors as bpp
14-
import msgpack_numpy
1512
from bluesky.run_engine import RunEngine, RunEngineResult
1613
from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted
17-
from bluesky_kafka import Publisher
1814

19-
from ibex_bluesky_core.callbacks import DocLoggingCallback
15+
from ibex_bluesky_core.callbacks import DocLoggingCallback, KafkaCallback
2016
from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY
2117
from ibex_bluesky_core.preprocessors import add_rb_number_processor
2218
from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler
2319
from ibex_bluesky_core.utils import is_matplotlib_backend_qt
2420
from ibex_bluesky_core.version import version
2521

26-
__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"]
22+
__all__ = ["get_run_engine", "run_plan"]
2723

2824
logger = logging.getLogger(__name__)
2925

3026

31-
DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092"
32-
33-
34-
def get_kafka_topic_name() -> str:
35-
"""Get the name of the bluesky Kafka topic for this machine."""
36-
computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper()
37-
computer_name = computer_name.upper()
38-
if computer_name.startswith(("NDX", "NDH")):
39-
name = computer_name[3:]
40-
else:
41-
name = computer_name
42-
43-
return f"{name}_bluesky"
44-
45-
4627
class _DuringTask(DuringTask):
4728
def block(self, blocking_event: Event) -> None:
4829
"""On windows, event.wait() on the main thread is not interruptible by a CTRL-C.
@@ -120,12 +101,9 @@ def get_run_engine() -> RunEngine:
120101
log_callback = DocLoggingCallback()
121102
RE.subscribe(log_callback)
122103

123-
kafka_callback = Publisher(
124-
topic=get_kafka_topic_name(),
125-
bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER),
104+
kafka_callback = KafkaCallback(
126105
key="doc",
127-
serializer=msgpack_numpy.dumps,
128-
producer_config={
106+
kafka_config={
129107
"enable.idempotence": True,
130108
"log_level": 0,
131109
"log.connection.close": False,

tests/callbacks/test_kafka.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import re
2+
from unittest import mock
3+
4+
import pytest
5+
6+
from ibex_bluesky_core.callbacks._kafka import KafkaCallback, get_kafka_topic_name
7+
8+
9+
def test_get_kafka_topic_name():
10+
with mock.patch("ibex_bluesky_core.callbacks._kafka.os.environ.get", return_value="FOO"):
11+
assert get_kafka_topic_name() == "FOO_bluesky"
12+
13+
with mock.patch("ibex_bluesky_core.callbacks._kafka.os.environ.get", return_value="NDXBAR"):
14+
assert get_kafka_topic_name() == "BAR_bluesky"
15+
16+
with mock.patch("ibex_bluesky_core.callbacks._kafka.os.environ.get", return_value="NDHBAZ"):
17+
assert get_kafka_topic_name() == "BAZ_bluesky"
18+
19+
20+
def test_init_kafka_callback_with_duplicate_bootstrap_servers():
21+
with pytest.raises(
22+
ValueError,
23+
match=re.escape(
24+
"Do not specify bootstrap.servers in kafka config, use bootstrap_servers argument."
25+
),
26+
):
27+
KafkaCallback(bootstrap_servers=["abc"], kafka_config={"bootstrap.servers": "foo"}, key="")
28+
29+
30+
def test_exceptions_suppressed():
31+
cb = KafkaCallback(bootstrap_servers=["abc"], kafka_config={}, key="")
32+
with mock.patch(
33+
"ibex_bluesky_core.callbacks._kafka.msgpack_numpy.dumps", side_effect=ValueError
34+
):
35+
cb("start", {})

tests/test_run_engine.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import threading
44
from collections.abc import Generator
55
from typing import Any
6-
from unittest import mock
76
from unittest.mock import MagicMock
87

98
import bluesky.plan_stubs as bps
@@ -12,7 +11,7 @@
1211
from bluesky.run_engine import RunEngineResult
1312
from bluesky.utils import Msg, RequestAbort, RunEngineInterrupted
1413

15-
from ibex_bluesky_core.run_engine import _DuringTask, get_kafka_topic_name, get_run_engine, run_plan
14+
from ibex_bluesky_core.run_engine import _DuringTask, get_run_engine, run_plan
1615
from ibex_bluesky_core.version import version
1716

1817

@@ -147,14 +146,3 @@ def plan():
147146
result = run_plan(plan())
148147
assert result.plan_result == "happy_path_result"
149148
assert result.exit_status == "success"
150-
151-
152-
def test_get_kafka_topic_name():
153-
with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="FOO"):
154-
assert get_kafka_topic_name() == "FOO_bluesky"
155-
156-
with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDXBAR"):
157-
assert get_kafka_topic_name() == "BAR_bluesky"
158-
159-
with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDHBAZ"):
160-
assert get_kafka_topic_name() == "BAZ_bluesky"

0 commit comments

Comments
 (0)