Skip to content

Commit 8549895

Browse files
committed
[KIP-932] Add rd_kafka_share_commit_sync() API
1. Core implementation: - SHARE_COMMIT_SYNC_FANOUT op type and handler in main thread - Segregate acks by partition leader into per-broker pending_commit_sync - Dispatch sync ack ops with priority over async_ack_details - Timeout timer fills remaining partitions with _TIMED_OUT - App thread blocks on temp queue until all broker replies or timeout 2. Data structures: - commit_sync_request in rkcg (abs_timeout, results, wait count, replyq, timer) - pending_commit_sync in broker (sync_ack_details, abs_timeout, request_id) - SHARE_COMMIT_SYNC_FANOUT and FANOUT_REPLY op types with union members 3. Broker thread changes: - ShareAcknowledge reply handler populates ack_results in reply op - Main thread reply handler copies per-partition errors to sync results - Pending sync acks dispatched before async when broker becomes free 4. Tests (0176-share_consumer_commit_sync.c): - Basic implicit/explicit ack mode commit_sync - No pending acks returns NULL/NULL - Commit prevents redelivery (Consumer B gets 0 records) - Mixed ack types (ACCEPT/RELEASE/REJECT) with delivery count verification - Multiple commit_sync calls with back-to-back no-op commits - Multi-topic multi-partition (10 topics x 6 partitions x 10 msgs) - Mock: verifies ShareAcknowledge RPC count matches commit_sync calls - Mock: timeout handling with 5s RTT / 2s timeout and recovery - Mixed commit types (10 async + 1 sync pattern) - Mock: broker dispatch priority (sync before async) 5. Example program and build integration
1 parent c9e39d3 commit 8549895

File tree

12 files changed

+2894
-20
lines changed

12 files changed

+2894
-20
lines changed

examples/Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \
22
rdkafka_complex_consumer_example rdkafka_complex_consumer_example_cpp \
33
kafkatest_verifiable_client \
44
producer consumer share_consumer share_consumer_commit_async \
5+
share_consumer_commit_sync \
56
idempotent_producer transactions \
67
delete_records \
78
openssl_engine_example_cpp \
@@ -63,6 +64,10 @@ share_consumer_commit_async: ../src/librdkafka.a share_consumer_commit_async.c
6364
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
6465
../src/librdkafka.a $(LIBS)
6566

