Skip to content
This repository was archived by the owner on Nov 13, 2019. It is now read-only.

Commit 423ae40

Browse files
authored
Merge pull request #8 from sjwoodman/demo-prep
Demo prep
2 parents 4db5677 + 9bd57ce commit 423ae40

File tree

5 files changed

+90
-68
lines changed

5 files changed

+90
-68
lines changed

cleanup.sh

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
#!/usr/bin/env bash
2+
3+
oc delete dc -l app=${1}
4+
oc delete cm -l streamzi.io/kind=ev
5+
oc delete cm -l strimzi.io/kind=topic

deploy-all.sh

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,14 @@
11
#!/usr/bin/env bash
22

33
mvn -f ./pom.xml clean install
4-
./manager/build.sh;
5-
./operations/log-data/build.sh;
6-
./operations/nodejs/filter-events/build.sh;
7-
./operations/random-data/build.sh;
4+
cd manager
5+
./build.sh;
6+
cd ../operations/random-data/
7+
./build.sh
8+
cd ../log-data/
9+
./build.sh
10+
cd ../nodejs/filter-events/
11+
./build.sh
12+
13+
cd ../../
14+

manager/src/main/java/io/streamzi/openshift/API.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ public class API {
3131
@EJB(beanInterface = ClientContainer.class)
3232
private ClientContainer container;
3333

34-
private final String bootstrapServersDefault = "my-cluster-kafka";
34+
private final String bootstrapServersDefault = "my-cluster-kafka:9092";
3535

3636
@GET
3737
@Path("/pods")

manager/src/main/java/io/streamzi/openshift/ProcessorFlowDeployer.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,7 @@ private void populateTopicMaps(Map<String, DeploymentConfig> deploymentConfigs)
128128
final ConfigMap cm = new ConfigMap();
129129
String topicName;
130130

131+
//todo: deal with unconnected outputs - should still have a topic created
131132
for (ProcessorLink link : flow.getLinks()) {
132133
logger.info("Processing link");
133134
topicName = link.getSource().getParent().getUuid() + "-" + link.getSource().getName();

operations/nodejs/filter-events/main.js

Lines changed: 72 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -3,70 +3,79 @@
33
* To change this template file, choose Tools | Templates
44
* and open the template in the editor.
55
*/
6-
console.log("Starting KAFKA Processor in 10 seconds");
7-
setTimeout(function () {
8-
var kafkaHost = process.env.STREAMZI_KAFKA_BOOTSTRAP_SERVER;
9-
var nodeUuid = process.env.STREAMZI_NODE_UUID;
10-
var sourceTopic = process.env.INPUT_DATA;
11-
var targetTopic = process.env.OUTPUT_DATA;
12-
var threshold = process.env.THRESHOLD;
13-
14-
if(threshold === undefined){
15-
threshold = 0.5
16-
}
176

18-
console.log("Host: " + kafkaHost);
19-
console.log("Input topic: " + sourceTopic);
20-
console.log("Output topic: " + targetTopic);
21-
console.log("Threshold: " + threshold);
7+
var kafkaHost = process.env.STREAMZI_KAFKA_BOOTSTRAP_SERVER;
8+
var nodeUuid = process.env.STREAMZI_NODE_UUID;
9+
var sourceTopic = process.env.INPUT_DATA;
10+
var targetTopic = process.env.OUTPUT_DATA;
11+
var threshold = process.env.THRESHOLD;
12+
13+
if(threshold === undefined){
14+
threshold = 0.5
15+
}
16+
17+
console.log("Host: " + kafkaHost);
18+
console.log("Input topic: " + sourceTopic);
19+
console.log("Output topic: " + targetTopic);
20+
console.log("Threshold: " + threshold);
21+
22+
var kafka = require("kafka-node");
23+
var Producer = kafka.Producer;
24+
var Consumer = kafka.Consumer;
25+
const client = new kafka.KafkaClient({
26+
kafkaHost: kafkaHost
27+
});
28+
29+
30+
const producer = new Producer(client);
31+
producer.on("ready", function(){
32+
console.log("Producer ready");
33+
});
34+
35+
producer.on("error", function(err){
36+
console.log("Producer error: " + err);
37+
});
2238

23-
var kafka = require("kafka-node");
24-
var Producer = kafka.Producer;
25-
var Consumer = kafka.Consumer;
26-
const client = new kafka.KafkaClient({
27-
kafkaHost: kafkaHost
28-
});
29-
30-
31-
const producer = new Producer(client);
32-
producer.on("ready", function(){
33-
console.log("Producer ready");
34-
});
35-
36-
producer.on("error", function(err){
37-
console.log("Producer error: " + err);
38-
});
39-
40-
41-
42-
const consumer = new Consumer(
43-
client,
44-
[
45-
{
46-
topic: sourceTopic
47-
}
48-
], {
49-
autoCommit: true,
50-
groupId: nodeUuid
51-
});
5239

53-
consumer.on("message", function (message) {
54-
console.log("y");
55-
var value = message.value;
56-
var cloudEvent = JSON.parse(value);
57-
var numberValue = cloudEvent.data.value;
58-
console.log(numberValue);
59-
60-
if(numberValue>threshold){
61-
62-
var payloads = [
63-
{
64-
topic: targetTopic,
65-
messages: message.value
66-
}
67-
];
68-
producer.send(payloads, function(err, data){
69-
});
70-
}
40+
41+
const consumer = new Consumer(
42+
client,
43+
[
44+
{
45+
topic: sourceTopic,
46+
groupId: nodeUuid
47+
}
48+
], {
49+
autoCommit: true,
50+
groupId: nodeUuid
51+
});
52+
53+
consumer.on("ready", function(){
54+
console.log("Consumer ready");
55+
});
56+
57+
consumer.on("error", function(err){
58+
console.log("Consumer error: " + err);
7159
});
72-
}, 10000);
60+
61+
62+
consumer.on("message", function (message) {
63+
64+
var value = message.value;
65+
var cloudEvent = JSON.parse(value);
66+
var numberValue = cloudEvent.data.value;
67+
console.log(numberValue);
68+
69+
if(numberValue>threshold){
70+
71+
var payloads = [
72+
{
73+
topic: targetTopic,
74+
messages: message.value
75+
}
76+
];
77+
producer.send(payloads, function(err, data){
78+
});
79+
}
80+
});
81+

0 commit comments

Comments (0)