Skip to content

Commit f4da19e

Browse files
authored
Add Avro compression (apache#1976)
# Rationale for this change PyIceberg did not compress the Avro files it wrote. This change makes gzip/deflate behavior the same as in Java. # Are these changes tested? Existing round-trip tests with FastAvro and Spark. Some tests are extended to write both compressed and uncompressed data. # Are there any user-facing changes? Smaller and faster manifest files :) <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent ea57cbb commit f4da19e

File tree

8 files changed

+167
-29
lines changed

8 files changed

+167
-29
lines changed

pyiceberg/avro/codecs/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,27 @@
2626

2727
from __future__ import annotations
2828

29-
from typing import Dict, Optional, Type
29+
from typing import Dict, Literal, Optional, Type
30+
31+
from typing_extensions import TypeAlias
3032

3133
from pyiceberg.avro.codecs.bzip2 import BZip2Codec
3234
from pyiceberg.avro.codecs.codec import Codec
3335
from pyiceberg.avro.codecs.deflate import DeflateCodec
3436
from pyiceberg.avro.codecs.snappy_codec import SnappyCodec
3537
from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec
3638

37-
KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = {
39+
AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"]
40+
41+
AVRO_CODEC_KEY = "avro.codec"
42+
43+
KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = {
3844
"null": None,
3945
"bzip2": BZip2Codec,
4046
"snappy": SnappyCodec,
4147
"zstandard": ZStandardCodec,
4248
"deflate": DeflateCodec,
4349
}
50+
51+
# Map to convert the naming from Iceberg to Avro
52+
CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}

pyiceberg/avro/file.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
TypeVar,
3636
)
3737

38-
from pyiceberg.avro.codecs import KNOWN_CODECS
38+
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
3939
from pyiceberg.avro.codecs.codec import Codec
4040
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
4141
from pyiceberg.avro.encoder import BinaryEncoder
@@ -69,7 +69,6 @@
6969
NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
7070
)
7171

72-
_CODEC_KEY = "avro.codec"
7372
_SCHEMA_KEY = "avro.schema"
7473

7574

