Skip to content

Commit 20894d7

Browse files
authored
Kafka topic creation with partitions for kafka sources, sinks and transport (#25)
* Add kafka topic creation with partitions. Add environment preparation. * Enable for operator mode. * Add topic partitioning for kafka transport. * Add example for Kafka event streaming from HTTP source. * Clean-up code. Rename example. * Add test for transport mode when preparing kafka topics. * Add kafka event streaming example with sink.
1 parent 4dd4ff6 commit 20894d7

File tree

10 files changed

+383
-9
lines changed

10 files changed

+383
-9
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
21+
# Send message to Slack sink using the kafka transport.
22+
23+
# Command line arguments and validation:
24+
if len(sys.argv) < 4:
25+
print(f'usage: {sys.argv[0]} <brokers> <password> <slack_channel>'
26+
'<slack_webhook> <run_mode> OR'
27+
f' {sys.argv[0]} <slack_channel> <slack_webhook> <run_mode>')
28+
sys.exit(1)
29+
30+
# Brokers and run mode:
31+
brokers = None
32+
password = None
33+
slack_channel = sys.argv[1]
34+
slack_webhook = sys.argv[2]
35+
run_mode = sys.argv[3]
36+
if len(sys.argv) == 6:
37+
brokers = sys.argv[1]
38+
password = sys.argv[2]
39+
slack_channel = sys.argv[3]
40+
slack_webhook = sys.argv[4]
41+
run_mode = sys.argv[5]
42+
43+
if run_mode not in ['local', 'mixed', 'operator']:
44+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
45+
46+
# The Kafka topic used for communication.
47+
topic = "externalTopicSink"
48+
49+
# Initialize ray either on the cluster or locally otherwise.
50+
if run_mode == 'operator':
51+
ray.init(address='auto')
52+
else:
53+
ray.init()
54+
55+
# Start rayvens in operator mode."
56+
rayvens.init(mode=run_mode, transport="kafka")
57+
58+
# Create stream.
59+
stream = rayvens.Stream('slack')
60+
61+
# Event sink config.
62+
sink_config = dict(kind='slack-sink',
63+
channel=slack_channel,
64+
webhookUrl=slack_webhook,
65+
kafka_transport_topic=topic,
66+
kafka_transport_partitions=3)
67+
68+
# Add sink to stream.
69+
sink = stream.add_sink(sink_config)
70+
71+
# Sends message to all sinks attached to this stream.
72+
stream << f'Message to Slack sink in run mode {run_mode} and Kafka transport.'
73+
74+
# Disconnect any sources or sinks attached to the stream 2 seconds after
75+
# the stream is idle (i.e. no events were propagated by the stream).
76+
stream.disconnect_all(after_idle_for=2)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
21+
# Event streaming from a third-party external source using Kafka.
22+
23+
# Command line arguments and validation:
24+
if len(sys.argv) < 2:
25+
print(f'usage: {sys.argv[0]} <brokers> <password> <run_mode> OR'
26+
f' {sys.argv[0]} <run_mode>')
27+
sys.exit(1)
28+
29+
# Brokers and run mode:
30+
brokers = None
31+
password = None
32+
run_mode = sys.argv[1]
33+
if len(sys.argv) == 4:
34+
brokers = sys.argv[1]
35+
password = sys.argv[2]
36+
run_mode = sys.argv[3]
37+
38+
if run_mode not in ['local', 'mixed', 'operator']:
39+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
40+
41+
# The Kafka topic used for communication.
42+
topic = "externalTopicSource"
43+
44+
# Initialize ray either on the cluster or locally otherwise.
45+
if run_mode == 'operator':
46+
ray.init(address='auto')
47+
else:
48+
ray.init()
49+
50+
# Start rayvens in operator mode."
51+
rayvens.init(mode=run_mode, transport="kafka")
52+
53+
# Create stream.
54+
stream = rayvens.Stream('http')
55+
56+
# Event source config.
57+
source_config = dict(
58+
kind='http-source',
59+
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
60+
route='/from-http',
61+
period=3000,
62+
kafka_transport_topic=topic,
63+
kafka_transport_partitions=3)
64+
65+
# Attach source to stream.
66+
source = stream.add_source(source_config)
67+
68+
# Log all events from stream-attached sources.
69+
stream >> (lambda event: print('LOG:', event))
70+
71+
# Disconnect source after 10 seconds.
72+
stream.disconnect_all(after=10)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
import time
21+
22+
# An artificial example of using Kafka sources and sink.
23+
# Typically the user application will interact with an external Kafka
24+
# service to either subscribe or publish data to other services:
25+
#
26+
# EXT. SERVICE => KAFKA => RAYVENS KAFKA SOURCE
27+
# or
28+
# RAYVENS KAFKA SINK => KAFKA => EXT. SERVICE
29+
#
30+
# In this example we put together an artificial example where, to
31+
# demonstrate both Kafka sources and sinks at the same time we
32+
# set a Kafka sink to publish to a test topic then have a Kafka
33+
# source read from that test topic:
34+
#
35+
# RAYVENS KAFKA SINK => KAFKA => RAYVENS KAFKA SOURCE
36+
#
37+
38+
# Command line arguments and validation:
39+
if len(sys.argv) < 2:
40+
print(f'usage: {sys.argv[0]} <brokers> <password> <run_mode> OR'
41+
f' {sys.argv[0]} <run_mode>')
42+
sys.exit(1)
43+
44+
# Brokers and run mode:
45+
brokers = None
46+
password = None
47+
run_mode = sys.argv[1]
48+
if len(sys.argv) == 4:
49+
brokers = sys.argv[1]
50+
password = sys.argv[2]
51+
run_mode = sys.argv[3]
52+
53+
if run_mode not in ['local', 'mixed', 'operator']:
54+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
55+
56+
# The Kafka topic used for communication.
57+
topic = "externalTopic"
58+
59+
# If using the Kafka broker started by Rayvens the following brokers
60+
# are possible:
61+
# - from inside the cluster: kafka:9092
62+
# - from outside the cluster: localhost:31093
63+
# If using a different Kafka service please provide the brokers in the
64+
# form of host:port,host1:port1, ... .
65+
if brokers is None:
66+
brokers = 'localhost:31093'
67+
if run_mode == 'operator':
68+
brokers = "kafka:9092"
69+
70+
# Initialize ray either on the cluster or locally otherwise.
71+
if run_mode == 'operator':
72+
ray.init(address='auto')
73+
else:
74+
ray.init()
75+
76+
# Start rayvens in operator mode.
77+
rayvens.init(mode=run_mode)
78+
79+
# Create source stream and configuration.
80+
source_stream = rayvens.Stream('kafka-source-stream')
81+
82+
source_config = dict(kind='kafka-source',
83+
route='/fromkafka',
84+
topic=topic,
85+
brokers=brokers,
86+
partitions=3)
87+
if password is not None:
88+
source_config['SASL_password'] = password
89+
source = source_stream.add_source(source_config)
90+
# Log all events from stream-attached sources.
91+
source_stream >> (lambda event: print('KAFKA SOURCE:', event))
92+
93+
# Create sink stream and configuration.
94+
sink_stream = rayvens.Stream('kafka-sink-stream')
95+
sink_config = dict(kind='kafka-sink',
96+
route='/tokafka',
97+
topic=topic,
98+
brokers=brokers,
99+
partitions=3)
100+
if password is not None:
101+
sink_config['SASL_password'] = password
102+
sink = sink_stream.add_sink(sink_config)
103+
104+
time.sleep(10)
105+
106+
# Sends message to all sinks attached to this stream.
107+
sink_stream << f'Sending message to Kafka sink in run mode {run_mode}.'
108+
109+
# Give a grace period to the message to propagate then disconnect source
110+
# and sink.
111+
time.sleep(30)
112+
source_stream.disconnect_all()
113+
sink_stream.disconnect_all()

rayvens/core/common.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,15 @@ def produce(self, data):
147147
self.producer.produce(self.name, data.encode('utf-8'))
148148

149149

150-
def kafka_send_to(integration_name, handle):
150+
def kafka_send_to(kafka_transport_topic, handle):
151151
# use kafka consumer thread to push from camel source to rayvens stream
152152
consumer = Consumer({
153153
'bootstrap.servers': brokers(),
154154
'group.id': 'ray',
155155
'auto.offset.reset': 'latest'
156156
})
157157

158-
consumer.subscribe([integration_name])
158+
consumer.subscribe([kafka_transport_topic])
159159

160160
def append():
161161
while True:
@@ -168,9 +168,9 @@ def append():
168168
threading.Thread(target=append).start()
169169

170170

171-
def kafka_recv_from(integration_name, handle):
171+
def kafka_recv_from(integration_name, kafka_transport_topic, handle):
172172
# use kafka producer actor to push from rayvens stream to camel sink
173-
helper = KafkaProducerActor.remote(integration_name)
173+
helper = KafkaProducerActor.remote(kafka_transport_topic)
174174
handle.send_to.remote(helper, integration_name)
175175

176176

rayvens/core/integration.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
# limitations under the License.
1515
#
1616

17-
from rayvens.core.utils import random_port
17+
from rayvens.core.common import brokers
18+
from rayvens.core.utils import random_port, create_partitioned_topic
1819
from rayvens.core.name import name_integration
1920
from rayvens.core import catalog
2021
from rayvens.core import kamel
@@ -39,10 +40,17 @@ def __init__(self, stream_name, source_sink_name, config):
3940

4041
self.integration_name = name_integration(self.stream_name,
4142
self.source_sink_name)
43+
44+
# Establish kafka transport topic name:
45+
self.kafka_transport_topic = self.integration_name
46+
if "kafka_transport_topic" in config:
47+
self.kafka_transport_topic = config["kafka_transport_topic"]
48+
4249
self.port = random_port()
4350
self.invocation = None
4451
self.service_name = None
4552
self.server_address = None
53+
self.environment_preparators = []
4654

4755
def invoke_local_run(self, mode, integration_content):
4856
self.invocation = kamel.local_run(
@@ -107,6 +115,31 @@ def accepts_data_type(self, data):
107115
return True
108116
return False
109117

118+
# Method that checks if, based on the configuration, the integration
119+
# requires something to be run or created before the integration is run.
120+
def prepare_environment(self, mode):
121+
# Create a multi-partition topic for a kafka source/sink.
122+
if (self.config['kind'] == 'kafka-source' or
123+
self.config['kind'] == 'kafka-sink') and \
124+
'partitions' in self.config and self.config['partitions'] > 1:
125+
topic = self.config['topic']
126+
partitions = self.config['partitions']
127+
kafka_brokers = self.config['brokers']
128+
129+
# Create topic
130+
create_partitioned_topic(topic, partitions, kafka_brokers)
131+
132+
# Create a multi-partition topic for the Kafka transport of a
133+
# source/sink.
134+
if mode.transport == "kafka" and \
135+
'kafka_transport_partitions' in self.config and \
136+
self.config['kafka_transport_partitions'] > 1:
137+
partitions = self.config['kafka_transport_partitions']
138+
139+
# Create topic
140+
create_partitioned_topic(self.kafka_transport_topic, partitions,
141+
brokers())
142+
110143
def route(self, default=None):
111144
if 'route' in self.config and self.config['route'] is not None:
112145
return self.config['route']

0 commit comments

Comments
 (0)