File tree Expand file tree Collapse file tree 1 file changed +6
-2
lines changed
sdks/python/apache_beam/io Expand file tree Collapse file tree 1 file changed +6
-2
lines changed Original file line number Diff line number Diff line change @@ -501,10 +501,14 @@ def test_write_to_dynamic_destination(self):
501501 fileio .TextSink () # pass a FileSink object
502502 ]
503503
504+ # Test assumes that all records will be handled by same worker process,
505+ # pin to FnApiRunner to guarantee hthis
506+ runner = 'FnApiRunner'
507+
504508 for sink in sink_params :
505509 dir = self ._new_tempdir ()
506510
507- with TestPipeline () as p :
511+ with TestPipeline (runner ) as p :
508512 _ = (
509513 p
510514 | "Create" >> beam .Create (range (100 ))
@@ -515,7 +519,7 @@ def test_write_to_dynamic_destination(self):
515519 sink = sink ,
516520 file_naming = fileio .destination_prefix_naming ("test" )))
517521
518- with TestPipeline () as p :
522+ with TestPipeline (runner ) as p :
519523 result = (
520524 p
521525 | fileio .MatchFiles (FileSystems .join (dir , '*' ))
You can’t perform that action at this time.
0 commit comments