@@ -494,7 +494,8 @@ def commit_offsets_async(self, offsets, callback=None):
494494 return future
495495
496496 def _do_commit_offsets_async (self , offsets , callback = None ):
497- assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
497+ if self .config ['api_version' ] < (0 , 8 , 1 ):
498+ raise Errors .UnsupportedVersionError ('OffsetCommitRequest requires 0.8.1+ broker' )
498499 assert all (map (lambda k : isinstance (k , TopicPartition ), offsets ))
499500 assert all (map (lambda v : isinstance (v , OffsetAndMetadata ),
500501 offsets .values ()))
@@ -516,7 +517,8 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):
516517
517518 Raises error on failure
518519 """
519- assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
520+ if self .config ['api_version' ] < (0 , 8 , 1 ):
521+ raise Errors .UnsupportedVersionError ('OffsetCommitRequest requires 0.8.1+ broker' )
520522 assert all (map (lambda k : isinstance (k , TopicPartition ), offsets ))
521523 assert all (map (lambda v : isinstance (v , OffsetAndMetadata ),
522524 offsets .values ()))
@@ -573,7 +575,8 @@ def _send_offset_commit_request(self, offsets):
573575 Returns:
574576 Future: indicating whether the commit was successful or not
575577 """
576- assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
578+ if self .config ['api_version' ] < (0 , 8 , 1 ):
579+ raise Errors .UnsupportedVersionError ('OffsetCommitRequest requires 0.8.1+ broker' )
577580 assert all (map (lambda k : isinstance (k , TopicPartition ), offsets ))
578581 assert all (map (lambda v : isinstance (v , OffsetAndMetadata ),
579582 offsets .values ()))
@@ -761,7 +764,8 @@ def _send_offset_fetch_request(self, partitions):
761764 Returns:
762765 Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
763766 """
764- assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
767+ if self .config ['api_version' ] < (0 , 8 , 1 ):
768+ raise Errors .UnsupportedVersionError ('OffsetFetchRequest requires 0.8.1+ broker' )
765769 assert all (map (lambda k : isinstance (k , TopicPartition ), partitions ))
766770 if not partitions :
767771 return Future ().success ({})
0 commit comments