1616 * under the License.
1717 */
1818
19- use crate :: actors:: {
20- ApiLabel , BatchMetrics , BenchmarkInit ,
21- producer:: client:: {
22- BenchmarkProducerClient ,
23- interface:: { BenchmarkProducerConfig , ProducerClient } ,
19+ use crate :: {
20+ actors:: {
21+ ApiLabel , BatchMetrics , BenchmarkInit ,
22+ producer:: client:: {
23+ BenchmarkProducerClient ,
24+ interface:: { BenchmarkProducerConfig , ProducerClient } ,
25+ } ,
2426 } ,
27+ utils:: batch_generator:: BenchmarkBatchGenerator ,
2528} ;
2629use iggy:: prelude:: * ;
2730use integration:: test_server:: { ClientFactory , login_root} ;
@@ -43,46 +46,58 @@ impl HighLevelProducerClient {
4346 }
4447 }
4548}
46- # [ allow ( clippy :: significant_drop_tightening ) ]
49+
4750impl ProducerClient for HighLevelProducerClient {
4851 async fn produce_batch (
4952 & mut self ,
50- batch_generator : & mut crate :: utils :: batch_generator :: BenchmarkBatchGenerator ,
53+ batch_generator : & mut BenchmarkBatchGenerator ,
5154 ) -> Result < Option < BatchMetrics > , IggyError > {
52- let producer = self . producer . as_mut ( ) . expect ( "Producer not initialized" ) ;
53-
5455 let batch = batch_generator. generate_owned_batch ( ) ;
5556 if batch. messages . is_empty ( ) {
5657 return Ok ( None ) ;
5758 }
5859 let message_count = u32:: try_from ( batch. messages . len ( ) ) . unwrap ( ) ;
60+ let user_data_bytes = batch. user_data_bytes ;
61+ let total_bytes = batch. total_bytes ;
62+
5963 let before_send = Instant :: now ( ) ;
60- producer. send ( batch. messages ) . await ?;
64+ self . producer
65+ . as_mut ( )
66+ . expect ( "Producer not initialized" )
67+ . send ( batch. messages )
68+ . await ?;
6169 let latency = before_send. elapsed ( ) ;
6270
6371 Ok ( Some ( BatchMetrics {
6472 messages : message_count,
65- user_data_bytes : batch . user_data_bytes ,
66- total_bytes : batch . total_bytes ,
73+ user_data_bytes,
74+ total_bytes,
6775 latency,
6876 } ) )
6977 }
7078}
7179
7280impl BenchmarkInit for HighLevelProducerClient {
7381 async fn setup ( & mut self ) -> Result < ( ) , IggyError > {
74- let topic_id: u32 = 1 ;
82+ let topic_id_str = "topic-1" ;
83+ let default_partition_id = 0u32 ;
7584
7685 let client = self . client_factory . create_client ( ) . await ;
7786 let client = IggyClient :: create ( client, None , None ) ;
7887 login_root ( & client) . await ;
7988
8089 let stream_id_str = self . config . stream_id . clone ( ) ;
81- let topic_id_str = topic_id. to_string ( ) ;
90+
91+ let partitioning = match self . config . partitions {
92+ 0 => panic ! ( "Partition count must be greater than 0" ) ,
93+ 1 => Partitioning :: partition_id ( default_partition_id) ,
94+ _ => Partitioning :: balanced ( ) ,
95+ } ;
8296
8397 self . producer = Some (
8498 client
85- . producer ( & stream_id_str, & topic_id_str) ?
99+ . producer ( & stream_id_str, topic_id_str) ?
100+ . partitioning ( partitioning)
86101 . create_stream_if_not_exists ( )
87102 . create_topic_if_not_exists (
88103 self . config . partitions ,
@@ -92,7 +107,6 @@ impl BenchmarkInit for HighLevelProducerClient {
92107 )
93108 . build ( ) ,
94109 ) ;
95-
96110 self . producer . as_mut ( ) . unwrap ( ) . init ( ) . await ?;
97111 Ok ( ( ) )
98112 }
0 commit comments