@@ -29,7 +29,7 @@ use rdkafka::{
29
29
config:: FromClientConfig ,
30
30
consumer:: { BaseConsumer , Consumer } ,
31
31
error:: KafkaResult ,
32
- message:: OwnedHeaders ,
32
+ message:: { Header , OwnedHeaders } ,
33
33
producer:: { FutureProducer , FutureRecord } ,
34
34
ClientConfig , Offset ,
35
35
} ;
@@ -101,7 +101,10 @@ async fn transactional_retry() -> Result<()> {
101
101
. key ( "foo" )
102
102
. partition ( 1 )
103
103
. timestamp ( 42 )
104
- . headers ( OwnedHeaders :: new ( ) . add ( "header" , "snot" ) ) ;
104
+ . headers ( OwnedHeaders :: new ( ) . insert ( Header {
105
+ key : "header" ,
106
+ value : Some ( "snot" ) ,
107
+ } ) ) ;
105
108
if producer. send ( record, PRODUCE_TIMEOUT ) . await . is_err ( ) {
106
109
return Err ( "Unable to send record to kafka" . into ( ) ) ;
107
110
}
@@ -328,7 +331,10 @@ async fn custom_no_retry() -> Result<()> {
328
331
. key ( "foo" )
329
332
. partition ( 1 )
330
333
. timestamp ( 42 )
331
- . headers ( OwnedHeaders :: new ( ) . add ( "header" , "snot" ) ) ;
334
+ . headers ( OwnedHeaders :: new ( ) . insert ( Header {
335
+ key : "header" ,
336
+ value : Some ( "snot" ) ,
337
+ } ) ) ;
332
338
if producer. send ( record, PRODUCE_TIMEOUT ) . await . is_err ( ) {
333
339
return Err ( "Unable to send record to kafka" . into ( ) ) ;
334
340
}
@@ -530,7 +536,10 @@ async fn performance() -> Result<()> {
530
536
. key ( "foo" )
531
537
. partition ( 1 )
532
538
. timestamp ( 42 )
533
- . headers ( OwnedHeaders :: new ( ) . add ( "header" , "snot" ) ) ;
539
+ . headers ( OwnedHeaders :: new ( ) . insert ( Header {
540
+ key : "header" ,
541
+ value : Some ( "snot" ) ,
542
+ } ) ) ;
534
543
if producer. send ( record, PRODUCE_TIMEOUT ) . await . is_err ( ) {
535
544
return Err ( "Unable to send record to kafka" . into ( ) ) ;
536
545
}
0 commit comments