Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ env/*

#Editor
.idea/*
.vscode/*
.vscode/*
venv/*

tfx_addons.egg-info/
33 changes: 18 additions & 15 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@


def get_project_version():
  """Load the version metadata defined in ``tfx_addons/version.py``.

  The version module is executed rather than imported so that building the
  package does not require importing ``tfx_addons`` itself.

  Returns:
    The globals dictionary produced by executing ``version.py``; callers look
    up the names that file defines.

  Raises:
    OSError: If ``tfx_addons/version.py`` cannot be opened.
  """
  extracted_version = {}
  base_dir = os.path.dirname(os.path.abspath(__file__))
  version_path = os.path.join(base_dir, "tfx_addons", "version.py")
  # Python source files are UTF-8 by default, so do not rely on the locale's
  # preferred encoding when reading one.
  with open(version_path, encoding="utf-8") as fp:
    # NOTE: exec is acceptable here only because version.py is a trusted file
    # that ships in this repository.
    exec(fp.read(), extracted_version)  # pylint: disable=exec-used

  return extracted_version


def get_long_description():
  """Return the contents of the ``README.md`` located next to this file.

  Returns:
    The full README text as a string.

  Raises:
    OSError: If ``README.md`` cannot be opened.
  """
  base_dir = os.path.dirname(os.path.abspath(__file__))
  readme_path = os.path.join(base_dir, "README.md")
  # Decode explicitly as UTF-8: the locale default (e.g. cp1252 on Windows)
  # can fail or mis-decode non-ASCII characters in the README.
  with open(readme_path, encoding="utf-8") as fp:
    return fp.read()


version = get_project_version()
Expand All @@ -53,7 +53,7 @@ def get_long_description():
PKG_REQUIRES = {
# Add dependencies here for your project. Avoid using install_requires.
"mlmd_client":
[required_ml_pipelines_sdk_version, required_ml_metadata_version],
[required_ml_pipelines_sdk_version, required_ml_metadata_version],
"schema_curation": [
required_tfx_version,
],
Expand All @@ -66,7 +66,9 @@ def get_long_description():
"kfp>=1.8,<1.9",
"slackclient>=2.9.0",
"pydantic>=1.8.0",

],
"components":["tfx==1.7.1"]
}
EXTRAS_REQUIRE = PKG_REQUIRES.copy()
EXTRAS_REQUIRE["all"] = list(
Expand All @@ -88,11 +90,12 @@ def get_long_description():
},
extras_require=EXTRAS_REQUIRE,
tests_require=TESTS_REQUIRE,
packages=find_namespace_packages(include=[
# Add here new library package
"tfx_addons",
] + [f"tfx_addons.{m}.*"
for m in PKG_REQUIRES] + [f"tfx_addons.{m}" for m in PKG_REQUIRES]),
install_requires = ['tfx==1.6.1', 'apache-airflow'],
packages=find_namespace_packages(include=["tfx==1.7.1"
# Add here new library package
"tfx_addons",
] + [f"tfx_addons.{m}.*"
for m in PKG_REQUIRES] + [f"tfx_addons.{m}" for m in PKG_REQUIRES]),
classifiers=[
"Intended Audience :: Developers",
"Intended Audience :: Education",
Expand Down
50 changes: 50 additions & 0 deletions tfx_addons/components_addons/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
TFX RemoteZipCsvExampleGen component.

The RemoteZipCsvExampleGen component takes a zip-compressed CSV file from an HTTP or HTTPS URL and generates train
and eval examples for downstream components.

The RemoteZipCsvExampleGen encodes column values as tf.Example int/float/bytes features.
When a column has missing cells, the RemoteZipCsvExampleGen uses:
-- tf.train.Feature(`type`_list=tf.train.`type`List(value=[])), when the
`type` can be inferred.
-- tf.train.Feature() when it cannot infer the `type` from the column.

Note that type inference is performed per input split. If the input isn't a single
split, users need to ensure that the column types align across all pre-split inputs.

For example, given the following csv rows of a split:

header:A,B,C,D
row1: 1,,x,0.1
row2: 2,,y,0.2
row3: 3,,,0.3
row4:

The output example will be
example1: 1(int), empty feature(no type), x(string), 0.1(float)
example2: 2(int), empty feature(no type), y(string), 0.2(float)
example3: 3(int), empty feature(no type), empty list(string), 0.3(float)

Note that the empty feature is `tf.train.Feature()` while empty list string
feature is `tf.train.Feature(bytes_list=tf.train.BytesList(value=[]))`.

Component `outputs` contains:
- `examples`: Channel of type `standard_artifacts.Examples` for output train
and eval examples.

A sample usage is given below:
```python
from tfx_addons.components_addons.components import RemoteZipCsvExampleGen
import os

# Local working directory used for the download and extraction
INPUT_BASE = os.getcwd()
# URL of the zip-compressed CSV file to download
URL = "https://files.consumerfinance.gov/ccdb/complaints.csv.zip"

remote_zip_csv_example_gen = RemoteZipCsvExampleGen(
input_base=INPUT_BASE,
zip_file_uri=URL
)

```
2 changes: 2 additions & 0 deletions tfx_addons/components_addons/components/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from tfx_addons.components_addons.components.example_gen.remote_zip_example_gen.component import RemoteZipCsvExampleGen

102 changes: 102 additions & 0 deletions tfx_addons/components_addons/components/example_gen/component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import Optional, Union

from tfx import types
from tfx.components.example_gen import driver
from tfx.components.example_gen import utils
from tfx.dsl.components.base import base_beam_component
from tfx.dsl.components.base import base_beam_executor
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import data_types
from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2
from tfx.types import standard_artifacts
from census_consumer_complaint_types.types import RemoteZipFileBasedExampleGenSpec


class RemoteZipFileBasedExampleGen(base_beam_component.BaseBeamComponent):
  """A TFX component to ingest examples from a remote zip archive.

  The RemoteZipFileBasedExampleGen component is an API for getting
  zip-compressed, file-based records that are available at an HTTP URL into
  TFX pipelines. It consumes external files to generate examples which will
  be used by other internal components like StatisticsGen or Trainers. The
  component will also convert the input data into
  [tf.record](https://www.tensorflow.org/tutorials/load_data/tf_records)
  and generate train and eval example splits for downstream components.

  The default `EXECUTOR_SPEC` points at the abstract base Beam executor, so a
  concrete executor is expected to come from a subclass or from the
  `custom_executor_spec` constructor argument.

  ## Example
  ```
  _taxi_root = os.path.join(os.environ['HOME'], 'taxi')
  _data_root = os.path.join(_taxi_root, 'data', 'simple')
  _zip_uri = "https://xyz//abz.csv.zip"
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = RemoteZipFileBasedExampleGen(
      input_base=_data_root, zip_file_uri=_zip_uri)
  ```

  Component `outputs` contains:
   - `examples`: Channel of type `standard_artifacts.Examples` for output train
                 and eval examples.
  """

  SPEC_CLASS = RemoteZipFileBasedExampleGenSpec

  # EXECUTOR_SPEC should be overridden by subclasses.
  EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(
      base_beam_executor.BaseBeamExecutor)
  DRIVER_CLASS = driver.FileBasedDriver

  def __init__(
      self,
      input_base: Optional[str] = None,
      zip_file_uri: Optional[str] = None,
      input_config: Optional[Union[example_gen_pb2.Input,
                                   data_types.RuntimeParameter]] = None,
      output_config: Optional[Union[example_gen_pb2.Output,
                                    data_types.RuntimeParameter]] = None,
      custom_config: Optional[Union[example_gen_pb2.CustomConfig,
                                    data_types.RuntimeParameter]] = None,
      range_config: Optional[Union[range_config_pb2.RangeConfig,
                                   data_types.RuntimeParameter]] = None,
      output_data_format: Optional[int] = example_gen_pb2.FORMAT_TF_EXAMPLE,
      output_file_format: Optional[int] = example_gen_pb2.FORMAT_TFRECORDS_GZIP,
      custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None):
    """Construct a RemoteZipFileBasedExampleGen component.

    Args:
      input_base: The extraction directory that contains the CSV files once
        the downloaded zip file has been extracted.
      zip_file_uri: URI of the remote zip file holding the compressed CSV
        data to download.
      input_config: An
        [`example_gen_pb2.Input`](https://github.com/tensorflow/tfx/blob/master/tfx/proto/example_gen.proto)
        instance, providing input configuration. If unset, input files will be
        treated as a single split.
      output_config: An example_gen_pb2.Output instance, providing the output
        configuration. If unset, default splits will be 'train' and
        'eval' with size 2:1.
      custom_config: An optional example_gen_pb2.CustomConfig instance,
        providing custom configuration for executor.
      range_config: An optional range_config_pb2.RangeConfig instance,
        specifying the range of span values to consider. If unset, driver will
        default to searching for latest span with no restrictions.
      output_data_format: Payload format of generated data in output artifact,
        one of example_gen_pb2.PayloadFormat enum.
      output_file_format: File format of generated data in output artifact,
        one of example_gen_pb2.FileFormat enum.
      custom_executor_spec: Optional custom executor spec overriding the default
        executor spec specified in the component attribute.
    """
    # Fall back to the default input/output configuration when none is given;
    # the default output configuration is derived from the input one.
    if not input_config:
      input_config = utils.make_default_input_config()
    if not output_config:
      output_config = utils.make_default_output_config(input_config)

    spec_kwargs = dict(
        input_base=input_base,
        zip_file_uri=zip_file_uri,
        input_config=input_config,
        output_config=output_config,
        custom_config=custom_config,
        range_config=range_config,
        output_data_format=output_data_format,
        output_file_format=output_file_format,
        # Output channel that carries the generated examples.
        examples=types.Channel(type=standard_artifacts.Examples))
    super().__init__(
        spec=RemoteZipFileBasedExampleGenSpec(**spec_kwargs),
        custom_executor_spec=custom_executor_spec)
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFX RemoteZipCsvExampleGen component definition."""

from typing import Optional, Union

from census_consumer_complaint_custom_component.example_gen.remote_zip_csv_example_gen import executor
from census_consumer_complaint_custom_component.component import RemoteZipFileBasedExampleGen
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import data_types
from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2


class RemoteZipCsvExampleGen(RemoteZipFileBasedExampleGen):  # pylint: disable=protected-access
  """TFX Addons RemoteZipCsvExampleGen component.

  The RemoteZipCsvExampleGen component takes the URL of a zip-compressed CSV
  file and generates train and eval examples for downstream components.

  The RemoteZipCsvExampleGen encodes column values as tf.Example
  int/float/bytes features. When a column has missing cells, it uses:
    -- tf.train.Feature(`type`_list=tf.train.`type`List(value=[])), when the
       `type` can be inferred.
    -- tf.train.Feature() when it cannot infer the `type` from the column.

  Note that type inference is performed per input split. If the input isn't a
  single split, users need to ensure that the column types align across all
  pre-split inputs.

  For example, given the following csv rows of a split:

    header:A,B,C,D
    row1:  1,,x,0.1
    row2:  2,,y,0.2
    row3:  3,,,0.3
    row4:

  The output example will be
    example1: 1(int), empty feature(no type), x(string), 0.1(float)
    example2: 2(int), empty feature(no type), y(string), 0.2(float)
    example3: 3(int), empty feature(no type), empty list(string), 0.3(float)

    Note that the empty feature is `tf.train.Feature()` while empty list string
    feature is `tf.train.Feature(bytes_list=tf.train.BytesList(value=[]))`.

  Component `outputs` contains:
   - `examples`: Channel of type `standard_artifacts.Examples` for output train
                 and eval examples.
  """

  EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(executor.Executor)

  def __init__(
      self,
      input_base: Optional[str] = None,
      zip_file_uri: Optional[str] = None,
      input_config: Optional[Union[example_gen_pb2.Input,
                                   data_types.RuntimeParameter]] = None,
      output_config: Optional[Union[example_gen_pb2.Output,
                                    data_types.RuntimeParameter]] = None,
      range_config: Optional[Union[range_config_pb2.RangeConfig,
                                   data_types.RuntimeParameter]] = None):
    """Construct a RemoteZipCsvExampleGen component.

    Args:
      input_base: The extraction directory that contains the CSV files once
        the downloaded zip file has been extracted.
      zip_file_uri: URI of the remote zip file holding the compressed CSV
        data to download.
      input_config: An example_gen_pb2.Input instance, providing input
        configuration. If unset, the files under input_base will be treated as
        a single split.
      output_config: An example_gen_pb2.Output instance, providing output
        configuration. If unset, default splits will be 'train' and 'eval' with
        size 2:1.
      range_config: An optional range_config_pb2.RangeConfig instance,
        specifying the range of span values to consider. If unset, driver will
        default to searching for latest span with no restrictions.
    """
    # Every argument is forwarded unchanged; the remaining parent parameters
    # keep the defaults declared by RemoteZipFileBasedExampleGen.
    parent_kwargs = {
        "input_base": input_base,
        "zip_file_uri": zip_file_uri,
        "input_config": input_config,
        "output_config": output_config,
        "range_config": range_config,
    }
    super().__init__(**parent_kwargs)
Loading