Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0e0cb0b
iobase patch for unbounded PColl and adding streaming writes for Avro…
razvanculea Apr 29, 2025
eb3bb71
update changes.md : iobase patch for unbounded PColl and adding strea…
razvanculea Apr 29, 2025
3c720d3
Merge branch 'master' into iobase
razvanculea Apr 29, 2025
b5c6700
apply yapf
razvanculea Apr 29, 2025
447cef5
Merge branch 'apache:master' into iobase
razvanculea Apr 29, 2025
f01b843
clean debug logs
razvanculea Apr 29, 2025
d708e7c
moar yapf 0.29 and pylint
razvanculea Apr 30, 2025
2d92cab
revert a change iterating over None in iobase._finalize_write
razvanculea Apr 30, 2025
ac24e0d
Merge branch 'master' into iobase
razvanculea Apr 30, 2025
ac46d43
fix test sink in snippets
razvanculea Apr 30, 2025
f13a3e3
add more detail on IOBase.sink change
razvanculea Apr 30, 2025
c4e57f2
text formatting fix
razvanculea Apr 30, 2025
afb69c7
make triggering_frequent optional in _create_parquet_sink
razvanculea Apr 30, 2025
5bd5e15
make triggering_frequency opt in _TFRecordSink , _create_avro_sink, _…
razvanculea Apr 30, 2025
1bd138c
fix _TFRecordSink making triggering_frequency opt
razvanculea Apr 30, 2025
491a285
fix filenaming format for windows (no ":")
razvanculea Apr 30, 2025
f195060
Merge branch 'master' into iobase
razvanculea May 1, 2025
9ca2f66
add iobase_it_test from PR 34814
razvanculea May 5, 2025
403bb72
Add ParquetIO streaming write IT
razvanculea May 5, 2025
41a1b07
formatting fix
razvanculea May 5, 2025
e685e16
yapf recos
razvanculea May 5, 2025
e28cf60
improve support for user windowing that can be overridden by trigger_f…
razvanculea May 7, 2025
70d4c24
fix unittest needing either user window or triggering_frequency
razvanculea May 7, 2025
b1a5952
Merge branch 'master' into iobase
razvanculea May 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Upgraded GoogleAdsAPI to v19 for GoogleAdsIO (Java) ([#34497](https://github.com/apache/beam/pull/34497)). Changed PTransform method from version-specified (`v17()`) to `current()` for better backward compatibility in the future.
* Added support for writing to Pubsub with ordering keys (Java) ([#21162](https://github.com/apache/beam/issues/21162))
* Added support for streaming writes for AvroIO, ParquetIO, TextIO, and TFRecordIO (Python).
* `IOBase.Sink.finalize_write` has a new optional parameter, `w`, carrying the window being finalized (Python).

## New Features / Improvements

Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,8 @@ def open_writer(self, access_token, uid):
def pre_finalize(self, init_result, writer_results):
  """No-op: this sink needs no cleanup between writing and finalization."""
  pass

def finalize_write(self, access_token, table_names, pre_finalize_result):
def finalize_write(
    self, access_token, table_names, pre_finalize_result, unused_window):
  """Renames each temporary table to its final, numbered name.

  Args:
    access_token: handle used to authorize the rename calls (presumably
      the sink's init result — confirm against the enclosing sink class).
    table_names: iterable of temporary table names produced by the writers;
      table i is renamed to ``self._final_table_name + str(i)``.
    pre_finalize_result: result of ``pre_finalize`` (unused here).
    unused_window: the window being finalized — new optional parameter on
      ``iobase.Sink.finalize_write`` for streaming writes; unused by this
      sink.
  """
  for i, table_name in enumerate(table_names):
    self._simplekv.rename_table(
        access_token, table_name, self._final_table_name + str(i))
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/examples/unbounded_sinks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
133 changes: 133 additions & 0 deletions sdks/python/apache_beam/examples/unbounded_sinks/generate_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import pytz

import apache_beam as beam
from apache_beam.testing.test_stream import TestStream


class GenerateEvent(beam.PTransform):
  """Produces a deterministic unbounded stream of ``{'age': ...}`` dicts.

  Three elements are emitted at every whole second from 00:00:01 through
  00:00:20 on 2021-03-01 (UTC). The watermark is advanced to each multiple
  of five seconds immediately before that second's elements are added, and
  the stream ends by advancing the watermark to 00:00:25 and then to
  infinity.
  """
  @staticmethod
  def sample_data():
    """Factory helper returning a ready-to-apply instance."""
    return GenerateEvent()

  def expand(self, input):
    batch = [{'age': 10}, {'age': 20}, {'age': 30}]

    def event_time(second):
      # Absolute event time `second` seconds past 2021-03-01 00:00:00 UTC.
      return datetime(
          2021, 3, 1, 0, 0, second, 0, tzinfo=pytz.UTC).timestamp()

    stream = TestStream()
    for second in range(1, 21):
      if second % 5 == 0:
        # The watermark reaches each 5-second boundary just before the
        # elements timestamped at that boundary are added.
        stream = stream.advance_watermark_to(event_time(second))
      stream = stream.add_elements(
          elements=batch, event_timestamp=event_time(second))
    stream = stream.advance_watermark_to(event_time(25))
    return input | stream.advance_watermark_to_infinity()
177 changes: 177 additions & 0 deletions sdks/python/apache_beam/examples/unbounded_sinks/test_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# python -m apache_beam.examples.unbounded_sinks.test_write
# This file contains multiple examples of writing unbounded PCollection to files

import argparse
import json
import logging

import pyarrow

import apache_beam as beam
from apache_beam.examples.unbounded_sinks.generate_event import GenerateEvent
from apache_beam.io.fileio import WriteToFiles
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.runner import PipelineResult
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.util import LogElements
from apache_beam.transforms.window import FixedWindows
from apache_beam.utils.timestamp import Duration


class CountEvents(beam.PTransform):
  """Counts elements per 5-second fixed window of an event stream."""
  def expand(self, events):
    """Applies fixed 5s windowing, then emits one count per window/pane."""
    windowed = events | beam.WindowInto(
        FixedWindows(5),
        trigger=AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING,
        allowed_lateness=Duration(seconds=0))
    count_fn = beam.combiners.CountCombineFn()
    # without_defaults(): emit nothing for empty windows instead of 0.
    return windowed | beam.CombineGlobally(count_fn).without_defaults()


def run(argv=None, save_main_session=True) -> PipelineResult:
  """Builds and runs a pipeline demonstrating unbounded writes to files.

  A TestStream source feeds the same unbounded PCollection into several
  file sinks (TextIO, FileIO, ParquetIO, AvroIO, TFRecordIO), exercising
  both the ``triggering_frequency`` option and explicit user windowing.

  Args:
    argv: command-line arguments forwarded to PipelineOptions.
    save_main_session: whether to pickle the main session so DoFns that
      rely on module-level context work on distributed runners.

  Returns:
    The PipelineResult of the completed run.
  """
  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  p = beam.Pipeline(options=pipeline_options)

  # Unbounded source: elements over seconds 1-20 with periodic watermarks.
  output = p | GenerateEvent.sample_data()

  # TextIO: triggering_frequency (seconds) drives writes from the
  # unbounded PCollection without explicit user windowing.
  output2 = output | 'TextIO WriteToText' >> beam.io.WriteToText(
      file_path_prefix="__output__/ouput_WriteToText",
      file_name_suffix=".txt",
      #shard_name_template='-V-SSSSS-of-NNNNN',
      num_shards=2,
      triggering_frequency=5,
  )
  _ = output2 | 'LogElements after WriteToText' >> LogElements(
      prefix='after WriteToText ', with_window=True, level=logging.INFO)

  # FileIO: here the input is windowed explicitly before WriteToFiles.
  _ = (
      output
      | 'FileIO window' >> beam.WindowInto(
          FixedWindows(5),
          trigger=AfterWatermark(),
          accumulation_mode=AccumulationMode.DISCARDING,
          allowed_lateness=Duration(seconds=0))
      | 'Serialize' >> beam.Map(json.dumps)
      | 'FileIO WriteToFiles' >>
      WriteToFiles(path="__output__/output_WriteToFiles"))

  # ParquetIO: shared Arrow schema for all three Parquet variants below.
  pyschema = pyarrow.schema([('age', pyarrow.int64())])

  # 4a: unwindowed input; writes driven by triggering_frequency.
  output4a = output | 'WriteToParquet' >> beam.io.WriteToParquet(
      file_path_prefix="__output__/output_parquet",
      #shard_name_template='-V-SSSSS-of-NNNNN',
      file_name_suffix=".parquet",
      num_shards=2,
      triggering_frequency=5,
      schema=pyschema)
  _ = output4a | 'LogElements after WriteToParquet' >> LogElements(
      prefix='after WriteToParquet 4a ', with_window=True, level=logging.INFO)

  # 4aw: user-windowed input (20s windows), no triggering_frequency;
  # the '-W-' shard template keeps its files distinct from 4a's.
  output4aw = (
      output
      | 'ParquetIO window' >> beam.WindowInto(
          FixedWindows(20),
          trigger=AfterWatermark(),
          accumulation_mode=AccumulationMode.DISCARDING,
          allowed_lateness=Duration(seconds=0))
      | 'WriteToParquet windowed' >> beam.io.WriteToParquet(
          file_path_prefix="__output__/output_parquet",
          shard_name_template='-W-SSSSS-of-NNNNN',
          file_name_suffix=".parquet",
          num_shards=2,
          schema=pyschema))
  _ = output4aw | 'LogElements after WriteToParquet windowed' >> LogElements(
      prefix='after WriteToParquet 4aw ', with_window=True, level=logging.INFO)

  # 4b: batched variant — each element is first converted to a one-row
  # pyarrow.Table, then written with WriteToParquetBatched.
  output4b = (
      output
      | 'To PyArrow Table' >>
      beam.Map(lambda x: pyarrow.Table.from_pylist([x], schema=pyschema))
      | 'WriteToParquetBatched to parquet' >> beam.io.WriteToParquetBatched(
          file_path_prefix="__output__/output_parquet_batched",
          shard_name_template='-V-SSSSS-of-NNNNN',
          file_name_suffix=".parquet",
          num_shards=2,
          triggering_frequency=5,
          schema=pyschema))
  _ = output4b | 'LogElements after WriteToParquetBatched' >> LogElements(
      prefix='after WriteToParquetBatched 4b ',
      with_window=True,
      level=logging.INFO)

  # AvroIO
  avroschema = {
      'name': 'dummy',  # Avro record name (an identifier, not a file name)
      'type': 'record',  # 'record' maps each dict element to an Avro record
      'fields': [  # one entry per dict key, with its Avro type
          {'name': 'age', 'type': 'int'},
      ],
  }
  # NOTE(review): triggering_frequency is commented out and no user window
  # is applied here — confirm this unbounded Avro write is intentional.
  output5 = output | 'WriteToAvro' >> beam.io.WriteToAvro(
      file_path_prefix="__output__/output_avro",
      #shard_name_template='-V-SSSSS-of-NNNNN',
      file_name_suffix=".avro",
      num_shards=2,
      #triggering_frequency=5,
      schema=avroschema)
  _ = output5 | 'LogElements after WriteToAvro' >> LogElements(
      prefix='after WriteToAvro 5 ', with_window=True, level=logging.INFO)

  # TFRecordIO: records must be bytes, hence the json.dumps().encode() step.
  output6 = (
      output
      | "encode" >> beam.Map(lambda s: json.dumps(s).encode('utf-8'))
      | 'WriteToTFRecord' >> beam.io.WriteToTFRecord(
          file_path_prefix="__output__/output_tfrecord",
          #shard_name_template='-V-SSSSS-of-NNNNN',
          file_name_suffix=".tfrecord",
          num_shards=2,
          triggering_frequency=5))
  _ = output6 | 'LogElements after WriteToTFRecord' >> LogElements(
      prefix='after WriteToTFRecord 6 ', with_window=True, level=logging.INFO)

  # Execute the pipeline and return the result.
  result = p.run()
  result.wait_until_finish()
  return result


if __name__ == '__main__':
  # Runnable directly:
  #   python -m apache_beam.examples.unbounded_sinks.test_write
  logging.getLogger().setLevel(logging.INFO)
  run()
Loading
Loading