Skip to content

Commit 88f9ba5

Browse files
authored
Add examples with event triggered Ray datasets (#22)
* Fix file watch to use JSON. Add Dataset example. * Add event triggered dataset creation with actor. * Fix dataset example.
1 parent 7f30419 commit 88f9ba5

File tree

7 files changed

+227
-7
lines changed

7 files changed

+227
-7
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
import json
21+
import time
22+
23+
# This example demonstrates how to build a Ray dataset from a JSON file whose name is delivered by a file-watch event.
24+
25+
# Parse command-line arguments
26+
if len(sys.argv) < 2:
27+
print(f'usage: {sys.argv[0]} <run_mode>')
28+
sys.exit(1)
29+
30+
# Check run mode:
31+
run_mode = sys.argv[1]
32+
if run_mode not in ['local', 'operator']:
33+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
34+
35+
# Initialize ray either on the cluster or locally otherwise.
36+
if run_mode == 'operator':
37+
ray.init(address='auto')
38+
else:
39+
ray.init()
40+
41+
# Start rayvens in the desired mode.
42+
rayvens.init(mode=run_mode)
43+
44+
# Create an object stream:
45+
stream = rayvens.Stream('bucket')
46+
47+
# Configure the source:
48+
source_config = dict(kind='file-watch-source',
49+
path='test_files',
50+
events='CREATE')
51+
52+
# Run the source:
53+
source = stream.add_source(source_config)
54+
55+
56+
@ray.remote
class Filename:
    """Ray actor that holds the most recently stored file name."""

    def __init__(self):
        # No file name is known until set_filename is called.
        self.filename = None

    def set_filename(self, filename):
        """Store ``filename``, replacing any previously stored value."""
        self.filename = filename

    def get_filename(self):
        """Return the stored file name, or None if none has been set."""
        return self.filename
66+
67+
68+
filename_obj = Filename.remote()
69+
70+
71+
def process_file(event, filename_obj):
    """Print a file-watch event and forward its file name.

    ``event`` is a JSON document providing the ``filename`` and
    ``event_type`` keys. ``filename_obj`` must expose
    ``set_filename.remote(...)``; it is called with the file name taken
    from the event.
    """
    print(f'received {len(event)} bytes')
    payload = json.loads(event)
    print("Contents:")
    name = payload['filename']
    print("Filename:", name)
    print("Event type:", payload['event_type'])
    filename_obj.set_filename.remote(name)
78+
79+
# filename = json_event['filename']
80+
# WARNING: Cannot pickle Ray itself so we cannot read a
81+
# file using Datasets API in response to an event.
82+
# ds = ray.experimental.data.read_json([filename])
83+
# print(ds)
84+
85+
86+
# Process incoming file name.
87+
stream >> (lambda event: process_file(event, filename_obj))
88+
89+
# Create a data set and write the csv file using datasets.
90+
# TODO: Ray 1.5.1 uses pandas to write a CSV file so we avoid
91+
# using this method for writing CSV files.
92+
# test_ds = ray.experimental.data.range(100)
93+
# test_ds.write_csv("test_files/test.csv")
94+
95+
# Read JSON file to Ray dataset:
96+
# Poll the actor until an event has delivered a file name, giving up once
# `timeout_counter` retries have been used.
timeout_counter = 100
filename = ray.get(filename_obj.get_filename.remote())
while filename is None and timeout_counter > 0:
    # Sleep only while the name is still missing, so there is no extra
    # delay once the file name has arrived.
    time.sleep(1)
    timeout_counter -= 1
    filename = ray.get(filename_obj.get_filename.remote())
if filename is not None:
    # Build the dataset from the file reported by the event.
    ds = ray.experimental.data.read_json([filename])
    print(ds)
    print("Dataset constructed correctly")
else:
    print("No file was received")
108+
109+
# Keep running while events are still being received; disconnect once the stream has gone idle.
110+
stream.disconnect_all(after_idle_for=2)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
import json
21+
import time
22+
23+
# This example demonstrates how to build a Ray dataset from a JSON file whose name is recorded by a Ray actor subscribed to file-watch events.
24+
25+
# Parse command-line arguments
26+
if len(sys.argv) < 2:
27+
print(f'usage: {sys.argv[0]} <run_mode>')
28+
sys.exit(1)
29+
30+
# Check run mode:
31+
run_mode = sys.argv[1]
32+
if run_mode not in ['local', 'operator']:
33+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
34+
35+
# Initialize ray either on the cluster or locally otherwise.
36+
if run_mode == 'operator':
37+
ray.init(address='auto')
38+
else:
39+
ray.init()
40+
41+
# Start rayvens in the desired mode.
42+
rayvens.init(mode=run_mode)
43+
44+
# Create an object stream:
45+
stream = rayvens.Stream('bucket')
46+
47+
# Configure the source:
48+
source_config = dict(kind='file-watch-source',
49+
path='test_files',
50+
events='CREATE')
51+
52+
# Run the source:
53+
source = stream.add_source(source_config)
54+
55+
56+
@ray.remote
class Filename:
    """Ray actor that extracts and keeps the file name carried by an event."""

    def __init__(self):
        # No file name is known until an event has been processed.
        self.filename = None

    def set_filename(self, event):
        """Print the JSON ``event`` and remember its ``filename`` entry."""
        print(f'received {len(event)} bytes')
        payload = json.loads(event)
        print("Contents:")
        name = payload['filename']
        print("Filename:", name)
        print("Event type:", payload['event_type'])
        self.filename = name

    def get_filename(self):
        """Return the stored file name, or None if no event was processed."""
        return self.filename
71+
72+
73+
# Instantiate actor.
74+
filename_actor = Filename.remote()
75+
76+
# Process incoming file name.
77+
stream >> filename_actor.set_filename
78+
79+
# Read JSON file to Ray dataset:
80+
# Poll the actor until an event has delivered a file name, giving up after
# `timeout_counter` attempts.
timeout_counter = 100
filename = None
while timeout_counter > 0:
    filename = ray.get(filename_actor.get_filename.remote())
    if filename is not None:
        # Stop right away; there is no reason to sleep once the name is known.
        break
    timeout_counter -= 1
    time.sleep(1)
if filename is not None:
    # Build the dataset from the file reported by the event.
    ds = ray.experimental.data.read_json([filename])
    print(ds)
    print("Dataset constructed correctly")
else:
    print("No file was received")
92+
93+
# Keep running while events are still being received; disconnect once the stream has gone idle.
94+
stream.disconnect_all(after_idle_for=2)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"greeting": "Hello!"}

