Skip to content

Commit fafe946

Browse files
committed
[Components] kafka - new source and action components
1 parent ea8eb9d commit fafe946

File tree

9 files changed

+425
-21
lines changed

9 files changed

+425
-21
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import app from "../../kafka.app.mjs";
2+
3+
export default {
4+
key: "kafka-create-topic",
5+
name: "Create Topic",
6+
description: "Create a new Kafka topic by specifying the topic name, number of partitions, and replication factor. [See the documentation](https://github.com/tulios/kafkajs).",
7+
version: "0.0.1",
8+
type: "action",
9+
props: {
10+
app,
11+
topic: {
12+
type: "string",
13+
label: "Topic Name",
14+
description: "Name of the topic to create.",
15+
},
16+
numPartitions: {
17+
type: "integer",
18+
label: "Number Of Partitions",
19+
description: "The number of partitions for the topic.",
20+
optional: true,
21+
},
22+
replicationFactor: {
23+
type: "integer",
24+
label: "Replication Factor",
25+
description: "This is the number of replicas for each partition in the topic. Remember that the replication factor cannot be larger than the number of brokers in the Kafka cluster.",
26+
optional: true,
27+
},
28+
cleanupPolicy: {
29+
type: "string",
30+
label: "Cleanup Policy",
31+
description: "The cleanup policy for the app topic.",
32+
optional: true,
33+
options: [
34+
"delete",
35+
"compact",
36+
],
37+
},
38+
retentionTime: {
39+
type: "integer",
40+
label: "Retention Time",
41+
description: "The number of milli seconds to keep the local log segment before it gets deleted.",
42+
optional: true,
43+
},
44+
},
45+
async run({ $ }) {
46+
const {
47+
app,
48+
topic,
49+
numPartitions,
50+
replicationFactor,
51+
cleanupPolicy,
52+
retentionTime,
53+
} = this;
54+
55+
const configEntries = [];
56+
57+
if (cleanupPolicy) {
58+
configEntries.push({
59+
name: "cleanup.policy",
60+
value: cleanupPolicy,
61+
});
62+
}
63+
64+
if (retentionTime) {
65+
configEntries.push({
66+
name: "retention.ms",
67+
value: String(retentionTime),
68+
});
69+
}
70+
71+
const success = await app.createTopics({
72+
topics: [
73+
{
74+
topic,
75+
numPartitions,
76+
replicationFactor,
77+
configEntries,
78+
},
79+
],
80+
});
81+
$.export("$summary", "Successfully created topic.");
82+
return {
83+
success,
84+
};
85+
},
86+
};
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import app from "../../kafka.app.mjs";
2+
3+
export default {
4+
key: "kafka-delete-topic",
5+
name: "Delete Topic",
6+
description: "Deletes a specified Kafka topic. Requires the topic name as input. [See the documentation](https://github.com/tulios/kafkajs).",
7+
version: "0.0.1",
8+
type: "action",
9+
props: {
10+
app,
11+
topic: {
12+
description: "The Kafka topic to delete.",
13+
propDefinition: [
14+
app,
15+
"topic",
16+
],
17+
},
18+
},
19+
async run({ $ }) {
20+
const {
21+
app,
22+
topic,
23+
} = this;
24+
25+
await app.deleteTopics({
26+
topics: [
27+
topic,
28+
],
29+
});
30+
31+
$.export("$summary", "Successfully deleted topic.");
32+
return {
33+
success: true,
34+
};
35+
},
36+
};
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import app from "../../kafka.app.mjs";
2+
3+
export default {
4+
key: "kafka-publish-message",
5+
name: "Publish Message",
6+
description: "Sends a message to a specified Kafka topic. Requires specifying the topic, message key, and value. Optional properties include headers and partition. [See the documentation](https://github.com/tulios/kafkajs).",
7+
version: "0.0.1",
8+
type: "action",
9+
props: {
10+
app,
11+
topic: {
12+
description: "The topic to send the message to.",
13+
propDefinition: [
14+
app,
15+
"topic",
16+
],
17+
},
18+
messageKey: {
19+
type: "string",
20+
label: "Message Key",
21+
description: "Key of the message.",
22+
optional: true,
23+
},
24+
messageValue: {
25+
type: "string",
26+
label: "Message Value",
27+
description: "Value of the message.",
28+
},
29+
partition: {
30+
type: "integer",
31+
label: "Partition",
32+
description: "The specific partition to send the message to, optional.",
33+
optional: true,
34+
},
35+
headers: {
36+
type: "object",
37+
label: "Headers",
38+
description: "Optional headers you want to send along with the message.",
39+
optional: true,
40+
},
41+
},
42+
async run({ $ }) {
43+
const {
44+
app,
45+
topic,
46+
messageKey,
47+
messageValue,
48+
partition,
49+
headers,
50+
} = this;
51+
52+
const response = await app.sendMessages({
53+
topic,
54+
messages: [
55+
{
56+
key: messageKey,
57+
value: messageValue,
58+
partition,
59+
headers,
60+
},
61+
],
62+
});
63+
64+
if (!response?.length) {
65+
throw new Error("Failed to publish message. Please see the kafka app logs for more information.");
66+
}
67+
68+
$.export("$summary", "Successfully published message.");
69+
return response;
70+
},
71+
};
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
const API = {
2+
ADMIN: "admin",
3+
PRODUCER: "producer",
4+
CONSUMER: "consumer",
5+
};
6+
7+
export default {
8+
API,
9+
};

