Skip to content

Commit 2c9aade

Browse files
committed
style fix
1 parent 850bbc4 commit 2c9aade

9 files changed

+209
-194
lines changed

examples/share_consumer_commit_sync.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ static int is_printable(const char *buf, size_t size) {
8181
return 1;
8282
}
8383

84-
static const char *
85-
ack_type_to_str(rd_kafka_share_AcknowledgeType_t type) {
84+
static const char *ack_type_to_str(rd_kafka_share_AcknowledgeType_t type) {
8685
switch (type) {
8786
case RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT:
8887
return "ACCEPT";
@@ -260,8 +259,9 @@ int main(int argc, char **argv) {
260259
rd_kafka_topic_partition_list_t *partitions =
261260
NULL;
262261

263-
printf("Calling "
264-
"rd_kafka_share_commit_sync()\n");
262+
printf(
263+
"Calling "
264+
"rd_kafka_share_commit_sync()\n");
265265
error = rd_kafka_share_commit_sync(
266266
rkshare, 30000, &partitions);
267267
if (error) {
@@ -271,9 +271,10 @@ int main(int argc, char **argv) {
271271
rd_kafka_error_destroy(error);
272272
} else if (partitions) {
273273
int j;
274-
printf("Commit sync results "
275-
"(%d partitions):\n",
276-
partitions->cnt);
274+
printf(
275+
"Commit sync results "
276+
"(%d partitions):\n",
277+
partitions->cnt);
277278
for (j = 0; j < partitions->cnt; j++) {
278279
rd_kafka_topic_partition_t
279280
*rktpar =

src/rdkafka_fetcher.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,8 @@ static rd_kafka_resp_err_t rd_kafka_share_fetch_reply_handle_partition(
12121212

12131213
/* Enqueue acknowledgement callback for this partition if callback is
12141214
* registered and we have ack_details for this partition. */
1215-
if (rkb->rkb_rk->rk_conf.share_acknowledgement_commit_cb && ack_details) {
1215+
if (rkb->rkb_rk->rk_conf.share_acknowledgement_commit_cb &&
1216+
ack_details) {
12161217
rd_kafka_share_ack_batches_t *ack_batch;
12171218
int k;
12181219
RD_LIST_FOREACH(ack_batch, ack_details, k) {
@@ -1222,7 +1223,8 @@ static rd_kafka_resp_err_t rd_kafka_share_fetch_reply_handle_partition(
12221223
ack_batch->rktpar->partition == PartitionId) {
12231224
rd_kafka_share_enqueue_ack_callback(
12241225
rkb->rkb_rk, ack_batch,
1225-
(rd_kafka_resp_err_t)AcknowledgementErrorCode);
1226+
(rd_kafka_resp_err_t)
1227+
AcknowledgementErrorCode);
12261228
break;
12271229
}
12281230
}
@@ -2007,8 +2009,8 @@ static void rd_kafka_broker_share_acknowledge_reply(rd_kafka_t *rk,
20072009
/* Enqueue ack callbacks before destroying ack_details.
20082010
* Priority order:
20092011
* 1. Top-level error (err != NO_ERROR): Apply same error to all
2010-
* partitions. This happens for transport errors, session errors, etc.
2011-
* In this case ack_results is NULL.
2012+
* partitions. This happens for transport errors, session errors,
2013+
* etc. In this case ack_results is NULL.
20122014
* 2. No top-level error: Use ack_results which contains per-partition
20132015
* results. Each partition has its own error code (NO_ERROR for
20142016
* success, or a specific per-partition error). */

src/rdkafka_op.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
309309
[RD_KAFKA_OP_SHARE_SESSION_PARTITION_REMOVE] = _RD_KAFKA_OP_EMPTY,
310310
[RD_KAFKA_OP_SHARE_FETCH_RESPONSE] =
311311
sizeof(rko->rko_u.share_fetch_response),
312-
[RD_KAFKA_OP_SHARE_ACK_REPLY] =
313-
sizeof(rko->rko_u.share_ack_reply),
312+
[RD_KAFKA_OP_SHARE_ACK_REPLY] = sizeof(rko->rko_u.share_ack_reply),
314313
[RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT] =
315314
sizeof(rko->rko_u.share_commit_async_fanout),
316315
[RD_KAFKA_OP_SHARE_COMMIT_SYNC_FANOUT] =

src/rdkafka_op.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -884,17 +884,18 @@ struct rd_kafka_op_s {
884884

885885
/**
886886
* Share acknowledgement callback reply.
887-
* Contains results to deliver to share_acknowledgement_commit_cb.
887+
* Contains results to deliver to
888+
* share_acknowledgement_commit_cb.
888889
*/
889890
struct {
890891
/** List of partition offsets. */
891892
rd_kafka_share_partition_offsets_list_t *partitions;
892893
/** Callback function pointer. */
893-
void (*cb)(rd_kafka_share_t *rkshare,
894-
rd_kafka_share_partition_offsets_list_t
895-
*partitions,
896-
rd_kafka_resp_err_t err,
897-
void *opaque);
894+
void (*cb)(
895+
rd_kafka_share_t *rkshare,
896+
rd_kafka_share_partition_offsets_list_t *partitions,
897+
rd_kafka_resp_err_t err,
898+
void *opaque);
898899
/** Application opaque. */
899900
void *opaque;
900901
} share_ack_reply;

src/rdkafka_share_acknowledgement.c

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,8 @@ void rd_kafka_share_partition_offsets_init(
622622
int offsets_cnt) {
623623
elem->partition.topic = rd_strdup(topic);
624624
elem->partition.partition = partition;
625-
elem->offsets = rd_calloc((size_t)offsets_cnt, sizeof(int64_t));
626-
elem->cnt = offsets_cnt;
625+
elem->offsets = rd_calloc((size_t)offsets_cnt, sizeof(int64_t));
626+
elem->cnt = offsets_cnt;
627627
}
628628

629629
/**
@@ -649,7 +649,8 @@ void rd_kafka_share_partition_offsets_clear(
649649
*
650650
* @param capacity Initial capacity for elements.
651651
* @returns Newly allocated list, or NULL if capacity is 0.
652-
* Caller must destroy with rd_kafka_share_partition_offsets_list_destroy().
652+
* Caller must destroy with
653+
* rd_kafka_share_partition_offsets_list_destroy().
653654
*/
654655
rd_kafka_share_partition_offsets_list_t *
655656
rd_kafka_share_partition_offsets_list_new(int capacity) {
@@ -676,8 +677,7 @@ rd_kafka_share_build_partition_offsets_list(
676677
int offset_idx = 0;
677678
int j;
678679

679-
if (!batches || !batches->rktpar ||
680-
rd_list_cnt(&batches->entries) == 0)
680+
if (!batches || !batches->rktpar || rd_list_cnt(&batches->entries) == 0)
681681
return NULL;
682682

683683
/* Count total offsets */
@@ -696,9 +696,9 @@ rd_kafka_share_build_partition_offsets_list(
696696
list->cnt = 1;
697697
elem = &list->elems[0];
698698

699-
rd_kafka_share_partition_offsets_init(
700-
elem, batches->rktpar->topic, batches->rktpar->partition,
701-
total_offsets);
699+
rd_kafka_share_partition_offsets_init(elem, batches->rktpar->topic,
700+
batches->rktpar->partition,
701+
total_offsets);
702702

703703
/* Fill offsets array */
704704
RD_LIST_FOREACH(entry, &batches->entries, j) {
@@ -835,11 +835,10 @@ void rd_kafka_share_dispatch_ack_callbacks(
835835
for (int p = 0; p < ack_results->cnt; p++) {
836836
rd_kafka_topic_partition_t *rktpar = &ack_results->elems[p];
837837

838-
ack_batch = rd_kafka_share_ack_batch_find(ack_details,
839-
rktpar->topic,
840-
rktpar->partition);
838+
ack_batch = rd_kafka_share_ack_batch_find(
839+
ack_details, rktpar->topic, rktpar->partition);
841840
if (ack_batch)
842841
rd_kafka_share_enqueue_ack_callback(rk, ack_batch,
843-
rktpar->err);
842+
rktpar->err);
844843
}
845844
}

src/rdkafka_share_acknowledgement.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,18 +200,19 @@ int rd_kafka_share_ack_entries_sort_cmp_ptr(const void *_a, const void *_b);
200200
* @brief Partition with set of acknowledged offsets.
201201
*/
202202
struct rd_kafka_share_partition_offsets_s {
203-
rd_kafka_topic_partition_t partition; /**< Topic partition information */
204-
int64_t *offsets; /**< Array of acknowledged offsets */
205-
int cnt; /**< Number of offsets in array */
203+
rd_kafka_topic_partition_t
204+
partition; /**< Topic partition information */
205+
int64_t *offsets; /**< Array of acknowledged offsets */
206+
int cnt; /**< Number of offsets in array */
206207
};
207208

208209
/**
209210
* @struct rd_kafka_share_partition_offsets_list_s
210211
* @brief List of share partition offsets for callback.
211212
*/
212213
struct rd_kafka_share_partition_offsets_list_s {
213-
int cnt; /**< Number of partitions */
214-
int size; /**< Allocated size */
214+
int cnt; /**< Number of partitions */
215+
int size; /**< Allocated size */
215216
rd_kafka_share_partition_offsets_t *elems; /**< Array of partition
216217
offsets */
217218
};

tests/0171-share_consumer_consume.c

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,8 @@ static void test_poll_callback_piggybacked_acks(void) {
989989
ack_cb_state_t state;
990990

991991
TEST_SAY("\n");
992-
TEST_SAY("=== Poll callback test (piggybacked acks on ShareFetch) ===\n");
992+
TEST_SAY(
993+
"=== Poll callback test (piggybacked acks on ShareFetch) ===\n");
993994

994995
ack_cb_state_init(&state);
995996

@@ -1027,7 +1028,8 @@ static void test_poll_callback_piggybacked_acks(void) {
10271028
size_t m;
10281029
rd_kafka_error_t *err;
10291030

1030-
err = rd_kafka_share_consume_batch(consumer, 2000, batch, &rcvd);
1031+
err =
1032+
rd_kafka_share_consume_batch(consumer, 2000, batch, &rcvd);
10311033
if (err) {
10321034
rd_kafka_error_destroy(err);
10331035
continue;
@@ -1050,7 +1052,8 @@ static void test_poll_callback_piggybacked_acks(void) {
10501052
size_t m;
10511053
rd_kafka_error_t *err;
10521054

1053-
err = rd_kafka_share_consume_batch(consumer, 2000, batch, &rcvd);
1055+
err =
1056+
rd_kafka_share_consume_batch(consumer, 2000, batch, &rcvd);
10541057
if (err)
10551058
rd_kafka_error_destroy(err);
10561059

@@ -1062,11 +1065,13 @@ static void test_poll_callback_piggybacked_acks(void) {
10621065
state.callback_cnt, state.total_offsets,
10631066
rd_kafka_err2name(state.last_err));
10641067

1065-
TEST_ASSERT(state.callback_cnt >= 1,
1066-
"Expected at least 1 callback from piggybacked acks, got %d",
1067-
state.callback_cnt);
1068+
TEST_ASSERT(
1069+
state.callback_cnt >= 1,
1070+
"Expected at least 1 callback from piggybacked acks, got %d",
1071+
state.callback_cnt);
10681072
TEST_ASSERT(state.total_offsets > 0,
1069-
"Expected offsets in callback, got %d", state.total_offsets);
1073+
"Expected offsets in callback, got %d",
1074+
state.total_offsets);
10701075

10711076
TEST_SAY("SUCCESS: Poll callback received %d callbacks, %d offsets\n",
10721077
state.callback_cnt, state.total_offsets);
@@ -1091,8 +1096,8 @@ create_share_consumer_explicit_with_cb(const char *group,
10911096

10921097
test_conf_init(&conf, NULL, 60);
10931098
rd_kafka_conf_set(conf, "group.id", group, errstr, sizeof(errstr));
1094-
rd_kafka_conf_set(conf, "share.acknowledgement.mode", "explicit", errstr,
1095-
sizeof(errstr));
1099+
rd_kafka_conf_set(conf, "share.acknowledgement.mode", "explicit",
1100+
errstr, sizeof(errstr));
10961101
rd_kafka_conf_set_share_acknowledgement_commit_cb(conf, share_ack_cb);
10971102
rd_kafka_conf_set_opaque(conf, state);
10981103

@@ -1169,9 +1174,9 @@ static void test_ack_after_commit(void) {
11691174
rd_kafka_resp_err_t ack_err;
11701175
ack_cb_state_t state;
11711176
/* Store message info for re-ack attempt after commit */
1172-
const char *saved_topic = NULL;
1173-
int32_t saved_partition = -1;
1174-
int64_t saved_offset = -1;
1177+
const char *saved_topic = NULL;
1178+
int32_t saved_partition = -1;
1179+
int64_t saved_offset = -1;
11751180

11761181
TEST_SAY("\n");
11771182
TEST_SAY("=== Negative test: Acknowledge after commit ===\n");
@@ -1205,9 +1210,10 @@ static void test_ack_after_commit(void) {
12051210
if (!rkmessages[j]->err) {
12061211
/* Save first message info for later */
12071212
if (saved_offset < 0) {
1208-
saved_topic = topic;
1209-
saved_partition = rkmessages[j]->partition;
1210-
saved_offset = rkmessages[j]->offset;
1213+
saved_topic = topic;
1214+
saved_partition =
1215+
rkmessages[j]->partition;
1216+
saved_offset = rkmessages[j]->offset;
12111217
TEST_SAY("Saved msg info: %s [%" PRId32
12121218
"] @ %" PRId64 "\n",
12131219
saved_topic, saved_partition,

tests/0173-share_consumer_commit_async.c

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,10 +1619,9 @@ static void share_ack_cb(rd_kafka_share_t *rkshare,
16191619
mtx_unlock(&state->lock);
16201620
}
16211621

1622-
static rd_kafka_share_t *
1623-
create_share_consumer_with_cb(const char *group_id,
1624-
const char *ack_mode,
1625-
ack_cb_state_t *state) {
1622+
static rd_kafka_share_t *create_share_consumer_with_cb(const char *group_id,
1623+
const char *ack_mode,
1624+
ack_cb_state_t *state) {
16261625
rd_kafka_share_t *rkshare;
16271626
rd_kafka_conf_t *conf;
16281627
char errstr[512];
@@ -1735,7 +1734,8 @@ static void do_test_commit_async_callback(void) {
17351734
TEST_ASSERT(state.callback_cnt >= 1,
17361735
"Expected at least 1 callback, got %d", state.callback_cnt);
17371736
TEST_ASSERT(state.total_offsets > 0,
1738-
"Expected offsets in callback, got %d", state.total_offsets);
1737+
"Expected offsets in callback, got %d",
1738+
state.total_offsets);
17391739

17401740
rd_kafka_share_consumer_close(rkshare);
17411741
rd_kafka_share_destroy(rkshare);
@@ -1765,9 +1765,9 @@ static void do_test_ack_after_commit_async(void) {
17651765
rd_kafka_resp_err_t ack_err;
17661766
ack_cb_state_t state;
17671767
/* Store message info for re-ack attempt after commit */
1768-
const char *saved_topic = NULL;
1769-
int32_t saved_partition = -1;
1770-
int64_t saved_offset = -1;
1768+
const char *saved_topic = NULL;
1769+
int32_t saved_partition = -1;
1770+
int64_t saved_offset = -1;
17711771

17721772
SUB_TEST();
17731773

@@ -1795,9 +1795,10 @@ static void do_test_ack_after_commit_async(void) {
17951795
if (!rkmessages[j]->err) {
17961796
/* Save first message info for later */
17971797
if (saved_offset < 0) {
1798-
saved_topic = topic;
1799-
saved_partition = rkmessages[j]->partition;
1800-
saved_offset = rkmessages[j]->offset;
1798+
saved_topic = topic;
1799+
saved_partition =
1800+
rkmessages[j]->partition;
1801+
saved_offset = rkmessages[j]->offset;
18011802
TEST_SAY("Saved msg info: %s [%" PRId32
18021803
"] @ %" PRId64 "\n",
18031804
saved_topic, saved_partition,

0 commit comments

Comments
 (0)