@@ -40,12 +40,14 @@ def __init__(
4040 messages ,
4141 subscription = None ,
4242 id_attribute = None ,
43- timestamp_attribute = None ):
43+ timestamp_attribute = None ,
44+ max_read_time_seconds = None ):
4445 self ._topic = topic
4546 self ._subscription = subscription
4647 self ._messages = messages
4748 self ._id_attribute = id_attribute
4849 self ._timestamp_attribute = timestamp_attribute
50+ self ._max_read_time_seconds = max_read_time_seconds
4951
5052 def __call__ (
5153 self ,
@@ -54,11 +56,13 @@ def __call__(
5456 subscription ,
5557 with_attributes ,
5658 id_label ,
57- timestamp_attribute ):
59+ timestamp_attribute ,
60+ max_read_time_seconds = None ):
5861 assert topic == self ._topic
5962 assert id_label == self ._id_attribute
6063 assert timestamp_attribute == self ._timestamp_attribute
6164 assert subscription == self ._subscription
65+ assert max_read_time_seconds == self ._max_read_time_seconds
6266 if with_attributes :
6367 data = self ._messages
6468 else :
@@ -536,6 +540,27 @@ def test_read_proto(self):
536540 ''' )
537541 assert_that (result , equal_to (data ))
538542
543+ def test_read_with_max_read_time (self ):
544+ with beam .Pipeline (options = beam .options .pipeline_options .PipelineOptions (
545+ pickle_library = 'cloudpickle' )) as p :
546+ with mock .patch ('apache_beam.io.ReadFromPubSub' ,
547+ FakeReadFromPubSub (
548+ topic = 'my_topic' ,
549+ messages = [PubsubMessage (b'msg1' , {'attr' : 'value1' }),
550+ PubsubMessage (b'msg2' , {'attr' : 'value2' })],
551+ max_read_time_seconds = 60 )):
552+ result = p | YamlTransform (
553+ '''
554+ type: ReadFromPubSub
555+ config:
556+ topic: my_topic
557+ format: RAW
558+ max_read_time_seconds: 60
559+ ''' )
560+ assert_that (
561+ result ,
562+ equal_to ([beam .Row (payload = b'msg1' ), beam .Row (payload = b'msg2' )]))
563+
539564
540565if __name__ == '__main__' :
541566 logging .getLogger ().setLevel (logging .INFO )
0 commit comments