@@ -145,7 +145,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
145145 goto done ;
146146
147147 msgobj = Message_new0 (self , rkm );
148-
148+
149149 args = Py_BuildValue ("(OO)" , ((Message * )msgobj )-> error , msgobj );
150150
151151 Py_DECREF (msgobj );
@@ -532,6 +532,36 @@ static PyObject *Producer_abort_transaction(Handle *self, PyObject *args) {
532532 Py_RETURN_NONE ;
533533}
534534
535+ static void * Producer_purge (Handle * self , PyObject * args ,
536+ PyObject * kwargs ) {
537+ int in_queue = 1 ;
538+ int in_flight = 1 ;
539+ int blocking = 1 ;
540+ int purge_strategy = 0 ;
541+
542+ rd_kafka_resp_err_t err ;
543+ static char * kws [] = { "in_queue" , "in_flight" , "blocking" , NULL };
544+
545+ if (!PyArg_ParseTupleAndKeywords (args , kwargs , "|bbb" , kws , & in_queue , & in_flight , & blocking ))
546+ return NULL ;
547+ if (in_queue )
548+ purge_strategy = RD_KAFKA_PURGE_F_QUEUE ;
549+ if (in_flight )
550+ purge_strategy |= RD_KAFKA_PURGE_F_INFLIGHT ;
551+ if (blocking )
552+ purge_strategy |= RD_KAFKA_PURGE_F_NON_BLOCKING ;
553+
554+ err = rd_kafka_purge (self -> rk , purge_strategy );
555+
556+ if (err ) {
557+ cfl_PyErr_Format (err , "Purge failed: %s" , rd_kafka_err2str (err ));
558+ return NULL ;
559+ }
560+
561+ Py_RETURN_NONE ;
562+ }
563+
564+
535565static PyMethodDef Producer_methods [] = {
536566 { "produce" , (PyCFunction )Producer_produce ,
537567 METH_VARARGS |METH_KEYWORDS ,
@@ -597,6 +627,18 @@ static PyMethodDef Producer_methods[] = {
597627 "callbacks may be triggered.\n"
598628 "\n"
599629 },
630+ { "purge" , (PyCFunction )Producer_purge , METH_VARARGS |METH_KEYWORDS ,
631+ ".. py:function:: purge([in_queue=True], [in_flight=True], [blocking=True])\n"
632+ "\n"
633+ " Purge messages currently handled by the producer instance.\n"
634+ " The application will need to call poll() or flush() "
635+ "afterwards to serve the delivery report callbacks of the purged messages."
636+ "\n"
637+ " :param: bool in_queue: Purge messages from internal queues. By default, true.\n"
638+ " :param: bool in_flight: Purge messages in flight to or from the broker. By default, true.\n"
639+ " :param: bool blocking: If set to False, will not wait on background thread queue\n"
640+ "purging to finish. By default, true."
641+ },
600642 { "list_topics" , (PyCFunction )list_topics , METH_VARARGS |METH_KEYWORDS ,
601643 list_topics_doc
602644 },
0 commit comments