examples/file/file_source.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
2120

2221
# This example shows how a file is used to trigger an event to process
2322
# the file.
@@ -57,4 +56,4 @@ def process_file(event):
5756
stream >> process_file
5857

5958
# Run for a while.
60-
time.sleep(100)
59+
stream.disconnect_all(after_idle_for=2)

examples/file/file_watch_source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
20+
import json
2121

2222
# This example demonstrates how to receive events.
2323

@@ -54,12 +54,14 @@
5454

5555
def process_file(event):
5656
print(f'received {len(event)} bytes')
57+
json_event = json.loads(event)
5758
print("Contents:")
58-
print(event)
59+
print(json_event['filename'])
60+
print(json_event['event_type'])
5961

6062

6163
# Log object sizes to the console
6264
stream >> process_file
6365

6466
# Run for a while
65-
time.sleep(100)
67+
stream.disconnect_all(after_idle_for=2)

rayvens/core/FileWatchQueue.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.camel.Exchange;
2525
import org.apache.camel.Processor;
2626
import org.apache.camel.builder.RouteBuilder;
27+
import org.json.simple.JSONObject;
2728

2829
class Recv implements Processor {
2930
BlockingQueue<Object> queue;
@@ -33,10 +34,16 @@ public Recv(BlockingQueue<Object> queue) {
3334
}
3435

3536
public void process(Exchange exchange) throws Exception {
37+
JSONObject returnJsonObject = new JSONObject();
38+
39+
// Record event type:
3640
Object eventType = exchange.getIn().getHeader("CamelFileEventType");
41+
returnJsonObject.put("event_type", eventType.toString());
42+
43+
// Record file name:
3744
File file = exchange.getIn().getBody(File.class);
38-
// TODO: use Cloud event.
39-
queue.add(eventType.toString()+":file="+file.toString());
45+
returnJsonObject.put("filename", file.toString());
46+
queue.add(returnJsonObject.toString());
4047
}
4148
}
4249

rayvens/core/kamel.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ def run(integration_content,
119119
'FileWatchQueue.java')
120120
command.append(queue)
121121

122+
command.append("-d")
123+
command.append("mvn:com.googlecode.json-simple:json-simple:1.1.1")
124+
122125
if _integration_requires_meta_event_queue(integration_content):
123126
queue = os.path.join(os.path.dirname(__file__),
124127
'MetaEventQueue.java')
@@ -197,6 +200,10 @@ def local_run(integration_content,
197200
'FileWatchQueue.java')
198201
command.append(queue)
199202

203+
command.append("-d")
204+
command.append(
205+
"mvn:com.googlecode.json-simple:json-simple:1.1.1")
206+
200207
if _integration_requires_meta_event_queue(integration_content):
201208
queue = os.path.join(os.path.dirname(__file__),
202209
'MetaEventQueue.java')

0 commit comments

Comments
 (0)