67+
share_consumer_commit_sync: ../src/librdkafka.a share_consumer_commit_sync.c
68+
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
69+
../src/librdkafka.a $(LIBS)
70+
6671
idempotent_producer: ../src/librdkafka.a idempotent_producer.c
6772
$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
6873
../src/librdkafka.a $(LIBS)
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* librdkafka - Apache Kafka C library
3+
*
4+
* Copyright (c) 2026, Confluent Inc.
5+
* All rights reserved.
6+
*
7+
* Redistribution and use in source and binary forms, with or without
8+
* modification, are permitted provided that the following conditions are met:
9+
*
10+
* 1. Redistributions of source code must retain the above copyright notice,
11+
* this list of conditions and the following disclaimer.
12+
* 2. Redistributions in binary form must reproduce the above copyright notice,
13+
* this list of conditions and the following disclaimer in the documentation
14+
* and/or other materials provided with the distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26+
* POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
/**
30+
* Share consumer example using rd_kafka_share_commit_sync() to
31+
* explicitly acknowledge and synchronously commit records.
32+
*
33+
* Usage:
34+
* share_consumer_commit_sync <broker> <group.id> <topic1> [topic2 ...]
35+
*
36+
* This example demonstrates:
37+
* - Consuming records with rd_kafka_share_consume_batch()
38+
* - Explicitly acknowledging individual records with
39+
* rd_kafka_share_acknowledge_type() using ACCEPT, RELEASE, or REJECT
40+
* - Committing acknowledgements synchronously with
41+
* rd_kafka_share_commit_sync() at ~10% rate mid-batch,
42+
* printing per-partition results
43+
*/
44+
45+
#ifndef _POSIX_C_SOURCE
46+
#define _POSIX_C_SOURCE 199309L
47+
#endif
48+
49+
#include <stdio.h>
50+
#include <signal.h>
51+
#include <string.h>
52+
#include <ctype.h>
53+
#include <stdlib.h>
54+
#include <time.h>
55+
56+
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
57+
* is builtin from within the librdkafka source tree and thus differs. */
58+
#include "rdkafka.h"
59+
60+
61+
static volatile sig_atomic_t run = 1;
62+
63+
/**
64+
* @brief Signal termination of program
65+
*/
66+
static void stop(int sig) {
67+
run = 0;
68+
}
69+
70+
71+
/**
72+
* @returns 1 if all bytes are printable, else 0.
73+
*/
74+
static int is_printable(const char *buf, size_t size) {
75+
size_t i;
76+
77+
for (i = 0; i < size; i++)
78+
if (!isprint((int)buf[i]))
79+
return 0;
80+
81+
return 1;
82+
}
83+
84+
static const char *
85+
ack_type_to_str(rd_kafka_share_AcknowledgeType_t type) {
86+
switch (type) {
87+
case RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT:
88+
return "ACCEPT";
89+
case RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_RELEASE:
90+
return "RELEASE";
91+
case RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_REJECT:
92+
return "REJECT";
93+
default:
94+
return "UNKNOWN";
95+
}
96+
}
97+
98+
99+
int main(int argc, char **argv) {
100+
rd_kafka_share_t *rkshare;
101+
rd_kafka_conf_t *conf;
102+
rd_kafka_resp_err_t err;
103+
char errstr[512];
104+
const char *brokers;
105+
const char *groupid;
106+
char **topics;
107+
int topic_cnt;
108+
rd_kafka_topic_partition_list_t *subscription;
109+
int i;
110+
111+
if (argc < 4) {
112+
fprintf(stderr,
113+
"%% Usage: "
114+
"%s <broker> <group.id> <topic1> [topic2 ..]\n",
115+
argv[0]);
116+
return 1;
117+
}
118+
119+
brokers = argv[1];
120+
groupid = argv[2];
121+
topics = &argv[3];
122+
topic_cnt = argc - 3;
123+
124+
conf = rd_kafka_conf_new();
125+
126+
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
127+
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
128+
fprintf(stderr, "%s\n", errstr);
129+
rd_kafka_conf_destroy(conf);
130+
return 1;
131+
}
132+
133+
if (rd_kafka_conf_set(conf, "group.id", groupid, errstr,
134+
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
135+
fprintf(stderr, "%s\n", errstr);
136+
rd_kafka_conf_destroy(conf);
137+
return 1;
138+
}
139+
140+
if (rd_kafka_conf_set(conf, "share.acknowledgement.mode", "explicit",
141+
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
142+
fprintf(stderr, "%s\n", errstr);
143+
rd_kafka_conf_destroy(conf);
144+
return 1;
145+
}
146+
147+
rkshare = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
148+
if (!rkshare) {
149+
fprintf(stderr, "%% Failed to create new share consumer: %s\n",
150+
errstr);
151+
return 1;
152+
}
153+
154+
conf = NULL;
155+
156+
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
157+
for (i = 0; i < topic_cnt; i++)
158+
rd_kafka_topic_partition_list_add(subscription, topics[i],
159+
RD_KAFKA_PARTITION_UA);
160+
161+
err = rd_kafka_share_subscribe(rkshare, subscription);
162+
if (err) {
163+
fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n",
164+
subscription->cnt, rd_kafka_err2str(err));
165+
rd_kafka_topic_partition_list_destroy(subscription);
166+
rd_kafka_share_destroy(rkshare);
167+
return 1;
168+
}
169+
170+
fprintf(stderr,
171+
"%% Subscribed to %d topic(s), "
172+
"waiting for rebalance and messages...\n",
173+
subscription->cnt);
174+
175+
rd_kafka_topic_partition_list_destroy(subscription);
176+
177+
signal(SIGINT, stop);
178+
179+
srand((unsigned int)time(NULL));
180+
181+
rd_kafka_message_t *rkmessages[10001];
182+
while (run) {
183+
size_t rcvd_msgs = 0;
184+
rd_kafka_error_t *error;
185+
186+
printf("Calling rd_kafka_share_consume_batch()\n");
187+
error = rd_kafka_share_consume_batch(rkshare, 3000, rkmessages,
188+
&rcvd_msgs);
189+
190+
if (error) {
191+
fprintf(stderr, "%% Consume error: %s\n",
192+
rd_kafka_error_string(error));
193+
rd_kafka_error_destroy(error);
194+
continue;
195+
}
196+
197+
if (rcvd_msgs == 0)
198+
continue;
199+
200+
printf("Received %zu messages\n", rcvd_msgs);
201+
202+
for (i = 0; i < (int)rcvd_msgs; i++) {
203+
rd_kafka_message_t *rkm = rkmessages[i];
204+
rd_kafka_share_AcknowledgeType_t ack_type;
205+
int r;
206+
207+
if (rkm->err) {
208+
fprintf(stderr, "%% Consumer error: %d: %s\n",
209+
rkm->err, rd_kafka_message_errstr(rkm));
210+
rd_kafka_message_destroy(rkm);
211+
continue;
212+
}
213+
214+
/* Randomly choose ack type:
215+
* 50% ACCEPT, 30% RELEASE, 20% REJECT */
216+
r = rand() % 100;
217+
if (r < 50)
218+
ack_type =
219+
RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
220+
else if (r < 80)
221+
ack_type =
222+
RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_RELEASE;
223+
else
224+
ack_type =
225+
RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_REJECT;
226+
227+
printf("Message on %s [%" PRId32 "] at offset %" PRId64
228+
" (delivery count: %" PRId16 ") -> %s",
229+
rd_kafka_topic_name(rkm->rkt), rkm->partition,
230+
rkm->offset,
231+
rd_kafka_message_delivery_count(rkm),
232+
ack_type_to_str(ack_type));
233+
234+
if (rkm->key && is_printable(rkm->key, rkm->key_len))
235+
printf(" Key: %.*s", (int)rkm->key_len,
236+
(const char *)rkm->key);
237+
238+
if (rkm->payload &&
239+
is_printable(rkm->payload, rkm->len))
240+
printf(" Value: %.*s", (int)rkm->len,
241+
(const char *)rkm->payload);
242+
243+
printf("\n");
244+
245+
err = rd_kafka_share_acknowledge_type(rkshare, rkm,
246+
ack_type);
247+
if (err)
248+
fprintf(stderr,
249+
"%% Acknowledge error for "
250+
"%s [%" PRId32 "] @ %" PRId64 ": %s\n",
251+
rd_kafka_topic_name(rkm->rkt),
252+
rkm->partition, rkm->offset,
253+
rd_kafka_err2str(err));
254+
255+
rd_kafka_message_destroy(rkm);
256+
257+
/* Randomly commit ~10% of the time to
258+
* exercise sync commit mid-batch. */
259+
if (run && (rand() % 100) < 10) {
260+
rd_kafka_topic_partition_list_t *partitions =
261+
NULL;
262+
263+
printf("Calling "
264+
"rd_kafka_share_commit_sync()\n");
265+
error = rd_kafka_share_commit_sync(
266+
rkshare, 30000, &partitions);
267+
if (error) {
268+
fprintf(stderr,
269+
"%% Commit sync error: %s\n",
270+
rd_kafka_error_string(error));
271+
rd_kafka_error_destroy(error);
272+
} else if (partitions) {
273+
int j;
274+
printf("Commit sync results "
275+
"(%d partitions):\n",
276+
partitions->cnt);
277+
for (j = 0; j < partitions->cnt; j++) {
278+
rd_kafka_topic_partition_t
279+
*rktpar =
280+
&partitions->elems[j];
281+
printf(" %s [%" PRId32
282+
"]: %s\n",
283+
rktpar->topic,
284+
rktpar->partition,
285+
rd_kafka_err2str(
286+
rktpar->err));
287+
}
288+
rd_kafka_topic_partition_list_destroy(
289+
partitions);
290+
} else {
291+
printf("No pending acks to commit\n");
292+
}
293+
}
294+
}
295+
}
296+
297+
fprintf(stderr, "%% Closing share consumer\n");
298+
rd_kafka_share_consumer_close(rkshare);
299+
300+
rd_kafka_share_destroy(rkshare);
301+
302+
return 0;
303+
}

0 commit comments

Comments
 (0)