Replies: 2 comments
-
Not sure if this could be related, but I also get lots of these errors:
WITH RUST_BACKTRACE=1:
WITH RUST_BACKTRACE=full:
Also getting these DataFusion warnings:
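Side note on capturing those backtraces: RUST_BACKTRACE only needs to be set in the environment before the failing deltalake call runs, for example (a sketch, not from the original report):

# Sketch: enable Rust backtraces for the native deltalake code from Python.
import os

os.environ["RUST_BACKTRACE"] = "full"  # or "1" for a shorter trace

import deltalake  # set the variable before the deltalake calls that panic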
-
I created a fully reproducible example and found a way to make the merge skip files. There is a comment pointing out which line solved it, for anyone to test with or without it. Look for: # THIS LINE MAKES THE SKIPPER WORK

# !pip install 'polars[pyarrow]' deltalake
import polars as pl
import deltalake

partitions = ["tennant"]
primary_key = "id"


def read_delta_table():
    try:
        return deltalake.DeltaTable("./delta")
    except:
        return "./delta"


def write(df):
    predicate = " AND ".join(
        [f"s.{primary_key} = t.{primary_key}"]  # base predicate + for each partition
        + [f"s.{partition} = t.{partition}" for partition in partitions]
    )
    # read current tennant
    tennant = df[partitions[0]][0]
    # THIS LINE MAKES THE SKIPPER WORK
    predicate = predicate + f" AND t.{partitions[0]} = '{tennant}' "
    # no effect
    # predicate = predicate + f" AND s.{partitions[0]} = '{tennant}' "
    #
    try:
        dt = deltalake.DeltaTable("./data/delta")
        metrics = (
            df.write_delta(
                dt,
                mode="merge",
                delta_write_options={
                    # "writer_properties": deltalake.WriterProperties(compression=compression.upper()),
                    "partition_by": partitions,
                    "schema_mode": "overwrite",
                },
                delta_merge_options={
                    "predicate": predicate,
                    "source_alias": "s",
                    "target_alias": "t",
                },
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    except deltalake.exceptions.TableNotFoundError:
        dt = "./data/delta"
        metrics = df.write_delta(
            dt,
            mode="overwrite",
            delta_write_options={
                # "writer_properties": deltalake.WriterProperties(compression=compression.upper()),
                "partition_by": partitions,
                "schema_mode": "overwrite",
            },
        )
    if metrics:
        print(f"source_size: {len(df)}")
        print(f"Partitions: {partitions}")
        print(f"Predicate: {predicate}")
        print(f"Files scanned: {metrics.get('num_target_files_scanned')}")
        print(f"Files skipped: {metrics.get('num_target_files_skipped_during_scan')}")
        print(f"Execution time: {metrics.get('execution_time_ms')} ms")
    else:
        print("metrics not generated")


def add_partition(df: pl.DataFrame, tennant: str) -> pl.DataFrame:
    print(f"Writing tennant {tennant}")
    return df.with_columns(pl.lit(tennant).alias(partitions[0]))


df = pl.DataFrame(
    {
        "id": [1, 2, 3, 4, 5, 6, 7],
        "dt": pl.Series(
            [
                "2019-09-01",
                "2019-09-02",
                "2019-09-03",
                "2019-09-04",
                "2019-09-05",
                "2019-09-06",
                "2019-09-07",
            ],
        ).str.to_date(),
        "groups": [[], ["a"], ["b"], ["a", "b"], ["c"], ["d", "e"], []],
    }
)
write(add_partition(df, "tennant1"))
write(add_partition(df, "tennant2"))

# write_deltalake("./data/delta", df)
# Load data from the delta table
dt = deltalake.DeltaTable("./data/delta")
df = pl.read_delta(dt)
print(df)

df = pl.DataFrame(
    {
        "id": [6, 7, 8, 9, 10, 11, 12],
        "dt": pl.Series(
            [
                "2029-09-01",
                "2029-09-02",
                "2029-09-03",
                "2029-09-04",
                "2029-09-05",
                "2029-09-06",
                "2029-09-07",
            ],
        ).str.to_date(),
        "groups": [[], ["a"], ["b"], ["a", "b"], ["c"], ["d", "e"], []],
    }
)
write(add_partition(df, "tennant1"))
write(add_partition(df, "tennant2"))

df = pl.read_delta(dt).sort("id")
pl.Config.set_tbl_rows(-1)
print(df)
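As a quick way to test with and without that predicate line, the data files behind each partition can be listed straight from the table, so you can see what a pruned merge would be able to skip (a sketch; it assumes file_uris(partition_filters=...) is available in the installed deltalake version):

# Sketch: inspect how many data files each tennant partition holds.
dt = deltalake.DeltaTable("./data/delta")
print(len(dt.file_uris()))  # all files in the table
print(dt.file_uris(partition_filters=[("tennant", "=", "tennant1")]))  # files for one tennant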
-
Hi.
I am facing very weird behavior when running upserts: my entire base is being loaded into memory, even with two partitions in place.
I have a "multi tenant" database that I extract one tenant at a time.
In one dataframe there is only data related to one tennant, so I expected the upsert to only load data for that tennant. I looked into optimizing-merge-performance, but something is off for me.
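For context, that guidance boils down to constraining the target side of the merge predicate so partition values and file statistics can prune files before the join; a minimal sketch (column names and the tennant value here are illustrative, taken from the example above, not from my real table):

# Illustrative only; "tennant" is the partition column from the example above.
tennant = "tennant1"

# Join-only predicate: every target file has to be scanned to find matches.
predicate = "s.id = t.id AND s.tennant = t.tennant"

# Pinning the *target* partition to a literal lets the merge skip other partitions.
predicate = f"s.id = t.id AND t.tennant = '{tennant}'"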
The structure is equivalent to:
My source code and output metrics look like this:
"polars~=1.32",
"deltalake>=1.1.4",