Skip to content

Commit 00268d8

Browse files
committed
create kafka topics on developer update, first time install and normal deploy
1 parent 3d048bd commit 00268d8

File tree

3 files changed

+9
-4
lines changed

3 files changed

+9
-4
lines changed

installation_and_upgrade/ibex_install_utils/install_tasks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ def run_instrument_deploy_main(self) -> None:
254254
self._python_tasks.update_script_definitions()
255255
self._python_tasks.remove_instrument_script_githooks()
256256
self._server_tasks.setup_log_rotation()
257+
self._system_tasks.update_kafka_topics()
258+
257259

258260
def run_instrument_deploy_pre_stop(self) -> None:
259261
"""Upgrade an instrument. Steps to do before ibex is stopped.
@@ -293,6 +295,7 @@ def run_developer_update(self) -> None:
293295
self._mysql_tasks.install_mysql(force=False)
294296
self._system_tasks.check_java_installation()
295297
self._system_tasks.install_or_upgrade_git()
298+
self._system_tasks.update_kafka_topics()
296299

297300
def run_vhd_creation(self) -> None:
298301
"""Automated job which creates a set of VHDs containing all IBEX components.

installation_and_upgrade/ibex_install_utils/kafka_utils.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from kafka import KafkaConsumer
22
from kafka.admin import KafkaAdminClient, NewTopic
33

4-
REQUIRED_SUFFIXES = ["_events", "_sampleEnv", "_runInfo", "_epicsForwarderConfig", "_detSpecMap"]
4+
REQUIRED_SUFFIXES = ["_events", "_sampleEnv", "_runInfo", "_forwarderConfig", "_detSpecMap",
5+
"_forwarderStorage", "_forwarderStatus", "_runLog", "_areaDetector",
6+
"_monitorHistograms"]
57

68

7-
def get_existing_topics(kafka_broker):
9+
def _get_existing_topics(kafka_broker):
810
return KafkaConsumer(bootstrap_servers=[kafka_broker]).topics()
911

1012

@@ -18,7 +20,7 @@ def add_required_topics(kafka_broker, instrument):
1820
"""
1921
required_topics = set(instrument + suffix for suffix in REQUIRED_SUFFIXES)
2022
existing_topics = set(
21-
filter(lambda topic: topic.startswith(instrument), get_existing_topics(kafka_broker))
23+
filter(lambda topic: topic.startswith(instrument), _get_existing_topics(kafka_broker))
2224
)
2325

2426
if required_topics != existing_topics:

installation_and_upgrade/ibex_install_utils/tasks/system_tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ def update_kafka_topics(self) -> None:
189189
"""
190190
Adds the required kafka topics to the cluster.
191191
"""
192-
add_required_topics("livedata.isis.cclrc.ac.uk:9092", self._get_instrument_name())
192+
add_required_topics("livedata.isis.cclrc.ac.uk:31092", self._get_instrument_name())
193193

194194
@task("Add Nagios checks")
195195
def add_nagios_checks(self) -> None:

0 commit comments

Comments
 (0)