Skip to content

Commit 3a9f395

Browse files
authored
Enable Kafka transport for operator mode (#49)
* Add kafka transport for operator mode. * Skip health check for Kafka transport. * Fix kafka transport example. * Do not perform readiness check for kafka transport.
1 parent c631fc4 commit 3a9f395

File tree

4 files changed

+96
-1
lines changed

4 files changed

+96
-1
lines changed

examples/kafka/kafka_event_streaming_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
kind='http-source',
6262
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
6363
route='/from-http',
64-
period=3000,
64+
period=1000,
6565
kafka_transport_topic=topic,
6666
kafka_transport_partitions=3)
6767

rayvens/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from rayvens.core.local import start as start_http
2222
from rayvens.core.kafka import start as start_kafka
2323
from rayvens.core.operator import start as start_operator_http
24+
from rayvens.core.operator_kafka import start as start_operator_kafka
2425
from rayvens.core.ray_serve import start as start_operator_ray_serve
2526
from rayvens.core.name import name_source, name_sink
2627
from rayvens.core.verify import verify_do
@@ -252,6 +253,8 @@ def init(mode=os.getenv('RAYVENS_MODE', 'auto'),
252253
elif mode in ['mixed', 'operator']:
253254
if transport in ['auto', 'http']:
254255
_global_camel = start_operator_http(mode, check_port)
256+
elif transport == 'kafka':
257+
_global_camel = start_operator_kafka(mode, check_port)
255258
elif transport in ['ray-serve']:
256259
_global_camel = start_operator_ray_serve(mode, check_port)
257260
else:

rayvens/core/common.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ def await_start(mode, integration):
9696
else:
9797
print('Integration did not start correctly.')
9898

99+
# For kafka transport the health check cannot be performed.
100+
if mode.transport == 'kafka':
101+
return True
102+
99103
# Perform health check and wait for integration to be ready.
100104
healthy_integration = _wait_for_ready_integration(mode, integration)
101105

@@ -276,6 +280,12 @@ def brokers():
276280
return f'{host}:{port}'
277281

278282

283+
def operator_brokers():
284+
host = os.getenv('KAFKA_SERVICE_HOST', 'kafka')
285+
port = os.getenv('KAFKA_SERVICE_PORT', '9092')
286+
return f'{host}:{port}'
287+
288+
279289
def eval(f, data):
280290
if isinstance(f, ActorHandle):
281291
return f.append.remote(data)

rayvens/core/operator_kafka.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
from rayvens.core.common import get_run_mode, await_start, brokers
18+
from rayvens.core.common import kafka_send_to, kafka_recv_from
19+
from rayvens.core.catalog import construct_source, construct_sink
20+
from rayvens.core.integration import Integration
21+
22+
23+
def start(camel_mode, check_port):
24+
return Camel(get_run_mode(camel_mode, check_port))
25+
26+
27+
class Camel:
28+
def __init__(self, mode):
29+
self.mode = mode
30+
self.mode.transport = 'kafka'
31+
32+
def add_source(self, stream, source, source_name):
33+
# Construct integration
34+
integration = Integration(stream.name, source_name, source)
35+
36+
# Prepare env:
37+
integration.prepare_environment(self.mode)
38+
39+
# Determine the `to` endpoint value made up of a base address and
40+
# a custom route provided by the user. Use this to construct the
41+
# integration source code.
42+
integration_content = construct_source(
43+
source,
44+
f'kafka:{integration.kafka_transport_topic}?brokers={brokers()}')
45+
46+
# Start running the source integration.
47+
integration.invoke_run(self.mode, integration_content)
48+
49+
# Set up source for the HTTP connector case.
50+
kafka_send_to(integration.kafka_transport_topic,
51+
integration.kafka_transport_partitions, stream.actor)
52+
53+
if not await_start(self.mode, integration):
54+
raise RuntimeError('Could not start source')
55+
return integration
56+
57+
def add_sink(self, stream, sink, sink_name):
58+
# Construct integration
59+
integration = Integration(stream.name, sink_name, sink)
60+
61+
# Prepare env:
62+
integration.prepare_environment(self.mode)
63+
64+
# Get integration source code.
65+
integration_content = construct_sink(
66+
sink,
67+
f'kafka:{integration.kafka_transport_topic}?brokers={brokers()}')
68+
69+
# Start running the integration.
70+
integration.invoke_run(self.mode, integration_content)
71+
72+
kafka_recv_from(sink_name, integration.kafka_transport_topic,
73+
stream.actor)
74+
75+
# Wait for integration to finish.
76+
if not await_start(self.mode, integration):
77+
raise RuntimeError('Could not start sink')
78+
79+
return integration
80+
81+
def disconnect(self, integration):
82+
integration.disconnect(self.mode)

0 commit comments

Comments
 (0)