@@ -32,22 +32,22 @@ use crate::retry::RetryConfig;
3232#[ derive( Debug , thiserror:: Error ) ]
3333pub enum Error {
3434 #[ error( "List shards error, error details: {0:?}" ) ]
35- ListShardsError ( #[ from] SdkError < ListShardsError , AwsHttpResponse > ) ,
35+ ListShards ( #[ from] SdkError < ListShardsError , AwsHttpResponse > ) ,
3636
3737 #[ error( "List streams error, error details: {0:?}" ) ]
38- ListStreamsError ( #[ from] SdkError < ListStreamsError , AwsHttpResponse > ) ,
38+ ListStreams ( #[ from] SdkError < ListStreamsError , AwsHttpResponse > ) ,
3939
4040 #[ error( "Describe stream error, error details: {0:?}" ) ]
41- DescribeStreamSummaryError ( #[ from] SdkError < DescribeStreamSummaryError , AwsHttpResponse > ) ,
41+ DescribeStreamSummary ( #[ from] SdkError < DescribeStreamSummaryError , AwsHttpResponse > ) ,
4242
4343 #[ error( "Get shard iterator error, error details: {0:?}" ) ]
44- GetShardIteratorError ( #[ from] SdkError < GetShardIteratorError , AwsHttpResponse > ) ,
44+ GetShardIterator ( #[ from] SdkError < GetShardIteratorError , AwsHttpResponse > ) ,
4545
4646 #[ error( "Get shard records error, error details: {0:?}" ) ]
47- GetShardRecordsError ( #[ from] SdkError < GetRecordsError , AwsHttpResponse > ) ,
47+ GetShardRecords ( #[ from] SdkError < GetRecordsError , AwsHttpResponse > ) ,
4848
4949 #[ error( "Put records error, error details: {0:?}" ) ]
50- PutRecordsError ( #[ from] SdkError < PutRecordsError , AwsHttpResponse > ) ,
50+ PutRecords ( #[ from] SdkError < PutRecordsError , AwsHttpResponse > ) ,
5151
5252 #[ error( "Iterator not found for shard {0}" ) ]
5353 IteratorNotFound ( String ) ,
@@ -212,7 +212,11 @@ impl ShardSet {
212212 // Update, case 1: we've read everything we could.
213213 let iteration_elapsed = self . last_updated_at . elapsed ( ) ;
214214 if iteration_elapsed < self . round_robin_duration {
215- std:: thread:: sleep ( self . round_robin_duration - iteration_elapsed) ;
215+ std:: thread:: sleep (
216+ self . round_robin_duration
217+ . checked_sub ( iteration_elapsed)
218+ . unwrap ( ) ,
219+ ) ;
216220 }
217221 update_is_needed = true ;
218222 } else if self . had_full_round_robin
@@ -318,7 +322,7 @@ impl KinesisReader {
318322 let response = iterator_builder
319323 . send ( )
320324 . await
321- . map_err ( Error :: GetShardIteratorError ) ?;
325+ . map_err ( Error :: GetShardIterator ) ?;
322326 let iterator = response
323327 . shard_iterator ( )
324328 . ok_or ( Error :: IteratorNotFound ( shard_id. to_string ( ) ) ) ?
@@ -347,7 +351,7 @@ impl Reader for KinesisReader {
347351 let mut shard_closing_offsets: HashMap < String , String > = HashMap :: new ( ) ;
348352 for shard in & self . operated_set . assigned_shards {
349353 if let Some ( last_record_id) = & shard. last_record_id {
350- shard_closing_offsets. insert ( shard. shard_id . clone ( ) , last_record_id. to_string ( ) ) ;
354+ shard_closing_offsets. insert ( shard. shard_id . clone ( ) , last_record_id. clone ( ) ) ;
351355 }
352356 }
353357
@@ -365,7 +369,7 @@ impl Reader for KinesisReader {
365369 continue ;
366370 } ;
367371 self . shard_offsets
368- . insert ( shard_id. to_string ( ) , offset. to_string ( ) ) ;
372+ . insert ( shard_id. to_string ( ) , offset. clone ( ) ) ;
369373 if let Some ( closing_offset) = shard_closing_offsets. get ( shard_id. as_str ( ) ) {
370374 if closing_offset == offset {
371375 self . operated_set
@@ -577,7 +581,7 @@ impl KinesisWriter {
577581 . stream_name ( stream_name)
578582 . send ( )
579583 . await
580- . map_err ( Error :: DescribeStreamSummaryError ) ?;
584+ . map_err ( Error :: DescribeStreamSummary ) ?;
581585 let stream_status = stream
582586 . stream_description_summary ( )
583587 . ok_or ( Error :: StreamDoesntExist ( stream_name. clone ( ) ) ) ?
@@ -595,7 +599,7 @@ impl KinesisWriter {
595599 . limit ( 1 )
596600 . send ( )
597601 . await
598- . map_err ( Error :: ListStreamsError ) ?;
602+ . map_err ( Error :: ListStreams ) ?;
599603 }
600604 }
601605 Ok :: < _ , Error > ( ( ) )
0 commit comments