Unbounded PCollections writes to files support in iobase derived IOs: AvroIO, ParquetIO, TextIO, TFRecordIO #34777
razvanculea wants to merge 24 commits into apache:master
…IO, ParquetIO and TFRecordIO
…ming writes for AvroIO, ParquetIO and TFRecordIO
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
sdks/python/apache_beam/io/iobase.py
Outdated
init_result_window_coll = (
    keyed_pcoll
    | core.WindowInto(window.GlobalWindows())
    | core.GroupByKey()
    | 'WriteBundles' >> core.ParDo(
        _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll)))
    | 'Pair init' >> core.Map(lambda x: (None, x))
    | 'Pair init gbk' >> core.GroupByKey()
    | 'InitializeWindowedWrite' >> core.Map(
        lambda _, sink: sink.initialize_write(), self.sink)
This doesn't have to depend on the main input, right?
sdks/python/apache_beam/io/iobase.py
Outdated
if isinstance(pcoll.windowing.windowfn, window.GlobalWindows):
  widowed_pcoll = (
      pcoll
      | core.WindowInto(window.FixedWindows(self.sink.triggering_frequency),
          trigger=beam.transforms.trigger.AfterWatermark(),
          accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
          allowed_lateness=beam.utils.timestamp.Duration(seconds=0))
  )
If the input is globally windowed, I'd expect a trigger within the global window, set to repeat every sink.triggering_frequency, instead of windowing into FixedWindows.
That would be an alternative way to implement it.
Windowing keeps elements of the same window in the same file, whereas grouping into batches with a trigger writes elements as they come (possibly mixed).
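To make the trade-off in this thread concrete, here is a minimal plain-Python sketch. It is not Beam code and not the implementation in this PR. It assumes each element carries an event time and an arrival time, ignores watermarks and allowed lateness, and approximates a repeated trigger by aligned arrival-time intervals. The function names and sample data are hypothetical.

```python
from collections import defaultdict


def group_by_fixed_window(events, size):
    """Group values by the fixed event-time window they belong to.

    Each event is (event_time, arrival_time, value). The window start is
    computed as event_time - event_time % size (offset 0 assumed).
    """
    files = defaultdict(list)
    for event_time, _arrival_time, value in events:
        window_start = event_time - event_time % size
        files[window_start].append(value)
    return dict(files)


def group_by_trigger_firing(events, frequency):
    """Group values by the arrival-time interval in which they showed up.

    This approximates a repeated trigger in the global window: whatever
    arrived during an interval is written together, regardless of event time.
    """
    files = defaultdict(list)
    for _event_time, arrival_time, value in events:
        firing_start = arrival_time - arrival_time % frequency
        files[firing_start].append(value)
    return dict(files)


# Hypothetical data: "d" has an early event time but arrives late.
EVENTS = [
    (1, 2, "a"),
    (8, 9, "b"),
    (12, 13, "c"),
    (7, 14, "d"),
]

by_window = group_by_fixed_window(EVENTS, 10)
by_firing = group_by_trigger_firing(EVENTS, 10)
print(by_window)  # "d" lands with "a" and "b" (same event-time window)
print(by_firing)  # "d" lands with "c" (same arrival interval)
```

In this simplified model, windowing puts "d" in the same file as the other elements of its event-time window, while the trigger-based grouping mixes it with elements from a later window. In a real pipeline with zero allowed lateness, a late element such as "d" could instead be dropped.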
|
Assigning reviewers. If you would like to opt out of this review, comment R: @shunping for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
fix write_ptransform_test.py
fix iobase.Sink.finalize_write params
fix lint warnings
reorder imports
|
Is this PR tested with DataflowRunner?
@liferoad I've added the iobase_it_test from PR 34814 and it runs on Dataflow (it actually uses TextIO for the write), but I don't have an IT for TextIO/ParquetIO/AvroIO/TFRecordIO. I can try to add them, but I can't use TestStream on Dataflow, so I'll need to use Pub/Sub public topics, right?
I forget whether PeriodicImpulse can trigger #25598. You could try that. It would also be better to add some checks on the output results.
@liferoad, I added an IT in parquetio_it_test.py using PeriodicImpulse and it works fine on Dataflow. I did not add ITs for AvroIO/TFRecordIO because they have none at all. I could add them later, as I did for ParquetIO, if we deem this a real plus.
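As an illustration of the kind of output check suggested above, the following sketch verifies sharded text output using only the standard library. It is not taken from parquetio_it_test.py. The helper names, the shard file names, and the use of plain text files are all assumptions made for the example.

```python
import glob
import os
import tempfile


def read_text_shards(file_pattern):
    """Read every line from all files matching the pattern."""
    records = []
    for path in sorted(glob.glob(file_pattern)):
        with open(path, encoding="utf-8") as handle:
            records.extend(line.rstrip("\n") for line in handle)
    return records


def check_output(file_pattern, expected_records, min_files=1):
    """Check that enough files exist and that their combined records match.

    Order is ignored because records may be spread across shards arbitrarily.
    Returns the number of files found.
    """
    paths = glob.glob(file_pattern)
    if len(paths) < min_files:
        raise AssertionError(
            f"expected at least {min_files} files, found {len(paths)}")
    actual = read_text_shards(file_pattern)
    if sorted(actual) != sorted(expected_records):
        raise AssertionError(f"unexpected records: {actual}")
    return len(paths)


# Demo with hypothetical shard files standing in for pipeline output.
with tempfile.TemporaryDirectory() as tmp_dir:
    for shard, lines in enumerate([["a", "b"], ["c"]]):
        path = os.path.join(tmp_dir, f"out-{shard:05d}-of-00002.txt")
        with open(path, "w", encoding="utf-8") as handle:
            handle.write("\n".join(lines) + "\n")
    FILE_COUNT = check_output(
        os.path.join(tmp_dir, "out-*.txt"), ["c", "a", "b"], min_files=2)
```

A real integration test would read the format under test (for example Parquet) instead of text lines, and would point the pattern at the pipeline's output location.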
…requency param. On GlobalWindow throw an exception if trigger_frequency is not set / 0.
|
Improved support for user windowing.
If an unbounded PCollection with user-set windowing is written:
If num_shards is set: for each triggering_frequency interval or user window, the output is written to num_shards files.
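The behaviour described above and in the preceding commit message can be sketched in plain Python as follows. This is a hedged illustration, not the code in iobase.py: the function names are hypothetical, the round-robin shard assignment is an assumption, and so is the choice to create a file for a shard that receives no elements.

```python
def validate_triggering_frequency(is_global_window, triggering_frequency):
    """Reject a globally windowed streaming write with no triggering frequency."""
    if is_global_window and not triggering_frequency:
        raise ValueError(
            "triggering_frequency must be set and non-zero when writing an "
            "unbounded PCollection in the global window")


def assign_shards(elements_by_window, num_shards):
    """Map (window, shard_index) to elements, num_shards entries per window.

    Elements are distributed round-robin (an assumption for this sketch).
    """
    if num_shards < 1:
        raise ValueError("num_shards must be at least 1")
    files = {}
    for window, elements in elements_by_window.items():
        # Create every shard up front so each window yields num_shards files.
        for shard in range(num_shards):
            files[(window, shard)] = []
        for index, element in enumerate(elements):
            files[(window, index % num_shards)].append(element)
    return files


# Hypothetical windows "w0" and "w1" written with num_shards=2.
PLAN = assign_shards({"w0": ["a", "b", "c"], "w1": ["d"]}, 2)
validate_triggering_frequency(is_global_window=True, triggering_frequency=60)
```

With two windows and num_shards=2, the plan contains four entries, two per window.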
|
Reminder, please take a look at this pr: @shunping |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @jrmccluskey for label python. Available commands:
|
|
@razvanculea, sorry for the delay in reviewing. Do you mind merging the latest master so that the tests are re-triggered?
|
waiting on author |
|
@shunping don't review this anymore. |
|
sg! Feel free to tag me when they are ready. |
|
Reminder, please take a look at this pr: @jrmccluskey |
iobase patch for writing unbounded PCollection sources.
Implement support for writing unbounded PCollections to files in iobase-derived IOs: AvroIO, ParquetIO, TextIO, TFRecordIO
Tests for streaming PCollection writes to files in AvroIO, ParquetIO, TextIO, TFRecordIO