Skip to content

Commit 165d311

Browse files
authored
dev-confluentkafka-output-stores-event-object (#861)
* add ng confluentkafka_output * add on delivery_callback * add processor example
1 parent 03fe6cd commit 165d311

File tree

7 files changed

+1254
-1
lines changed

7 files changed

+1254
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* make `opensearch_output` handle Event class based objects
1515
* deprecate `s3_output` as it does not fit into new architecture
1616
* deprecate `http_output` as it does not fit into new architecture
17+
* make `confluentkafka_output` store Event class based objects
1718

1819
### Bugfix
1920

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Usage of ConfluentKafka Output Connector with Event Objects\n"
8+
]
9+
},
10+
{
11+
"cell_type": "markdown",
12+
"metadata": {},
13+
"source": [
14+
"The following example demonstrates the delivery of events to the ConfluentKafka output connector."
15+
]
16+
},
17+
{
18+
"cell_type": "code",
19+
"execution_count": 1,
20+
"metadata": {
21+
"vscode": {
22+
"languageId": "shellscript"
23+
}
24+
},
25+
"outputs": [
26+
{
27+
"name": "stderr",
28+
"output_type": "stream",
29+
"text": [
30+
" Container kafka Stopping\n",
31+
" Container kafka Stopped\n",
32+
" Container kafka Removing\n",
33+
" Container kafka Removed\n",
34+
" Network compose_kafka Removing\n",
35+
" Network compose_kafka Removed\n",
36+
" Network compose_kafka Creating\n",
37+
" Network compose_kafka Created\n",
38+
" Container kafka Creating\n",
39+
" Container kafka Created\n",
40+
" Container kafka Starting\n",
41+
" Container kafka Started\n"
42+
]
43+
}
44+
],
45+
"source": [
46+
"%%bash\n",
47+
"docker compose -f ../../../../../examples/compose/docker-compose.yml down -v \n",
48+
"docker compose -f ../../../../../examples/compose/docker-compose.yml up -d kafka\n"
49+
]
50+
},
51+
{
52+
"cell_type": "code",
53+
"execution_count": 3,
54+
"metadata": {},
55+
"outputs": [
56+
{
57+
"name": "stdout",
58+
"output_type": "stream",
59+
"text": [
60+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:33.325385+00:00'} to topic consumer\n",
61+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:33.325411+00:00'} to topic consumer\n",
62+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:33.325420+00:00'} to topic consumer\n",
63+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:33.325426+00:00'} to topic consumer\n",
64+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:33.325432+00:00'} to topic consumer\n",
65+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:33.325438+00:00'} to topic consumer\n",
66+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:33.325444+00:00'} to topic consumer\n",
67+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:33.325450+00:00'} to topic consumer\n",
68+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:33.325456+00:00'} to topic consumer\n",
69+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:33.325462+00:00'} to topic consumer\n",
70+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 0, offset 10\n",
71+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 0\n",
72+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 1\n",
73+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 1, offset 2\n",
74+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 0\n",
75+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 1\n",
76+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 2, offset 2\n",
77+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 0\n",
78+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 1\n",
79+
"DEBUG:KafkaOutput:Message delivered to 'consumer' partition 3, offset 2\n",
80+
"INFO:KafkaOutput:Producer flushed successfully. 0 messages remaining.\n"
81+
]
82+
}
83+
],
84+
"source": [
85+
"from typing import Iterator\n",
86+
"from logprep.factory import Factory\n",
87+
"from logprep.util.time import TimeParser\n",
88+
"from logprep.ng.connector.confluent_kafka.output import ConfluentKafkaOutput\n",
89+
"from logprep.ng.event.log_event import LogEvent\n",
90+
"from logprep.ng.event.event_state import EventStateType\n",
91+
"import logging\n",
92+
"import sys\n",
93+
"\n",
94+
"# Configure logging\n",
95+
"logging.basicConfig(\n",
96+
" level=logging.DEBUG, \n",
97+
" stream=sys.stdout\n",
98+
")\n",
99+
"\n",
100+
"\n",
101+
"config = {\n",
102+
" \"type\": \"ng_confluentkafka_output\",\n",
103+
" \"topic\": \"consumer\",\n",
104+
" \"flush_timeout\": 300,\n",
105+
" \"send_timeout\": 0,\n",
106+
" \"kafka_config\": {\n",
107+
" \"bootstrap.servers\": \"127.0.0.1:9092\"\n",
108+
" }\n",
109+
"}\n",
110+
"\n",
111+
"confluent_kafka_output: ConfluentKafkaOutput = Factory.create({\"my_kafka\": config})\n",
112+
"\n",
113+
"\n",
114+
"events: Iterator = [\n",
115+
" LogEvent({\"message\": f\"Event {i}\", \"@timestamp\": str(TimeParser.now())}, original=b\"\", state=EventStateType.PROCESSED)\n",
116+
" for i in range(10)\n",
117+
"]\n",
118+
"\n",
119+
"# store events in the Confluent Kafka output\n",
120+
"for event in events:\n",
121+
" confluent_kafka_output.store(event)\n",
122+
"\n",
123+
"# event goes to state STORED_IN_OUTPUT first and then after callback from librdkafka it will be changed to DELIVERED\n",
124+
"# assert events[-1].state == EventStateType.STORED_IN_OUTPUT\n",
125+
"\n",
126+
"# Flush the output to ensure all events are sent\n",
127+
"confluent_kafka_output.shut_down()\n"
128+
]
129+
},
130+
{
131+
"cell_type": "code",
132+
"execution_count": 4,
133+
"metadata": {},
134+
"outputs": [
135+
{
136+
"name": "stdout",
137+
"output_type": "stream",
138+
"text": [
139+
"Events total: 10\n",
140+
"Events in delivered state: 10\n",
141+
"Events not delivered: []\n"
142+
]
143+
}
144+
],
145+
"source": [
146+
"print(f\"Events total: {len(events)}\")\n",
147+
"print(f\"Events in delivered state: {len([e for e in events if e.state == EventStateType.DELIVERED])}\")\n",
148+
"\n",
149+
"print(f\"Events not delivered: {[event for event in events if event.state != EventStateType.DELIVERED]}\")\n",
150+
"\n",
151+
"\n",
152+
"# Verify that all events are delivered\n",
153+
"for event in events:\n",
154+
" assert event.state == EventStateType.DELIVERED, f\"Event {event.data['message']} not delivered | State: {event.state}\""
155+
]
156+
},
157+
{
158+
"cell_type": "markdown",
159+
"metadata": {},
160+
"source": [
161+
"The following case demonstrates error handling in the confluent_kafka output.\n",
162+
"We try to send to a non-existent topic.\n",
163+
"This should provoke an error indicating an unknown topic or partition."
164+
]
165+
},
166+
{
167+
"cell_type": "code",
168+
"execution_count": 5,
169+
"metadata": {},
170+
"outputs": [
171+
{
172+
"name": "stdout",
173+
"output_type": "stream",
174+
"text": [
175+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 0', '@timestamp': '2025-07-28 12:24:44.018526+00:00'} to topic non_existent_topic\n",
176+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 1', '@timestamp': '2025-07-28 12:24:44.018555+00:00'} to topic non_existent_topic\n",
177+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 2', '@timestamp': '2025-07-28 12:24:44.018564+00:00'} to topic non_existent_topic\n",
178+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 3', '@timestamp': '2025-07-28 12:24:44.018571+00:00'} to topic non_existent_topic\n",
179+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 4', '@timestamp': '2025-07-28 12:24:44.018577+00:00'} to topic non_existent_topic\n",
180+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 5', '@timestamp': '2025-07-28 12:24:44.018584+00:00'} to topic non_existent_topic\n",
181+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 6', '@timestamp': '2025-07-28 12:24:44.018590+00:00'} to topic non_existent_topic\n",
182+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 7', '@timestamp': '2025-07-28 12:24:44.018596+00:00'} to topic non_existent_topic\n",
183+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 8', '@timestamp': '2025-07-28 12:24:44.018602+00:00'} to topic non_existent_topic\n",
184+
"DEBUG:KafkaOutput:Produced message {'message': 'Event 9', '@timestamp': '2025-07-28 12:24:44.018608+00:00'} to topic non_existent_topic\n",
185+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
186+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
187+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
188+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
189+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
190+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
191+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
192+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
193+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
194+
"ERROR:KafkaOutput:Message delivery failed: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
195+
"INFO:KafkaOutput:Producer flushed successfully. 0 messages remaining.\n",
196+
"Event Event 0 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
197+
"Event Event 1 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
198+
"Event Event 2 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
199+
"Event Event 3 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
200+
"Event Event 4 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
201+
"Event Event 5 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
202+
"Event Event 6 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
203+
"Event Event 7 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
204+
"Event Event 8 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n",
205+
"Event Event 9 failed with error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Broker: Unknown topic or partition\"}\n"
206+
]
207+
}
208+
],
209+
"source": [
210+
"events: Iterator = [\n",
211+
" LogEvent({\"message\": f\"Event {i}\", \"@timestamp\": str(TimeParser.now())}, original=b\"\", state=EventStateType.PROCESSED)\n",
212+
" for i in range(10)\n",
213+
"]\n",
214+
"\n",
215+
"# store events in the Confluent Kafka output\n",
216+
"for event in events:\n",
217+
" confluent_kafka_output.store_custom(event, \"non_existent_topic\")\n",
218+
"\n",
219+
"# Flush the output to ensure all events are sent\n",
220+
"confluent_kafka_output.flush()\n",
221+
"# Verify that all events are delivered\n",
222+
"for event in events:\n",
223+
" assert event.state == EventStateType.FAILED\n",
224+
" assert len(event.errors) == 1\n",
225+
" print (f\"Event {event.data['message']} failed with error: {event.errors[0]}\")"
226+
]
227+
}
228+
],
229+
"metadata": {
230+
"kernelspec": {
231+
"display_name": ".venv",
232+
"language": "python",
233+
"name": "python3"
234+
},
235+
"language_info": {
236+
"codemirror_mode": {
237+
"name": "ipython",
238+
"version": 3
239+
},
240+
"file_extension": ".py",
241+
"mimetype": "text/x-python",
242+
"name": "python",
243+
"nbconvert_exporter": "python",
244+
"pygments_lexer": "ipython3",
245+
"version": "3.12.3"
246+
},
247+
"orig_nbformat": 4
248+
},
249+
"nbformat": 4,
250+
"nbformat_minor": 2
251+
}

0 commit comments

Comments
 (0)