
Commit da89f40

Add sink multitask example. (#64)
1 parent 6d68b65 commit da89f40

File tree

1 file changed: +90 -0 lines changed

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ray
import rayvens
import sys
import time

# This example demonstrates how to send objects to AWS S3 or
# IBM Cloud Object Storage.

# Parse command-line arguments
if len(sys.argv) < 5:
    print(f'usage: {sys.argv[0]} <bucket> <access_key_id> <secret_access_key>'
          ' <endpoint> [<region>]')
    sys.exit(1)
bucket = sys.argv[1]
access_key_id = sys.argv[2]
secret_access_key = sys.argv[3]
endpoint = sys.argv[4]
region = None
if len(sys.argv) == 6:
    region = sys.argv[5]
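
# Example invocation (the script name and values below are illustrative
# placeholders, not part of this commit):
#   python sink_multitask.py my-bucket MY_ACCESS_KEY_ID MY_SECRET_ACCESS_KEY \
#       https://s3.us-east.cloud-object-storage.appdomain.cloud us-east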

# Initialize Ray and Rayvens
ray.init()

# TODO: make header setting work for Kafka transport. Currently the
# Camel-K component for Kafka does not propagate message headers. This
# will be fixed by the Camel-K 1.8.0 release.
# rayvens.init(transport="kafka")
rayvens.init()

# Create an object stream
stream = rayvens.Stream('upload-file')

# Configure the sink
sink_config = dict(kind='cloud-object-storage-sink',
                   bucket_name=bucket,
                   access_key_id=access_key_id,
                   secret_access_key=secret_access_key,
                   endpoint=endpoint)

if region is not None:
    sink_config['region'] = region

# Run the sink
sink = stream.add_sink(sink_config)


# Operator sub-task:
@ray.remote
def sub_task(context, intermediate_data):
    contents = "sub-task " + intermediate_data
    print(contents)
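    # The CamelAwsS3Key header below is expected to set the object key
    # (file name) under which the contents are stored in the bucket.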
    sub_task_outgoing_event = rayvens.OutputEvent(
        contents,
        {"CamelAwsS3Key": "custom_file_" + intermediate_data + ".json"})
    context.publish(sub_task_outgoing_event)


# Operator task:
@ray.remote
def multi_part_task(context, incoming_event):
    print("multi-part-task:", incoming_event)
    for i in range(3):
        sub_task.remote(context, str(i))


# Add the multi-task operator to the stream. Each incoming event triggers
# multi_part_task, which launches three sub-tasks that each publish their
# own object to the sink.
stream.add_multitask_operator(multi_part_task)

# Send file contents to Cloud Object Storage:
stream << "Random Event"

# Give the sink and sub-tasks time to complete before exiting.
time.sleep(10)
