Skip to content

Make it possible to react on rebalance#27

Open
Leonidas-from-XIV wants to merge 1 commit intodidier-wenzek:masterfrom
Leonidas-from-XIV:rebalance-cb
Open

Make it possible to react on rebalance#27
Leonidas-from-XIV wants to merge 1 commit intodidier-wenzek:masterfrom
Leonidas-from-XIV:rebalance-cb

Conversation

@Leonidas-from-XIV
Copy link
Contributor

I am attempting to implement batching using the modern Kafka API and for this I need to do a few steps as described in this comment by the author of librdkafka. One of them is to be able to flush the messages that I have collected so far when a rebalance has happened. This adds (minimal) support to notify the OCaml code when this has happened.

Let me know what you think. In particular this does not (yet?) allow the OCaml code to influence the partitioning, it just does essentially what the documentation gives example of a callback.

@didier-wenzek
Copy link
Owner

Thanks for this pull request. This is definitely a feature to have. Taking the time to review it.

@didier-wenzek
Copy link
Owner

Looks good to me. I'm just wondering how to test it. It would also be good to have it working with Lwt.

@didier-wenzek
Copy link
Owner

It will be more complex to have this working for Lwt which uses the former batch API.

Copy link
Contributor

@anmonteiro anmonteiro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works after getting the callback from the option block


CAMLprim value ocaml_kafka_async_new_consumer(value caml_rebalance_callback, value caml_consumer_options) {
CAMLparam2(caml_rebalance_callback, caml_consumer_options);
CAMLlocal1(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CAMLlocal1(result);
CAMLlocal2(caml_callback, result);

Comment on lines +119 to +120
if (Is_block(caml_rebalance_callback)) {
opaque = ocaml_kafka_opaque_create(caml_rebalance_callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (Is_block(caml_rebalance_callback)) {
opaque = ocaml_kafka_opaque_create(caml_rebalance_callback);
if (Is_block(caml_rebalance_callback)) {
caml_callback = Field(caml_rebalance_callback, 0);
opaque = ocaml_kafka_opaque_create(caml_callback);

Copy link
Contributor

@anmonteiro anmonteiro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works after getting the callback from the option block

Copy link
Contributor

@anmonteiro anmonteiro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works after getting the callback from the option block

caml_cb = ((ocaml_kafka_opaque*)opaque)->caml_callback;

// do the default thing, but also call our OCaml code
switch (err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably handle cooperative assignors too, and call rd_kafka_incremental_assign (see https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a10db731dc1a295bd9884e4f8cb199311)

caml_callback(caml_cb, Val_unit);
break;

default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is missing a call to rd_kafka_assign():

In this latter case (arbitrary error), the application must call rd_kafka_assign(rk, NULL) to synchronize state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants