Commit a6f2137

adding 2 new js client demos (#275)

* adding 2 new js client demos
* add seed producer

Co-authored-by: Cerchie <[email protected]>

1 parent 0fc7f21 commit a6f2137

File tree: 12 files changed, +1018 −0 lines
Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
# Use confluent-kafka-javascript to produce and consume records

This is a basic example of using the [confluent-kafka-javascript](https://github.com/confluentinc/confluent-kafka-javascript) client (in early access as of this writing, 4/29/24).

In this example, you'll produce database metadata records (generated with faker.js) to a Kafka topic and consume the records from the same topic.

![CC UI image](/graph.png)
## Step 1: Clone and install

`git clone https://github.com/Cerchie/basic-confluent-kafka-javascript-example.git`

`npm install`
## Step 2: Get set up in Confluent Cloud

Sign up for [Confluent Cloud](https://www.confluent.io/confluent-cloud).

### To add an environment:

- Open the Confluent Cloud Console and go to the Environments page at https://confluent.cloud/environments.
- Click 'Add cloud environment'.
- Enter a name for the environment: `db_metadata`
- Click 'Create'.

Skip any prompting or options for stream governance.
28+
### Then, create a cluster inside your environment.
29+
30+
- Click 'Add cluster'.
31+
- On the 'Create cluster' page, for the 'Basic' cluster, select 'Begin configuration'.
32+
- When you're prompted to select a provider and location, choose AWS's `us-east-2`.
33+
- Select 'Continue'.
34+
- Specify a cluster name, review the configuration and cost information, and then select 'Launch cluster'.
35+
- Depending on the chosen cloud provider and other settings, it may take a few minutes to provision your cluster, but after the cluster has provisioned, the 'Cluster Overview' page displays.
36+
37+
38+
### Create a Confluent Cloud API key and save it.

Note: if you have more than one environment, follow the secondary set of instructions below.

- From the 'Administration' menu, click 'Cloud API keys' or go to https://confluent.cloud/settings/api-keys.
- Click 'Add key'.
- Choose to create the key associated with your user account.
- The API key and secret are generated and displayed.
- Click 'Copy' to copy the key and secret to a secure location.

*Important*

_The secret for the key is only exposed initially in the 'Create API key' dialog and cannot be viewed or retrieved later from the web interface. Store the secret and its corresponding key in a secure location. Do not share the secret for your API key._

- (Optional, but recommended) Enter a description of the API key that describes how you intend to use it, so you can distinguish it from other API keys.
- Select the confirmation check box to confirm that you have saved your key and secret.
- Click 'Save'. The key is added to the keys table.
_Instructions if you have more than one environment:_

- Go to the Environments page at https://confluent.cloud/environments and select the environment.
- Select the cluster.
- Select 'API Keys' under 'Cluster Overview'.
- Click '+ Add key'. The API key and secret are generated and displayed.
- Click 'Copy' to copy the key and secret to a secure location.

The secret for the key is only displayed in the 'Create API key' dialog and cannot be viewed or retrieved later. Store the API key and secret in a secure location. Do not share the secret for your API key.

- (Optional, but recommended) Enter a description of the API key that describes how you intend to use it, so you can distinguish it from other API keys. Confirm that you have saved your key and secret.
- Click 'Continue'. The key is added to the API keys table.
### Create a topic named `db_metadata` with 1 partition.

- From the navigation menu, click 'Topics', and then click 'Create topic'.
- In the 'Topic name' field, type `db_metadata`. Change the 'Partitions' field from 6 to 1. Then select 'Create with defaults'.
## Step 3: Set up your .env file

Add a `.env` file to the top level of the app directory.

The contents should read:

```
BOOTSTRAP_SERVERS=${bootstrap_server_address_here}
USERNAME=${confluent_cloud_api_key_here}
PASSWORD=${confluent_cloud_api_secret_here}
```
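Both client scripts read these variables from `process.env` after `require("dotenv").config()` has run. As a minimal sketch, you could fail fast on a missing variable before connecting, rather than letting the Kafka client fail later with a vaguer error (the `requireEnv` helper here is hypothetical, not part of this repo):

```javascript
// Hypothetical helper: throw immediately if any expected env var is unset.
function requireEnv(names) {
  const missing = names.filter((name) => !process.env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing env vars: ${missing.join(", ")}`);
  }
  return names.map((name) => process.env[name]);
}

// Example call for this demo's three variables:
// requireEnv(["BOOTSTRAP_SERVERS", "USERNAME", "PASSWORD"]);
```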
## Step 4: Run the app

In one terminal window, run

`node producer.js`

and in another,

`node consumer.js`

https://github.com/Cerchie/basic-confluent-kafka-javascript-example/assets/54046179/62a8c715-9856-4842-848d-e97a175a6ec8

You can also verify that this is working by visiting your topic in the Confluent Cloud UI:

![CC UI image](/db_records.png)
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
1+
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
2+
// the consumerStart function is from the Confluent documentation: https://github.com/confluentinc/confluent-kafka-javascript/blob/dev_early_access_development_branch/QUICKSTART.md
3+
require("dotenv").config();
4+
5+
const kafka = new Kafka({
6+
kafkaJS: {
7+
brokers: [process.env.BOOTSTRAP_SERVERS],
8+
ssl: true,
9+
sasl: {
10+
mechanism: 'plain',
11+
username: process.env.USERNAME,
12+
password: process.env.PASSWORD,
13+
},
14+
}
15+
});
16+
17+
const consumer = kafka.consumer({'group.id': 'db_metadata', 'auto.offset.reset': 'earliest'});
18+
19+
async function consumerStart() {
20+
21+
let stopped = false;
22+
const topic = "db_metadata"
23+
24+
await consumer.connect();
25+
await consumer.subscribe({ topics: [topic] });
26+
27+
consumer.run({
28+
eachMessage: async ({ topic, partition, message }) => {
29+
console.log({
30+
topic,
31+
partition,
32+
offset: message.offset,
33+
value: message.value.toString(),
34+
});
35+
}
36+
});
37+
38+
// Update stopped whenever we're done consuming.
39+
// The update can be in another async function or scheduled with setTimeout etc.
40+
while(!stopped) {
41+
await new Promise(resolve => setTimeout(resolve, 1000));
42+
}
43+
44+
await consumer.disconnect();
45+
}
46+
47+
consumerStart();
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
```json
{
  "dependencies": {
    "@confluentinc/kafka-javascript": "^0.1.12-devel",
    "@faker-js/faker": "^8.4.1",
    "dotenv": "^16.4.5"
  },
  "devDependencies": {
    "@types/node": "^20.12.7"
  }
}
```
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
```js
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;
const { faker } = require('@faker-js/faker');
require("dotenv").config();

const kafka = new Kafka({
  kafkaJS: {
    brokers: [process.env.BOOTSTRAP_SERVERS],
    ssl: true,
    sasl: {
      mechanism: 'plain',
      username: process.env.USERNAME,
      password: process.env.PASSWORD,
    },
  }
});

const producer = kafka.producer();

// producer should always be connected at app initialization, separately from producing messages
const connectProducer = async () => {
  await producer.connect();
  console.log("Connected successfully");
};

const run = async () => {
  let messageValue = {
    collation: faker.database.collation(),
    column: faker.database.column(),
    engine: faker.database.engine(),
    mongodbObjectId: faker.database.mongodbObjectId(),
    type: faker.database.type(),
  };
  console.log(messageValue);

  // await the send so the record is delivered before we disconnect
  await producer.send({
    topic: 'db_metadata',
    messages: [
      { value: JSON.stringify(messageValue) }
    ]
  });

  await producer.disconnect();
  console.log("Disconnected successfully");
};

(async () => {
  await connectProducer();
  await run();
})();
```
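The record value travels as a JSON string: the producer calls `JSON.stringify`, and the consumer recovers the text with `message.value.toString()`. A self-contained sketch of that round trip, using a hard-coded sample value (in the demo these fields come from `faker.database.*`):

```javascript
// Sample record value; illustrative stand-in for the faker-generated one.
const messageValue = {
  collation: "utf8mb4_general_ci",
  column: "created_at",
  engine: "InnoDB",
  type: "int",
};

// What the producer hands to the broker: bytes of JSON text.
const wire = Buffer.from(JSON.stringify(messageValue));

// What the consumer's eachMessage callback does with message.value.
const decoded = JSON.parse(wire.toString());

console.log(decoded.engine); // → InnoDB
```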
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
```
BOOTSTRAP_SERVERS=${bootstrap_server_address_here}
USERNAME=${confluent_cloud_api_key_here}
PASSWORD=${confluent_cloud_api_secret_here}
```
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
```
.env
node_modules/
/node_modules
./node_modules

.npm

/.npm

.vscode
```
