@@ -23,10 +23,11 @@ use mz_kafka_util::client::MzClientContext;
2323use mz_ore:: cast:: CastFrom ;
2424use mz_ore:: cli:: { self , CliConfig } ;
2525use mz_ore:: retry:: Retry ;
26- use rand:: distributions:: uniform:: SampleUniform ;
27- use rand:: distributions:: { Alphanumeric , Bernoulli , Uniform , WeightedIndex } ;
26+ use rand:: distr:: uniform:: SampleUniform ;
27+ use rand:: distr:: weighted:: WeightedIndex ;
28+ use rand:: distr:: { Alphanumeric , Bernoulli , Uniform } ;
2829use rand:: prelude:: { Distribution , ThreadRng } ;
29- use rand:: thread_rng ;
30+ use rand:: rng ;
3031use rdkafka:: error:: KafkaError ;
3132use rdkafka:: producer:: { BaseRecord , Producer , ThreadedProducer } ;
3233use rdkafka:: types:: RDKafkaErrorCode ;
@@ -247,21 +248,21 @@ impl<'a> RandomAvroGenerator<'a> {
247248 x[ 0 ] . as_i64 ( ) . unwrap ( ) . try_into ( ) . unwrap ( ) ,
248249 x[ 1 ] . as_i64 ( ) . unwrap ( ) . try_into ( ) . unwrap ( ) ,
249250 ) ;
250- let dist = Uniform :: new_inclusive ( min, max) ;
251+ let dist = Uniform :: new_inclusive ( min, max) . unwrap ( ) ;
251252 move |rng| dist. sample ( rng)
252253 }
253254 fn float_dist ( json : & serde_json:: Value ) -> impl FnMut ( & mut ThreadRng ) -> f32 + Clone {
254255 let x = json. as_array ( ) . unwrap ( ) ;
255256 // TODO(benesch): rewrite to avoid `as`.
256257 #[ allow( clippy:: as_conversions) ]
257258 let ( min, max) = ( x[ 0 ] . as_f64 ( ) . unwrap ( ) as f32 , x[ 1 ] . as_f64 ( ) . unwrap ( ) as f32 ) ;
258- let dist = Uniform :: new_inclusive ( min, max) ;
259+ let dist = Uniform :: new_inclusive ( min, max) . unwrap ( ) ;
259260 move |rng| dist. sample ( rng)
260261 }
261262 fn double_dist ( json : & serde_json:: Value ) -> impl FnMut ( & mut ThreadRng ) -> f64 + Clone {
262263 let x = json. as_array ( ) . unwrap ( ) ;
263264 let ( min, max) = ( x[ 0 ] . as_f64 ( ) . unwrap ( ) , x[ 1 ] . as_f64 ( ) . unwrap ( ) ) ;
264- let dist = Uniform :: new_inclusive ( min, max) ;
265+ let dist = Uniform :: new_inclusive ( min, max) . unwrap ( ) ;
265266 move |rng| dist. sample ( rng)
266267 }
267268 fn string_dist ( json : & serde_json:: Value ) -> impl FnMut ( & mut ThreadRng ) -> Vec < u8 > + Clone {
@@ -276,7 +277,7 @@ impl<'a> RandomAvroGenerator<'a> {
276277 let mut len = integral_dist :: < usize > ( json) ;
277278 move |rng| {
278279 let len = len ( rng) ;
279- let bd = Uniform :: new_inclusive ( 0 , 255 ) ;
280+ let bd = Uniform :: new_inclusive ( 0 , 255 ) . unwrap ( ) ;
280281 iter:: repeat_with ( || bd. sample ( rng) ) . take ( len) . collect ( )
281282 }
282283 }
@@ -302,7 +303,7 @@ impl<'a> RandomAvroGenerator<'a> {
302303 min,
303304 precision
304305 ) ;
305- let dist = Uniform :: < i64 > :: new_inclusive ( min, max) ;
306+ let dist = Uniform :: < i64 > :: new_inclusive ( min, max) . unwrap ( ) ;
306307 move |rng| dist. sample ( rng) . to_be_bytes ( ) . to_vec ( )
307308 }
308309 // TODO(benesch): rewrite to avoid `as`.
@@ -602,8 +603,9 @@ async fn main() -> anyhow::Result<()> {
602603 bail ! ( "cannot specify --avro-distribution without --values=avro" ) ;
603604 }
604605 let len =
605- Uniform :: new_inclusive ( args. min_value_size . unwrap ( ) , args. max_value_size . unwrap ( ) ) ;
606- let bytes = Uniform :: new_inclusive ( 0 , 255 ) ;
606+ Uniform :: new_inclusive ( args. min_value_size . unwrap ( ) , args. max_value_size . unwrap ( ) )
607+ . unwrap ( ) ;
608+ let bytes = Uniform :: new_inclusive ( 0 , 255 ) . unwrap ( ) ;
607609
608610 ValueGenerator :: UniformBytes { len, bytes }
609611 }
@@ -677,10 +679,7 @@ async fn main() -> anyhow::Result<()> {
677679 }
678680 } ;
679681 let key_dist = if let KeyFormat :: Random = args. key_format {
680- Some ( Uniform :: new_inclusive (
681- args. key_min . unwrap ( ) ,
682- args. key_max . unwrap ( ) ,
683- ) )
682+ Some ( Uniform :: new_inclusive ( args. key_min . unwrap ( ) , args. key_max . unwrap ( ) ) . unwrap ( ) )
684683 } else {
685684 None
686685 } ;
@@ -711,7 +710,7 @@ async fn main() -> anyhow::Result<()> {
711710 n += 1 ;
712711 }
713712 scope. spawn ( move |_| {
714- let mut rng = thread_rng ( ) ;
713+ let mut rng = rng ( ) ;
715714 for _ in 0 ..n {
716715 let i = counter. fetch_add ( 1 , Ordering :: Relaxed ) ;
717716 if !args. quiet && i % 100_000 == 0 {
0 commit comments