@@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]:
9291
In the case of a null codec, we return a None indicating that we
9392
don't need to compress/decompress.
9493
"""
95-
codec_name = self.meta.get(_CODEC_KEY, "null")
94+
from pyiceberg.table import TableProperties
95+
96+
codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
9697
if codec_name not in KNOWN_CODECS:
9798
raise ValueError(f"Unsupported codec: {codec_name}")
9899

99-
return KNOWN_CODECS[codec_name]
100+
return KNOWN_CODECS[codec_name] # type: ignore
100101

101102
def get_schema(self) -> Schema:
102103
if _SCHEMA_KEY in self.meta:
@@ -276,11 +277,36 @@ def __exit__(
276277
self.output_stream.close()
277278

278279
def _write_header(self) -> None:
280+
from pyiceberg.table import TableProperties
281+
282+
codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
283+
if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
284+
codec_name = avro_codec_name
285+
279286
json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
280-
meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
287+
288+
meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name}
281289
header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
282290
construct_writer(META_SCHEMA).write(self.encoder, header)
283291

292+
def compression_codec(self) -> Optional[Type[Codec]]:
293+
"""Get the file's compression codec algorithm from the file's metadata.
294+
295+
In the case of a null codec, we return a None indicating that we
296+
don't need to compress/decompress.
297+
"""
298+
from pyiceberg.table import TableProperties
299+
300+
codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
301+
302+
if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name):
303+
codec_name = avro_codec_name
304+
305+
if codec_name not in KNOWN_CODECS:
306+
raise ValueError(f"Unsupported codec: {codec_name}")
307+
308+
return KNOWN_CODECS[codec_name] # type: ignore
309+
284310
def write_block(self, objects: List[D]) -> None:
285311
in_memory = io.BytesIO()
286312
block_content_encoder = BinaryEncoder(output_stream=in_memory)
@@ -289,6 +315,13 @@ def write_block(self, objects: List[D]) -> None:
289315
block_content = in_memory.getvalue()
290316

291317
self.encoder.write_int(len(objects))
292-
self.encoder.write_int(len(block_content))
293-
self.encoder.write(block_content)
318+
319+
if codec := self.compression_codec():
320+
content, content_length = codec.compress(block_content)
321+
self.encoder.write_int(content_length)
322+
self.encoder.write(content)
323+
else:
324+
self.encoder.write_int(len(block_content))
325+
self.encoder.write(block_content)
326+
294327
self.encoder.write(self.sync_bytes)

pyiceberg/manifest.py

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from cachetools.keys import hashkey
3838
from pydantic_core import to_json
3939

40+
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
4041
from pyiceberg.avro.file import AvroFile, AvroOutputFile
4142
from pyiceberg.conversions import to_bytes
4243
from pyiceberg.exceptions import ValidationError
@@ -946,9 +947,16 @@ class ManifestWriter(ABC):
946947
_deleted_rows: int
947948
_min_sequence_number: Optional[int]
948949
_partitions: List[Record]
949-
_reused_entry_wrapper: ManifestEntry
950+
_compression: AvroCompressionCodec
950951

951-
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
952+
def __init__(
953+
self,
954+
spec: PartitionSpec,
955+
schema: Schema,
956+
output_file: OutputFile,
957+
snapshot_id: int,
958+
avro_compression: AvroCompressionCodec,
959+
) -> None:
952960
self.closed = False
953961
self._spec = spec
954962
self._schema = schema
@@ -963,6 +971,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
963971
self._deleted_rows = 0
964972
self._min_sequence_number = None
965973
self._partitions = []
974+
self._compression = avro_compression
966975

967976
def __enter__(self) -> ManifestWriter:
968977
"""Open the writer."""
@@ -998,6 +1007,7 @@ def _meta(self) -> Dict[str, str]:
9981007
"partition-spec": to_json(self._spec.fields).decode("utf-8"),
9991008
"partition-spec-id": str(self._spec.spec_id),
10001009
"format-version": str(self.version),
1010+
AVRO_CODEC_KEY: self._compression,
10011011
}
10021012

10031013
def _with_partition(self, format_version: TableVersion) -> Schema:
@@ -1109,13 +1119,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
11091119

11101120

11111121
class ManifestWriterV1(ManifestWriter):
1112-
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
1113-
super().__init__(
1114-
spec,
1115-
schema,
1116-
output_file,
1117-
snapshot_id,
1118-
)
1122+
def __init__(
1123+
self,
1124+
spec: PartitionSpec,
1125+
schema: Schema,
1126+
output_file: OutputFile,
1127+
snapshot_id: int,
1128+
avro_compression: AvroCompressionCodec,
1129+
):
1130+
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
11191131

11201132
def content(self) -> ManifestContent:
11211133
return ManifestContent.DATA
@@ -1129,8 +1141,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
11291141

11301142

11311143
class ManifestWriterV2(ManifestWriter):
1132-
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
1133-
super().__init__(spec, schema, output_file, snapshot_id)
1144+
def __init__(
1145+
self,
1146+
spec: PartitionSpec,
1147+
schema: Schema,
1148+
output_file: OutputFile,
1149+
snapshot_id: int,
1150+
avro_compression: AvroCompressionCodec,
1151+
):
1152+
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)
11341153

11351154
def content(self) -> ManifestContent:
11361155
return ManifestContent.DATA
@@ -1156,12 +1175,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
11561175

11571176

11581177
def write_manifest(
1159-
format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
1178+
format_version: TableVersion,
1179+
spec: PartitionSpec,
1180+
schema: Schema,
1181+
output_file: OutputFile,
1182+
snapshot_id: int,
1183+
avro_compression: AvroCompressionCodec,
11601184
) -> ManifestWriter:
11611185
if format_version == 1:
1162-
return ManifestWriterV1(spec, schema, output_file, snapshot_id)
1186+
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
11631187
elif format_version == 2:
1164-
return ManifestWriterV2(spec, schema, output_file, snapshot_id)
1188+
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
11651189
else:
11661190
raise ValueError(f"Cannot write manifest for table version: {format_version}")
11671191

@@ -1211,14 +1235,21 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite
12111235

12121236

12131237
class ManifestListWriterV1(ManifestListWriter):
1214-
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
1238+
def __init__(
1239+
self,
1240+
output_file: OutputFile,
1241+
snapshot_id: int,
1242+
parent_snapshot_id: Optional[int],
1243+
compression: AvroCompressionCodec,
1244+
):
12151245
super().__init__(
12161246
format_version=1,
12171247
output_file=output_file,
12181248
meta={
12191249
"snapshot-id": str(snapshot_id),
12201250
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
12211251
"format-version": "1",
1252+
AVRO_CODEC_KEY: compression,
12221253
},
12231254
)
12241255

@@ -1232,7 +1263,14 @@ class ManifestListWriterV2(ManifestListWriter):
12321263
_commit_snapshot_id: int
12331264
_sequence_number: int
12341265

1235-
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
1266+
def __init__(
1267+
self,
1268+
output_file: OutputFile,
1269+
snapshot_id: int,
1270+
parent_snapshot_id: Optional[int],
1271+
sequence_number: int,
1272+
compression: AvroCompressionCodec,
1273+
):
12361274
super().__init__(
12371275
format_version=2,
12381276
output_file=output_file,
@@ -1241,6 +1279,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
12411279
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
12421280
"sequence-number": str(sequence_number),
12431281
"format-version": "2",
1282+
AVRO_CODEC_KEY: compression,
12441283
},
12451284
)
12461285
self._commit_snapshot_id = snapshot_id
@@ -1275,12 +1314,13 @@ def write_manifest_list(
12751314
snapshot_id: int,
12761315
parent_snapshot_id: Optional[int],
12771316
sequence_number: Optional[int],
1317+
avro_compression: AvroCompressionCodec,
12781318
) -> ManifestListWriter:
12791319
if format_version == 1:
1280-
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
1320+
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
12811321
elif format_version == 2:
12821322
if sequence_number is None:
12831323
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
1284-
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
1324+
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
12851325
else:
12861326
raise ValueError(f"Cannot write manifest list for table version: {format_version}")

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ class TableProperties:
192192
WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
193193
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB
194194

195+
WRITE_AVRO_COMPRESSION = "write.avro.compression-codec"
196+
WRITE_AVRO_COMPRESSION_DEFAULT = "gzip"
197+
195198
DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
196199
DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
197200

pyiceberg/table/update/snapshot.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from sortedcontainers import SortedList
2929

30+
from pyiceberg.avro.codecs import AvroCompressionCodec
3031
from pyiceberg.expressions import (
3132
AlwaysFalse,
3233
BooleanExpression,
@@ -104,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
104105
_added_data_files: List[DataFile]
105106
_manifest_num_counter: itertools.count[int]
106107
_deleted_data_files: Set[DataFile]
108+
_compression: AvroCompressionCodec
107109

108110
def __init__(
109111
self,
@@ -126,6 +128,11 @@ def __init__(
126128
self._deleted_data_files = set()
127129
self.snapshot_properties = snapshot_properties
128130
self._manifest_num_counter = itertools.count(0)
131+
from pyiceberg.table import TableProperties
132+
133+
self._compression = self._transaction.table_metadata.properties.get( # type: ignore
134+
TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT
135+
)
129136

130137
def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
131138
self._added_data_files.append(data_file)
@@ -154,6 +161,7 @@ def _write_added_manifest() -> List[ManifestFile]:
154161
schema=self._transaction.table_metadata.schema(),
155162
output_file=self.new_manifest_output(),
156163
snapshot_id=self._snapshot_id,
164+
avro_compression=self._compression,
157165
) as writer:
158166
for data_file in self._added_data_files:
159167
writer.add(
@@ -184,6 +192,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
184192
schema=self._transaction.table_metadata.schema(),
185193
output_file=self.new_manifest_output(),
186194
snapshot_id=self._snapshot_id,
195+
avro_compression=self._compression,
187196
) as writer:
188197
for entry in entries:
189198
writer.add_entry(entry)
@@ -249,12 +258,14 @@ def _commit(self) -> UpdatesAndRequirements:
249258
)
250259
location_provider = self._transaction._table.location_provider()
251260
manifest_list_file_path = location_provider.new_metadata_location(file_name)
261+
252262
with write_manifest_list(
253263
format_version=self._transaction.table_metadata.format_version,
254264
output_file=self._io.new_output(manifest_list_file_path),
255265
snapshot_id=self._snapshot_id,
256266
parent_snapshot_id=self._parent_snapshot_id,
257267
sequence_number=next_sequence_number,
268+
avro_compression=self._compression,
258269
) as writer:
259270
writer.add_manifests(new_manifests)
260271

@@ -291,6 +302,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
291302
schema=self._transaction.table_metadata.schema(),
292303
output_file=self.new_manifest_output(),
293304
snapshot_id=self._snapshot_id,
305+
avro_compression=self._compression,
294306
)
295307

296308
def new_manifest_output(self) -> OutputFile:
@@ -416,6 +428,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
416428
schema=self._transaction.table_metadata.schema(),
417429
output_file=self.new_manifest_output(),
418430
snapshot_id=self._snapshot_id,
431+
avro_compression=self._compression,
419432
) as writer:
420433
for existing_entry in existing_entries:
421434
writer.add_entry(existing_entry)
@@ -550,6 +563,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
550563
schema=self._transaction.table_metadata.schema(),
551564
output_file=self.new_manifest_output(),
552565
snapshot_id=self._snapshot_id,
566+
avro_compression=self._compression,
553567
) as writer:
554568
[
555569
writer.add_entry(

0 commit comments

Comments
 (0)