@@ -645,4 +645,69 @@ def call(_, handle)
645645 end
646646 end
647647 end
648+
649+ describe '#purge' do
650+ context 'when no outgoing messages' do
651+ it { expect ( producer . purge ) . to eq ( true ) }
652+ end
653+
654+ context 'when librdkafka purge returns an error' do
655+ before { expect ( Rdkafka ::Bindings ) . to receive ( :rd_kafka_purge ) . and_return ( -153 ) }
656+
657+ it 'expect to raise an error' do
658+ expect { producer . purge } . to raise_error ( Rdkafka ::RdkafkaError , /retry/ )
659+ end
660+ end
661+
662+ context 'when there are outgoing things in the queue' do
663+ let ( :producer ) do
664+ rdkafka_producer_config (
665+ "bootstrap.servers" : "localhost:9093" ,
666+ "message.timeout.ms" : 2_000
667+ ) . producer
668+ end
669+
670+ it "should should purge and move forward" do
671+ producer . produce (
672+ topic : "produce_test_topic" ,
673+ payload : "payload headers"
674+ )
675+
676+ expect ( producer . purge ) . to eq ( true )
677+ expect ( producer . flush ( 1_000 ) ) . to eq ( true )
678+ end
679+
680+ it "should materialize the delivery handles" do
681+ handle = producer . produce (
682+ topic : "produce_test_topic" ,
683+ payload : "payload headers"
684+ )
685+
686+ expect ( producer . purge ) . to eq ( true )
687+
688+ expect { handle . wait } . to raise_error ( Rdkafka ::RdkafkaError , /purge_queue/ )
689+ end
690+
691+ context "when using delivery_callback" do
692+ let ( :delivery_reports ) { [ ] }
693+
694+ let ( :delivery_callback ) do
695+ -> ( delivery_report ) { delivery_reports << delivery_report }
696+ end
697+
698+ before { producer . delivery_callback = delivery_callback }
699+
700+ it "should run the callback" do
701+ handle = producer . produce (
702+ topic : "produce_test_topic" ,
703+ payload : "payload headers"
704+ )
705+
706+ expect ( producer . purge ) . to eq ( true )
707+ # queue purge
708+ expect ( delivery_reports [ 0 ] . error ) . to eq ( -152 )
709+ end
710+ end
711+ end
712+ end
648713end
0 commit comments