Releases: MTSWebServices/onetl
0.12.3 (2024-11-22)
Bug Fixes
- Allow passing table names in format `schema."table.with.dots"` to `DBReader(source=...)` and `DBWriter(target=...)`.
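Splitting such a name on the correct dot is the tricky part: the separator is the first dot *outside* the double quotes. A minimal sketch of the idea (the `split_table_name` helper is hypothetical, not onETL's actual parser):

```python
import re

def split_table_name(name: str) -> tuple[str, str]:
    """Split 'schema.table' into (schema, table), honoring double-quoted
    identifiers so dots inside quotes are not treated as separators."""
    match = re.fullmatch(r'("[^"]+"|[^".]+)\.("[^"]+"|[^".]+)', name)
    if match is None:
        raise ValueError(f"cannot parse table name: {name!r}")
    schema, table = (part.strip('"') for part in match.groups())
    return schema, table

print(split_table_name('schema."table.with.dots"'))  # ('schema', 'table.with.dots')
print(split_table_name("schema.table"))              # ('schema', 'table')
```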
0.12.2 (2024-11-12)
Improvements
- Change Spark `jobDescription` for DBReader & FileDFReader from `DBReader.run() -> Connection` to `Connection -> DBReader.run()`.
Bug Fixes
- Fix `log_hwm` output for `KeyValueIntHWM` (used by Kafka). (#316)
- Fix `log_collection` hiding values in logs with `INFO` level. (#316)
Dependencies
- Allow using `etl-entities==2.4.0`.
Doc only Changes
- Fix links to MSSQL date & time type documentation.
0.12.1 (2024-10-28)
Features
- Log detected JDBC dialect while using `DBWriter`.
Bug Fixes
- Fix `SparkMetricsRecorder` failing when receiving `SparkListenerTaskEnd` without `taskMetrics` (e.g. executor was killed by OOM). (#313)
- Call `kinit` before checking for HDFS active namenode.
- Wrap `kinit` with `threading.Lock` to avoid multithreading issues.
- Immediately show `kinit` errors to user, instead of hiding them.
- Use `AttributeError` instead of `ImportError` in module's `__getattr__` method, to make code compliant with Python spec.
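The `AttributeError` requirement comes from PEP 562: `hasattr()` and `getattr()` only swallow `AttributeError`, so a module-level `__getattr__` that raises `ImportError` breaks them. A generic illustration of the rule (not onETL's actual code; the `demo` module and its attributes are made up):

```python
import types

# build a throwaway module to demonstrate PEP 562 __getattr__ behavior
mod = types.ModuleType("demo")

def _module_getattr(name):
    if name == "answer":
        return 42
    # raising AttributeError (not ImportError) keeps hasattr()/getattr()
    # semantics intact: unknown attributes report False instead of exploding
    raise AttributeError(f"module 'demo' has no attribute {name!r}")

mod.__getattr__ = _module_getattr

print(mod.answer)            # 42 — resolved via __getattr__
print(hasattr(mod, "nope"))  # False — ImportError here would propagate instead
```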
Doc only Changes
- Add note about spark-dialect-extension package to Clickhouse connector documentation. (#310)
0.12.0 (2024-09-03)
Breaking Changes
- Change connection URL used for generating HWM names of S3 and Samba sources:
  - `smb://host:port` -> `smb://host:port/share`
  - `s3://host:port` -> `s3://host:port/bucket` (#304)
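Including the bucket/share in the URL means HWMs for different buckets or shares on the same host no longer collide. The shape of the new URLs can be sketched with the standard library (`hwm_source_url` is a hypothetical helper, not onETL's implementation):

```python
from urllib.parse import urlunsplit

def hwm_source_url(scheme: str, host: str, port: int, container: str) -> str:
    # since onETL 0.12.0, the bucket (S3) or share (Samba) is part of the
    # connection URL used to generate HWM names
    return urlunsplit((scheme, f"{host}:{port}", f"/{container}", "", ""))

print(hwm_source_url("s3", "host", 9000, "bucket"))  # s3://host:9000/bucket
print(hwm_source_url("smb", "host", 445, "share"))   # smb://host:445/share
```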
- Update DB connectors/drivers to latest versions:
  - Clickhouse `0.6.0-patch5` → `0.6.5`
  - MongoDB `10.3.0` → `10.4.0`
  - MSSQL `12.6.2` → `12.8.1`
  - MySQL `8.4.0` → `9.0.0`
  - Oracle `23.4.0.24.05` → `23.5.0.24.07`
  - Postgres `42.7.3` → `42.7.4`
- Update `Excel` package from `0.20.3` to `0.20.4`, to include Spark 3.5.1 support. (#306)
Features
- Add support for specifying file formats (`ORC`, `Parquet`, `CSV`, etc.) in `Hive.WriteOptions.format` (#292):

  ```python
  Hive.WriteOptions(format=ORC(compression="snappy"))
  ```
- Collect Spark execution metrics in the following methods, and log them in DEBUG mode:
  - `DBWriter.run()`
  - `FileDFWriter.run()`
  - `Hive.sql()`
  - `Hive.execute()`

  This is implemented using a custom `SparkListener` which wraps the entire method call and then reports the collected metrics. These metrics may sometimes be missing due to Spark architecture, so they are not a reliable source of information. That's why they are logged only in DEBUG mode, and are not returned as the method call result. (#303)
- Generate default `jobDescription` based on the currently executed method. Examples:
  - `DBWriter.run(schema.table) -> Postgres[host:5432/database]`
  - `MongoDB[localhost:27017/admin] -> DBReader.has_data(mycollection)`
  - `Hive[cluster].execute()`

  If a user already set a custom `jobDescription`, it is left intact. (#304)
- Add log.info about JDBC dialect usage (#305):

  ```
  |MySQL| Detected dialect: 'org.apache.spark.sql.jdbc.MySQLDialect'
  ```
- Log estimated size of the in-memory dataframe created by `JDBC.fetch` and `JDBC.execute` methods. (#303)
Bug Fixes
- Fix passing `Greenplum(extra={"options": ...})` during read/write operations. (#308)
- Do not raise exception if a yield-based hook has something past the (single) `yield`.
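The yield-based hook pattern referenced above is a generator: code before its single `yield` runs as setup, code after it as teardown. A generic sketch of driving such a hook without raising when teardown code exists (this is an illustration of the pattern, not onETL's hook machinery):

```python
def run_yield_hook(hook_gen):
    """Drive a generator-based hook: run setup up to the yield,
    then run the code past the (single) yield as teardown."""
    gen = hook_gen()
    next(gen)          # run setup, pause at the yield
    try:
        next(gen)      # run teardown past the yield
    except StopIteration:
        pass           # generator finished — expected for a single-yield hook

events = []

def my_hook():
    events.append("before")
    yield
    events.append("after")  # code past the yield must not trigger an error

run_yield_hook(my_hook)
print(events)  # ['before', 'after']
```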
0.11.2 (2024-09-02)
Bug Fixes
- Fix passing `Greenplum(extra={"options": ...})` during read/write operations. (#308)
0.11.1 (2024-05-29)
0.11.0 (2024-05-27)
Breaking Changes
There may be some changes in connection behavior related to version upgrades, so we mark these changes as breaking, although most users will not notice any difference.
- Update Clickhouse JDBC driver to latest version (#249):
  - Package was renamed `ru.yandex.clickhouse:clickhouse-jdbc` → `com.clickhouse:clickhouse-jdbc`.
  - Package version changed `0.3.2` → `0.6.0-patch5`.
  - Driver name changed `ru.yandex.clickhouse.ClickHouseDriver` → `com.clickhouse.jdbc.ClickHouseDriver`.

  This brings several fixes for Spark <-> Clickhouse type compatibility, and also Clickhouse clusters support.
  Warning

  The new JDBC driver has more strict behavior regarding types:

  - Old JDBC driver applied `max(1970-01-01T00:00:00, value)` for Timestamp values, as this is the minimal supported value of the `DateTime32` Clickhouse type. New JDBC driver doesn't.
  - Old JDBC driver rounded values with higher precision than the target column during write. New JDBC driver doesn't.
  - Old JDBC driver replaced NULLs as input for non-Nullable columns with the column's DEFAULT value. New JDBC driver doesn't. To enable previous behavior, pass `Clickhouse(extra={"nullsAsDefault": 2})` (see documentation).
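The old driver's Timestamp clamping described above amounts to raising too-small values to the `DateTime32` minimum. A minimal sketch of that behavior (the `clamp_to_datetime32` helper is hypothetical; neither driver exposes it as an API):

```python
from datetime import datetime

# minimum value representable by the Clickhouse DateTime32 type
DATETIME32_MIN = datetime(1970, 1, 1, 0, 0, 0)

def clamp_to_datetime32(value: datetime) -> datetime:
    # old driver behavior: silently raise too-small values to the minimum;
    # the new driver writes the value as-is and lets Clickhouse handle it
    return max(DATETIME32_MIN, value)

print(clamp_to_datetime32(datetime(1969, 12, 31, 23, 59, 59)))  # 1970-01-01 00:00:00
print(clamp_to_datetime32(datetime(2024, 1, 1)))                # 2024-01-01 00:00:00
```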
- Update other JDBC drivers to latest versions.
- Update MongoDB connector to latest version: `10.1.1` → `10.3.0` (#255, #283). This brings Spark 3.5 support.
- Update `XML` package to latest version: `0.17.0` → `0.18.0` (#259). This brings a few bugfixes with datetime format handling.
- For JDBC connections, add new `SQLOptions` class for the `DB.sql(query, options=...)` method (#272).

  Firstly, to keep naming more consistent.

  Secondly, some options are not supported by the `DB.sql(...)` method, but are supported by `DBReader`. For example, `SQLOptions` does not support `partitioning_mode` and requires explicit definition of `lower_bound` and `upper_bound` when `num_partitions` is greater than 1. `ReadOptions` does support `partitioning_mode` and allows skipping `lower_bound` and `upper_bound` values.

  This requires some code changes. Before:

  ```python
  from onetl.connection import Postgres

  postgres = Postgres(...)

  df = postgres.sql(
      """
      SELECT *
      FROM some.mytable
      WHERE key = 'something'
      """,
      options=Postgres.ReadOptions(
          partitioning_mode="range",
          partition_column="id",
          num_partitions=10,
      ),
  )
  ```

  After:

  ```python
  from onetl.connection import Postgres

  postgres = Postgres(...)

  df = postgres.sql(
      """
      SELECT *
      FROM some.mytable
      WHERE key = 'something'
      """,
      options=Postgres.SQLOptions(
          # partitioning_mode is not supported!
          partition_column="id",
          num_partitions=10,
          lower_bound=0,  # <-- set explicitly
          upper_bound=1000,  # <-- set explicitly
      ),
  )
  ```

  For now, `DB.sql(query, options=...)` can accept `ReadOptions` to keep backward compatibility, but emits a deprecation warning. The support will be removed in `v1.0.0`.
- Split up `JDBCOptions` class into `FetchOptions` and `ExecuteOptions` (#274).

  The new classes are used by `DB.fetch(query, options=...)` and `DB.execute(query, options=...)` methods respectively. This is mostly to keep naming more consistent.

  This requires some code changes. Before:

  ```python
  from onetl.connection import Postgres

  postgres = Postgres(...)

  df = postgres.fetch(
      "SELECT * FROM some.mytable WHERE key = 'something'",
      options=Postgres.JDBCOptions(
          fetchsize=1000,
          query_timeout=30,
      ),
  )

  postgres.execute(
      "UPDATE some.mytable SET value = 'new' WHERE key = 'something'",
      options=Postgres.JDBCOptions(query_timeout=30),
  )
  ```

  After:

  ```python
  from onetl.connection import Postgres

  postgres = Postgres(...)

  # Using FetchOptions for fetching data
  df = postgres.fetch(
      "SELECT * FROM some.mytable WHERE key = 'something'",
      options=Postgres.FetchOptions(  # <-- change class name
          fetchsize=1000,
          query_timeout=30,
      ),
  )

  # Using ExecuteOptions for executing statements
  postgres.execute(
      "UPDATE some.mytable SET value = 'new' WHERE key = 'something'",
      options=Postgres.ExecuteOptions(query_timeout=30),  # <-- change class name
  )
  ```

  For now, `DB.fetch(query, options=...)` and `DB.execute(query, options=...)` can accept `JDBCOptions` to keep backward compatibility, but emit a deprecation warning. The old class will be removed in `v1.0.0`.
- Serialize `ColumnDatetimeHWM` to Clickhouse's `DateTime64(6)` (precision up to microseconds) instead of `DateTime` (precision up to seconds) (#267).

  In previous onETL versions, the `ColumnDatetimeHWM` value was rounded to the second, thus re-reading some rows that were already read in previous runs, producing duplicates.

  For Clickhouse versions below 21.1, comparing a column of type `DateTime` with a value of type `DateTime64` is not supported, returning an empty dataframe. To avoid this, replace:

  ```python
  DBReader(
      ...,
      hwm=DBReader.AutoDetectHWM(
          name="my_hwm",
          expression="hwm_column",  # <--
      ),
  )
  ```

  with:

  ```python
  DBReader(
      ...,
      hwm=DBReader.AutoDetectHWM(
          name="my_hwm",
          expression="CAST(hwm_column AS DateTime64)",  # <-- add explicit CAST
      ),
  )
  ```
- Pass JDBC connection extra params as `properties` dict instead of URL with query part (#268).

  This allows passing custom connection parameters like `Clickhouse(extra={"custom_http_options": "option1=value1,option2=value2"})` without the need to apply urlencode to the parameter value, like `option1%3Dvalue1%2Coption2%3Dvalue2`.
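The urlencoding users previously had to apply by hand can be reproduced with the standard library (an illustration only, not onETL code):

```python
from urllib.parse import quote

value = "option1=value1,option2=value2"

# before onETL 0.11.0, extra values embedded into the JDBC URL query part
# had to be percent-encoded manually:
encoded = quote(value, safe="")
print(encoded)  # option1%3Dvalue1%2Coption2%3Dvalue2

# since extras are now passed as a properties dict, the raw value is used as-is
```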
Features
Improve user experience with Kafka messages and Database tables with serialized columns, like JSON/XML.
- Allow passing custom package version as argument for `DB.get_packages(...)` method of several DB connectors:
  - `Clickhouse.get_packages(package_version=..., apache_http_client_version=...)` (#249)
  - `MongoDB.get_packages(scala_version=..., spark_version=..., package_version=...)` (#255)
  - `MySQL.get_packages(package_version=...)` (#253)
  - `MSSQL.get_packages(java_version=..., package_version=...)` (#254)
  - `Oracle.get_packages(java_version=..., package_version=...)` (#252)
  - `Postgres.get_packages(package_version=...)` (#251)
  - `Teradata.get_packages(package_version=...)` (#256)

  Now users can downgrade or upgrade a connection without waiting for the next onETL release. Previously only `Kafka` and `Greenplum` supported this feature.
- Add `FileFormat.parse_column(...)` method to several classes:
  - `Avro.parse_column(col)` (#265)
  - `JSON.parse_column(col, schema=...)` (#257)
  - `CSV.parse_column(col, schema=...)` (#258)
  - `XML.parse_column(col, schema=...)` (#269)

  This allows parsing data in the `value` field of a Kafka message, or a string/binary column of some table, as a nested Spark structure.
- Add `FileFormat.serialize_column(...)` method to several classes:
  - `Avro.serialize_column(col)` (#265)
  - `JSON.serialize_column(col)` (#257)
  - `CSV.serialize_column(col)` (#258)

  This allows saving Spark nested structures or arrays to the `value` field of a Kafka message, or a string/binary column of some table.
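Conceptually, the JSON parse/serialize pair is a per-row round-trip between a string and a nested structure. Outside of Spark, the per-row effect can be sketched with the standard library (an illustration of the idea, not the onETL implementation, which operates on Spark columns):

```python
import json

# a Kafka-style message value: a structure serialized to a JSON string
raw_value = '{"id": 1, "tags": ["a", "b"]}'

# JSON.parse_column(col, schema=...) turns such strings into nested structures
parsed = json.loads(raw_value)
print(parsed["tags"])  # ['a', 'b']

# JSON.serialize_column(col) does the reverse: structure -> string
round_trip = json.dumps(parsed, separators=(",", ":"))
print(round_trip)  # {"id":1,"tags":["a","b"]}
```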
Improvements
A few documentation improvements.
- Replace all `assert` statements in documentation with doctest syntax. This should make documentation more readable (#273).
- Add a generic Troubleshooting guide (#275).
- Improve Kafka documentation:
  - Add "Prerequisites" page describing different aspects of connecting to Kafka.
  - Improve "Reading from" and "Writing to" pages of Kafka documentation, add more examples and usage notes.
  - Add "Troubleshooting" page. (#276)
- Improve Hive documentation:
  - Add "Prerequisites" page describing different aspects of connecting to Hive.
  - Improve "Reading from" and "Writing to" pages of Hive documentation, add more examples and recommendations.
  - Improve "Executing statements in Hive" page of Hive documentation. (#278)
- Add "Prerequisites" page describing different aspects of using SparkHDFS and SparkS3 connectors. (#279)
- Add note about connecting to a Clickhouse cluster. (#280)
- Add notes about versions in which a specific class/method/attribute/argument was added, renamed or changed behavior. (#282)
Bug Fixes
- Fix missing `pysmb` package after installing `pip install onetl[files]`.
0.10.2 (2024-03-21)
Features
- Add support of Pydantic v2. (#230)
Improvements
- Improve database connections documentation:
  - Add "Types" section describing mapping between Clickhouse and Spark types
  - Add "Prerequisites" section describing different aspects of connecting to Clickhouse
  - Separate documentation of `DBReader` and `.sql()` / `.pipeline(...)`
  - Add examples for `.fetch()` and `.execute()` (#211, #228, #229, #233, #234, #235, #236, #240)
- Add notes to Greenplum documentation about issues with IP resolution and building the `gpfdist` URL (#228)
- Allow calling `MongoDB.pipeline(...)` with just a collection name, without an explicit aggregation pipeline. (#237)
- Update default `Postgres(extra={...})` to include the `{"stringtype": "unspecified"}` option. This allows writing text data to a non-text column (or vice versa), relying on Postgres cast capabilities.

  For example, now it is possible to read a column of type `money` as Spark's `StringType()`, and write it back to the same column, without using intermediate columns or tables. (#229)
Bug Fixes
- Return back handling of `DBReader(columns="string")`. This was valid syntax up to the v0.10 release, but it was removed because most users never used it. It looks like we were wrong, so this behavior is returned, but with a deprecation warning. (#238)
- Downgrade Greenplum package version from `2.3.0` to `2.2.0`. (#239)

  This is because version 2.3.0 introduced issues with writing data to Greenplum 6.x. The connector can open a transaction with a `SELECT * FROM table LIMIT 0` query, but does not close it, which leads to deadlocks.

  To use this connector with Greenplum 7.x, please pass the package version explicitly:

  ```python
  maven_packages = Greenplum.get_packages(package_version="2.3.0", ...)
  ```
0.10.1 (2024-02-05)
Features
- Add support of Incremental Strategies for `Kafka` connection:

  ```python
  reader = DBReader(
      connection=Kafka(...),
      source="topic_name",
      hwm=AutoDetectHWM(name="some_hwm_name", expression="offset"),
  )

  with IncrementalStrategy():
      df = reader.run()
  ```

  This lets you resume reading data from a Kafka topic starting at the last committed offset from your previous run. (#202)
- Add `has_data`, `raise_if_no_data` methods to `DBReader` class. (#203)
- Update VMware Greenplum connector from `2.1.4` to `2.3.0`. This implies:
  - Greenplum 7.x support
  - Kubernetes support
  - New read option `gpdb.matchDistributionPolicy`, which allows matching each Spark executor with a specific Greenplum segment, avoiding redundant data transfer between Greenplum segments
  - Allows overriding Greenplum optimizer parameters in read/write operations (#208)
- `Greenplum.get_packages()` method now accepts an optional arg `package_version`, which allows overriding the version of the Greenplum connector package. (#208)
0.10.0 (2023-12-18)
Breaking Changes
- Upgrade `etl-entities` from v1 to v2 (#172).

  This implies that `HWM` classes now have a different internal structure than they used to.

  etl-entities < 2:

  ```python
  from etl_entities.old_hwm import IntHWM as OldIntHWM
  from etl_entities.source import Column, Table
  from etl_entities.process import Process

  hwm = OldIntHWM(
      process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
      source=Table(name="schema.table", instance="postgres://host:5432/db"),
      column=Column(name="col1"),
      value=123,
  )
  ```

  etl-entities >= 2:

  ```python
  from etl_entities.hwm import ColumnIntHWM

  hwm = ColumnIntHWM(
      name="some_unique_name",
      description="any value you want",
      source="schema.table",
      expression="col1",
      value=123,
  )
  ```

  Breaking change: if you used HWM classes from the `etl_entities` module, you should rewrite your code to make it compatible with the new version.

  More details:

  - `HWM` classes used by previous onETL versions were moved from `etl_entities` to the `etl_entities.old_hwm` submodule. They are kept for compatibility reasons, but are planned to be removed in the etl-entities v3 release.
  - New `HWM` classes have a flat structure instead of a nested one.
  - New `HWM` classes have a mandatory `name` attribute (it was known as `qualified_name` before).
  - Type aliases used while serializing and deserializing `HWM` objects to `dict` representation were changed too: `int` -> `column_int`.

  To make migration simpler, you can use a new method:

  ```python
  old_hwm = OldIntHWM(...)
  new_hwm = old_hwm.as_new_hwm()
  ```

  which automatically converts all fields from the old structure to the new one, including `qualified_name` -> `name`.
- Breaking changes:
  - Methods `BaseHWMStore.get()` and `BaseHWMStore.save()` were renamed to `get_hwm()` and `set_hwm()`.
  - They now can be used only with new HWM classes from `etl_entities.hwm`; old HWM classes are not supported.

  If you used them in your code, please update it accordingly.
- `YAMLHWMStore` CANNOT read files created by older onETL versions (0.9.x or older).

  Upgrade procedure:
  ```python
  # pip install onetl==0.9.5

  # Get qualified_name for HWM

  # Option 1. HWM is built manually
  from etl_entities import IntHWM, FileListHWM
  from etl_entities.source import Column, Table, RemoteFolder
  from etl_entities.process import Process

  # for column HWM
  old_column_hwm = IntHWM(
      process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
      source=Table(name="schema.table", instance="postgres://host:5432/db"),
      column=Column(name="col1"),
  )
  qualified_name = old_column_hwm.qualified_name
  # "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"

  # for file HWM
  old_file_hwm = FileListHWM(
      process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
      source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
  )
  qualified_name = old_file_hwm.qualified_name
  # "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"

  # Option 2. HWM is generated automatically (by DBReader/FileDownloader)
  # See onETL logs and search for string like qualified_name = '...'
  qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"

  # Get .yml file path by qualified_name
  import os
  from pathlib import PurePosixPath
  from onetl.hwm.store import YAMLHWMStore

  # here you should pass the same arguments as used on production, if any
  yaml_hwm_store = YAMLHWMStore()
  hwm_path = yaml_hwm_store.get_file_path(qualified_name)
  print(hwm_path)

  # for column HWM
  # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')
  # for file HWM
  # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')

  # Read raw .yml file content
  from yaml import safe_load, dump

  raw_old_hwm_items = safe_load(hwm_path.read_text())
  print(raw_old_hwm_items)

  # for column HWM
  # [
  #     {
  #         "column": {"name": "col1", "partition": {}},
  #         "modified_time": "2023-12-18T10:39:47.377378",
  #         "process": {"dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc"},
  #         "source": {"instance": "postgres://host:5432/db", "name": "schema.table"},
  #         "type": "int",
  #         "value": "123",
  #     },
  # ]

  # for file HWM
  # [
  #     {
  #         "modified_time": "2023-12-18T11:15:36.478462",
  #         "process": {"dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc"},
  #         "source": {"instance": "ftp://ftp.server:21", "name": "/absolute/path"},
  #         "type": "file_list",
  #         "value": ["file1.txt", "file2.txt"],
  #     },
  # ]

  # Convert file content to new structure, compatible with onETL 0.10.x
  raw_new_hwm_items = []
  for old_hwm in raw_old_hwm_items:
      new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}
      if "column" in old_hwm:
          new_hwm["expression"] = old_hwm["column"]["name"]
      new_hwm["entity"] = old_hwm["source"]["name"]
      old_hwm.pop("process", None)
      if old_hwm["type"] == "int":
          new_hwm["type"] = "column_int"
          new_hwm["value"] = old_hwm["value"]
      elif old_hwm["type"] == "date":
          new_hwm["type"] = "column_date"
          new_hwm["value"] = old_hwm["value"]
      elif old_hwm["type"] == "datetime":
          new_hwm["type"] = "column_datetime"
          new_hwm["value"] = old_hwm["value"]
      elif old_hwm["type"] == "file_list":
          new_hwm["type"] = "file_list"
          new_hwm["value"] = [
              os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
              for path in old_hwm["value"]
          ]
      else:
          raise ValueError("WAT?")
      raw_new_hwm_items.append(new_hwm)

  print(raw_new_hwm_items)

  # for column HWM
  # [
  #     {
  #         "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
  #         "modified_time": "2023-12-18T10:39:47.377378",
  #         "expression": "col1",
  #         "source": "schema.table",
  #         "type": "column_int",
  #         "value": 123,
  #     },
  # ]

  # for file HWM
  # [
  #     {
  #         "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
  #         "modified_time": "2023-12-18T11:15:36.478462",
  #         "entity": "/absolute/path",
  #         "type": "file_list",
  #         "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
  #     },
  # ]

  # Save file with new content
  with open(hwm_path, "w") as file:
      dump(raw_new_hwm_items, file)

  # Stop Python interpreter and update onETL
  # pip install onetl==0.10.0

  # Check that new .yml file can be read
  from onetl.hwm.store import YAMLHWMStore

  qualified_name = ...

  # here you should pass the same arguments as used on production, if any
  yaml_hwm_store = YAMLHWMStore()
  yaml_hwm_store.get_hwm(qualified_name)

  # for column HWM
  # ColumnIntHWM(
  #     name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
  #     description='',
  #     entity='schema.table',
  #     value=123,
  #     expression='col1',
  #     modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
  # )

  # for file HWM
  # FileListHWM(
  #     name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
  #     description='',
  #     entity=AbsolutePath('/absolute/path'),
  #     value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
  #     expression=None,
  #     modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462),
  # )

  # That's all!
  ```
  But most users use other HWM store implementations, which do not have this issue.
- Several classes and functions were moved from `onetl` to `etl_entities`:

  onETL 0.9.x and older:

  ```python
  from onetl.hwm.store import (
      detect_hwm_store,
      BaseHWMStore,
      HWMStoreClassRegistry,
      register_hwm_store_class,
      HWMStoreManager,
      MemoryHWMStore,
  )
  ```

  onETL 0.10.x and newer:

  ```python
  from etl_entities.hwm_store import (
      detect_hwm_store,
      BaseHWMStore,
      HWMStoreClassRegistry,
      register_hwm_store_class,
      HWMStoreManager,
      MemoryHWMStore,
  )
  ```
They s...