Sync producer
Because librdkafka is asynchronous by design, it is not trivial to implement a synchronous produce interface in the library that can be used in parallel with the standard async interface.
The best solution at this time is to provide a sync interface in your application, which fortunately is a trivial thing to do.
Note: This code contains a data race; treat it as a sample only.
The function below produces a message, passes the address of a local stack variable as the message opaque, and waits for that variable to change value by calling rd_kafka_poll(), which in turn invokes the delivery report callback msg_delivered().
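#include <errno.h>
#include <librdkafka/rdkafka.h>

/* Producer handle shared with my_init_code(), which creates it. */
static rd_kafka_t *rk;

rd_kafka_resp_err_t sync_produce (rd_kafka_topic_t *rkt, int32_t partition,
                                  void *payload, size_t len,
                                  const void *key, size_t keylen) {
        /* Sentinel: not a defined error code, overwritten by the delivery report. */
        rd_kafka_resp_err_t err = -12345;
        /* &err is the per-message opaque handed back to the delivery report callback. */
        if (rd_kafka_produce(rkt, partition, 0, payload, len,
                             key, keylen, &err) == -1)
                return rd_kafka_errno2err(errno);
        /* Serve delivery reports until the callback has replaced the sentinel. */
        while (err == -12345)
                rd_kafka_poll(rk, 1000);
        return err;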
}
The delivery report callback simply stores the result of the produce request through the errp pointer, which points to the err variable in sync_produce().
static void msg_delivered (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
	if (rkmessage->_private) {
		rd_kafka_resp_err_t *errp = (rd_kafka_resp_err_t *)rkmessage->_private;
		*errp = rkmessage->err;
	}
}
The application's initialization code for librdkafka must set up the delivery report callback.
void my_init_code (void) {
        char errstr[512];
        rd_kafka_conf_t *rk_conf = rd_kafka_conf_new();
        /* Set delivery report callback */
        rd_kafka_conf_set_dr_msg_cb(rk_conf, msg_delivered);
        /* Minimize wait-for-larger-batch delay (since there will be no batching) */
        rd_kafka_conf_set(rk_conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr));
        /* Minimize wait-for-socket delay (otherwise you will lose 100ms per message instead of just the RTT).
         * Note: this property is deprecated and no longer used by recent librdkafka releases. */
        rd_kafka_conf_set(rk_conf, "socket.blocking.max.ms", "1", errstr, sizeof(errstr));
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
        if (!rk)
                ERROR(errstr);
        /* create topics, etc.. */
}
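To see the opaque-and-poll handshake on its own, without a broker or librdkafka, the following is a minimal mock sketch. Every `mock_*` name, the `MOCK_PENDING` sentinel, and the `next_err` field are hypothetical stand-ins invented for illustration; none of them are librdkafka APIs. It only mirrors the control flow of sync_produce() and msg_delivered() above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration only; not part of librdkafka. */
#define MOCK_PENDING   (-12345) /* sentinel: no delivery report seen yet */
#define MOCK_QUEUE_MAX 8

typedef void (*mock_dr_cb_t)(int err, void *msg_opaque);

typedef struct {
        void *opaque[MOCK_QUEUE_MAX]; /* opaques of messages awaiting a report */
        size_t cnt;                   /* number of queued messages */
        int next_err;                 /* result the mock reports for each message */
        mock_dr_cb_t dr_cb;           /* delivery report callback */
} mock_producer_t;

/* Queue a message with its per-message opaque; -1 if the queue is full. */
static int mock_produce(mock_producer_t *p, void *msg_opaque) {
        if (p->cnt == MOCK_QUEUE_MAX)
                return -1;
        p->opaque[p->cnt++] = msg_opaque;
        return 0;
}

/* Invoke the callback for every queued message on the calling thread.
 * Returns the number of callbacks served. */
static int mock_poll(mock_producer_t *p) {
        int served = 0;
        assert(p->dr_cb != NULL);
        while (p->cnt > 0) {
                p->dr_cb(p->next_err, p->opaque[--p->cnt]);
                served++;
        }
        return served;
}

/* Counterpart of msg_delivered(): store the result through the opaque. */
static void mock_delivered(int err, void *msg_opaque) {
        if (msg_opaque)
                *(int *)msg_opaque = err;
}

/* Counterpart of sync_produce(): block until the callback has
 * replaced the sentinel held in the local stack variable. */
static int mock_sync_produce(mock_producer_t *p) {
        int err = MOCK_PENDING;
        if (mock_produce(p, &err) == -1)
                return -1;
        while (err == MOCK_PENDING)
                mock_poll(p);
        return err;
}
```

With a producer initialized as `{ .next_err = 0, .dr_cb = mock_delivered }`, `mock_sync_produce(&p)` returns 0 once the mock poll has run the callback. The sketch also shows why the pattern is fragile: the opaque points into the caller's stack frame, so the caller must not return before the delivery report has been served.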