Skip to content

Commit b50f576

Browse files
authored
Use JSON to encode incoming Cloud Object Storage data. (#18)
1 parent 017c2c0 commit b50f576

File tree

9 files changed

+140
-27
lines changed

9 files changed

+140
-27
lines changed

examples/cloud_object_storage/cos_sink.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
sink_config['region'] = region
5555

5656
# Run the sink
57-
source = stream.add_sink(sink_config)
57+
sink = stream.add_sink(sink_config)
5858

5959
# Send file contents to Cloud Object Storage:
6060
stream << "File contents sample!"

examples/cloud_object_storage/cos_source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
20+
import json
2121

2222
# This example demonstrates how to receive objects from AWS S3 or
2323
# IBM Cloud Object Storage. It requires a bucket name, HMAC credentials,
@@ -62,12 +62,14 @@
6262

6363
def process_file(event):
6464
print(f'received {len(event)} bytes')
65+
json_event = json.loads(event)
6566
print("Contents:")
66-
print(event)
67+
print(json_event['filename'])
68+
print(json_event['body'])
6769

6870

6971
# Log object sizes to the console
7072
stream >> process_file
7173

7274
# Run for a while
73-
time.sleep(10)
75+
stream.disconnect_all(after_idle_for=2)

examples/cloud_object_storage/cos_source_meta.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
20+
import json
2121

2222
# This example demonstrates how to receive events about data becoming available
2323
# in Cloud Object Storage.
@@ -61,13 +61,13 @@
6161

6262
def process_file(event):
6363
print(f'received {len(event)} bytes')
64+
json_event = json.loads(event)
6465
print("Contents:")
65-
print(event)
66+
print(json_event['filename'])
6667

6768

6869
# Log object sizes to the console
6970
stream >> process_file
70-
print("Waiting for event...")
7171

7272
# Run for a while
73-
time.sleep(10)
73+
stream.disconnect_all(after_idle_for=2)

examples/cloud_object_storage/cos_source_move_after_read.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
20+
import json
2121

2222
# This example demonstrates how to receive objects from AWS S3 or
2323
# IBM Cloud Object Storage. It requires a bucket name, HMAC credentials,
@@ -63,12 +63,14 @@
6363

6464
def process_file(event):
6565
print(f'received {len(event)} bytes')
66+
json_event = json.loads(event)
6667
print("Contents:")
67-
print(event)
68+
print(json_event['filename'])
69+
print(json_event['body'])
6870

6971

7072
# Log object sizes to the console
7173
stream >> process_file
7274

7375
# Run for a while
74-
time.sleep(10)
76+
stream.disconnect_all(after_idle_for=2)

rayvens/api.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,25 @@ def _wait_for_timeout(self, after_idle_for, after):
8484
while True:
8585
time_elapsed_since_last_event = self._idle_time()
8686

87-
# Idle timeout exceeds the user-specified time limit:
88-
if time_elapsed_since_last_event > after_idle_for:
89-
break
90-
91-
# Check again after waiting for the rest of the timeout time:
92-
time.sleep(after_idle_for - time_elapsed_since_last_event + 1)
87+
if time_elapsed_since_last_event is not None:
88+
# Idle timeout exceeds the user-specified time limit:
89+
if time_elapsed_since_last_event > after_idle_for:
90+
break
91+
92+
# Check again after waiting for the rest of the timeout
93+
# time:
94+
time.sleep(after_idle_for - time_elapsed_since_last_event +
95+
1)
96+
else:
97+
time.sleep(after_idle_for)
9398
if after is not None and after > 0:
9499
time.sleep(after)
95100

96101
def _idle_time(self):
97-
return time.time() - ray.get(self.actor._get_latest_timestamp.remote())
102+
latest_timestamp = ray.get(self.actor._get_latest_timestamp.remote())
103+
if latest_timestamp is None:
104+
return None
105+
return time.time() - latest_timestamp
98106

99107

100108
@ray.remote(num_cpus=0)

rayvens/core/FileQueueJson.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright IBM Corporation 2021
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import java.util.concurrent.BlockingQueue;
18+
import java.util.concurrent.LinkedBlockingQueue;
19+
import java.io.File;
20+
21+
import org.apache.camel.BindToRegistry;
22+
import org.apache.camel.Exchange;
23+
import org.apache.camel.Processor;
24+
import org.apache.camel.builder.RouteBuilder;
25+
import org.json.simple.JSONObject;
26+
27+
class Recv implements Processor {
28+
BlockingQueue<Object> queue;
29+
30+
public Recv(BlockingQueue<Object> queue) {
31+
this.queue = queue;
32+
}
33+
34+
public void process(Exchange exchange) throws Exception {
35+
JSONObject returnJsonObject = new JSONObject();
36+
String body = exchange.getIn().getBody(String.class);
37+
returnJsonObject.put("body", body);
38+
39+
Object key = exchange.getIn().getHeader("CamelAwsS3Key");
40+
returnJsonObject.put("filename", key.toString());
41+
queue.add(returnJsonObject.toString());
42+
}
43+
}
44+
45+
class Send implements Processor {
46+
BlockingQueue<Object> queue;
47+
48+
public Send(BlockingQueue<Object> queue) {
49+
this.queue = queue;
50+
}
51+
52+
public void process(Exchange exchange) throws Exception {
53+
Object body = queue.take();
54+
exchange.getIn().setBody(body);
55+
}
56+
}
57+
58+
public class FileQueueJson extends RouteBuilder {
59+
BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
60+
61+
@BindToRegistry
62+
public Recv addToFileJsonQueue() {
63+
return new Recv(queue);
64+
}
65+
66+
@BindToRegistry
67+
public Send takeFromFileJsonQueue() {
68+
return new Send(queue);
69+
}
70+
71+
@Override
72+
public void configure() throws Exception {
73+
}
74+
}

rayvens/core/MetaEventQueue.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.camel.Exchange;
2525
import org.apache.camel.Processor;
2626
import org.apache.camel.builder.RouteBuilder;
27+
import org.json.simple.JSONObject;
2728

2829
class Recv implements Processor {
2930
BlockingQueue<Object> queue;
@@ -33,9 +34,10 @@ public Recv(BlockingQueue<Object> queue) {
3334
}
3435

3536
public void process(Exchange exchange) throws Exception {
37+
JSONObject returnJsonObject = new JSONObject();
3638
Object key = exchange.getIn().getHeader("CamelAwsS3Key");
37-
exchange.getIn().setBody("None");
38-
queue.add(key.toString());
39+
returnJsonObject.put("filename", key.toString());
40+
queue.add(returnJsonObject.toString());
3941
}
4042
}
4143

rayvens/core/catalog.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ def construct_source(config, endpoint, inverted=False):
312312
'meta_event_only' in config and config['meta_event_only']:
313313
take_from_queue = 'takeFromMetaEventQueue'
314314
add_to_queue = 'addToMetaEventQueue'
315+
elif config['kind'] == 'cloud-object-storage-source':
316+
take_from_queue = 'takeFromFileJsonQueue'
317+
add_to_queue = 'addToFileJsonQueue'
315318

316319
# Multi-source integration with several routes:
317320
if isinstance(spec, list):
@@ -538,12 +541,6 @@ def cos_sink(config):
538541
# Create new file route for final spec:
539542
file_spec = {'steps': []}
540543
file_spec['steps'].append({'bean': 'processFile'})
541-
# file_spec['steps'].append({
542-
# 'set-header': {
543-
# 'name': 'CamelAwsS3Key',
544-
# 'simple': uploaded_file_name
545-
# }
546-
# })
547544
file_spec['steps'].append({'to': uri})
548545
spec_list.append((file_spec, from_uri))
549546
elif 'from_directory' in config:

rayvens/core/kamel.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ def run(integration_content,
106106
queue = os.path.join(os.path.dirname(__file__), 'FileQueue.java')
107107
command.append(queue)
108108

109+
if _integration_requires_file_queue_json(integration_content):
110+
queue = os.path.join(os.path.dirname(__file__),
111+
'FileQueueJson.java')
112+
command.append(queue)
113+
114+
command.append("-d")
115+
command.append("mvn:com.googlecode.json-simple:json-simple:1.1.1")
116+
109117
if _integration_requires_file_watch_queue(integration_content):
110118
queue = os.path.join(os.path.dirname(__file__),
111119
'FileWatchQueue.java')
@@ -167,14 +175,23 @@ def local_run(integration_content,
167175
'ProcessFile.java')
168176
command.append(process_file)
169177

178+
# Use appropriate Queue type(s).
170179
if mode.transport == 'http':
171-
# Use appropriate Queue type(s).
172180
if integration_type == 'source':
173181
if _integration_requires_file_queue(integration_content):
174182
queue = os.path.join(os.path.dirname(__file__),
175183
'FileQueue.java')
176184
command.append(queue)
177185

186+
if _integration_requires_file_queue_json(integration_content):
187+
queue = os.path.join(os.path.dirname(__file__),
188+
'FileQueueJson.java')
189+
command.append(queue)
190+
191+
command.append("-d")
192+
command.append(
193+
"mvn:com.googlecode.json-simple:json-simple:1.1.1")
194+
178195
if _integration_requires_file_watch_queue(integration_content):
179196
queue = os.path.join(os.path.dirname(__file__),
180197
'FileWatchQueue.java')
@@ -185,6 +202,10 @@ def local_run(integration_content,
185202
'MetaEventQueue.java')
186203
command.append(queue)
187204

205+
command.append("-d")
206+
command.append(
207+
"mvn:com.googlecode.json-simple:json-simple:1.1.1")
208+
188209
if _integration_requires_queue(integration_content):
189210
queue = os.path.join(os.path.dirname(__file__), 'Queue.java')
190211
command.append(queue)
@@ -262,6 +283,13 @@ def _integration_requires_file_queue(integration_content):
262283
return False
263284

264285

286+
def _integration_requires_file_queue_json(integration_content):
287+
configuration = yaml.dump(integration_content)
288+
if 'bean: addToFileJsonQueue' in configuration:
289+
return True
290+
return False
291+
292+
265293
def _integration_requires_file_watch_queue(integration_content):
266294
configuration = yaml.dump(integration_content)
267295
if 'bean: addToFileWatchQueue' in configuration:

0 commit comments

Comments
 (0)