components/kafka/kafka.app.mjs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,81 @@
1+
import { Kafka } from "kafkajs";
2+
import constants from "./common/constants.mjs";
3+
14
export default {
25
type: "app",
36
app: "kafka",
4-
propDefinitions: {},
7+
propDefinitions: {
8+
topic: {
9+
type: "string",
10+
label: "Topic",
11+
description: "The topic to interact with.",
12+
options() {
13+
return this.listTopics();
14+
},
15+
},
16+
},
517
methods: {
6-
// this.$auth contains connected account data
7-
authKeys() {
8-
console.log(Object.keys(this.$auth));
18+
getBrokers() {
19+
const {
20+
host,
21+
port,
22+
} = this.$auth;
23+
return [
24+
`${host}:${port}`,
25+
];
26+
},
27+
getClient() {
28+
return new Kafka({
29+
clientId: "Pipedream",
30+
brokers: this.getBrokers(),
31+
});
32+
},
33+
getApiClient(api, config) {
34+
return this.getClient()[api](config);
35+
},
36+
async withApi(fn, api = constants.API.ADMIN, config) {
37+
const apiClient = this.getApiClient(api, config);
38+
await apiClient.connect();
39+
try {
40+
return await fn(apiClient);
41+
} finally {
42+
await apiClient.disconnect();
43+
}
44+
},
45+
listTopics() {
46+
return this.withApi((admin) => admin.listTopics());
47+
},
48+
listGroups() {
49+
return this.withApi((admin) => admin.listGroups());
50+
},
51+
createTopics(args = {}) {
52+
return this.withApi((admin) => admin.createTopics(args));
53+
},
54+
deleteTopics(args = {}) {
55+
return this.withApi((admin) => admin.deleteTopics(args));
56+
},
57+
deleteGroups(args = {}) {
58+
return this.withApi((admin) => admin.deleteGroups(args));
59+
},
60+
sendMessages(args = {}) {
61+
return this.withApi((producer) => producer.send(args), constants.API.PRODUCER);
62+
},
63+
async messageListener({
64+
topic, fromBeginning = true, onMessage, groupId,
65+
} = {}) {
66+
const config = {
67+
groupId,
68+
};
69+
const consumer = this.getApiClient(constants.API.CONSUMER, config);
70+
await consumer.connect();
71+
await consumer.subscribe({
72+
topic,
73+
fromBeginning,
74+
});
75+
await consumer.run({
76+
eachMessage: onMessage,
77+
});
78+
return consumer;
979
},
1080
},
1181
};

components/kafka/package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pipedream/kafka",
3-
"version": "0.0.1",
3+
"version": "0.1.0",
44
"description": "Pipedream Kafka Components",
55
"main": "kafka.app.mjs",
66
"keywords": [
@@ -11,5 +11,8 @@
1111
"author": "Pipedream <[email protected]> (https://pipedream.com/)",
1212
"publishConfig": {
1313
"access": "public"
14+
},
15+
"dependencies": {
16+
"kafkajs": "^2.2.4"
1417
}
15-
}
18+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
2+
import app from "../../kafka.app.mjs";
3+
4+
export default {
5+
key: "kafka-new-message",
6+
name: "New Message",
7+
description: "Emit new event when a message is published to a Kafka topic using a timer. [See the documentation](https://github.com/tulios/kafkajs).",
8+
version: "0.0.1",
9+
type: "source",
10+
dedupe: "unique",
11+
props: {
12+
app,
13+
timer: {
14+
type: "$.interface.timer",
15+
default: {
16+
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
17+
},
18+
},
19+
topic: {
20+
description: "The topic to listen for new messages on.",
21+
propDefinition: [
22+
app,
23+
"topic",
24+
],
25+
},
26+
},
27+
methods: {
28+
delay(consumer, ms = 1000) {
29+
return new Promise((resolve) => {
30+
setTimeout(async () => {
31+
await consumer.disconnect();
32+
console.log("Consumer disconnected!!!");
33+
resolve();
34+
}, ms);
35+
});
36+
},
37+
},
38+
async run() {
39+
console.log("Running ...");
40+
const {
41+
app,
42+
topic,
43+
delay,
44+
} = this;
45+
46+
const GROUP_ID = "pipedream-group";
47+
48+
await app.deleteGroups([
49+
GROUP_ID,
50+
]);
51+
52+
const consumer = await app.messageListener({
53+
topic,
54+
groupId: GROUP_ID,
55+
onMessage: (record) => {
56+
const { message } = record;
57+
this.$emit({
58+
...record,
59+
msgValue: message.value.toString(),
60+
msgKey: message.key?.toString(),
61+
}, {
62+
id: `${message.key}-${message.offset}-${message.timestamp}`,
63+
summary: `New Message ${message.timestamp}`,
64+
ts: Date.parse(message.timestamp),
65+
});
66+
},
67+
});
68+
69+
await delay(consumer);
70+
},
71+
};

0 commit comments

Comments
 (0)