Skip to content

Commit be4d1a3

Browse files
authored
[python] Support conflict detection when updating existing data concurrently. (apache#7323)
1 parent 4cac917 commit be4d1a3

File tree

14 files changed

+1018
-30
lines changed

14 files changed

+1018
-30
lines changed

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,6 @@ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest
5353
def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]:
5454
return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats)
5555

56-
def _entry_identifier(e: ManifestEntry) -> tuple:
57-
return (
58-
tuple(e.partition.values),
59-
e.bucket,
60-
e.file.level,
61-
e.file.file_name,
62-
tuple(e.file.extra_files) if e.file.extra_files else (),
63-
e.file.embedded_index,
64-
e.file.external_path,
65-
)
66-
6756
deleted_entry_keys = set()
6857
added_entries = []
6958
with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -73,11 +62,11 @@ def _entry_identifier(e: ManifestEntry) -> tuple:
7362
if entry.kind == 0: # ADD
7463
added_entries.append(entry)
7564
else: # DELETE
76-
deleted_entry_keys.add(_entry_identifier(entry))
65+
deleted_entry_keys.add(entry.identifier())
7766

7867
final_entries = [
7968
entry for entry in added_entries
80-
if _entry_identifier(entry) not in deleted_entry_keys
69+
if entry.identifier() not in deleted_entry_keys
8170
]
8271
return final_entries
8372

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Entry representing a file.
20+
"""
21+
22+
23+
class FileEntry:
24+
"""Entry representing a file.
25+
26+
The same Identifier indicates that the FileEntry refers to the same data file.
27+
"""
28+
29+
class Identifier:
30+
"""Unique identifier for a file entry.
31+
32+
Uses partition, bucket, level, fileName, extraFiles,
33+
embeddedIndex and externalPath to identify a file.
34+
"""
35+
36+
def __init__(self, partition, bucket, level, file_name,
37+
extra_files, embedded_index, external_path):
38+
self.partition = partition
39+
self.bucket = bucket
40+
self.level = level
41+
self.file_name = file_name
42+
self.extra_files = extra_files
43+
self.embedded_index = embedded_index
44+
self.external_path = external_path
45+
self._hash = None
46+
47+
def __eq__(self, other):
48+
if self is other:
49+
return True
50+
if other is None or not isinstance(other, FileEntry.Identifier):
51+
return False
52+
return (self.bucket == other.bucket
53+
and self.level == other.level
54+
and self.partition == other.partition
55+
and self.file_name == other.file_name
56+
and self.extra_files == other.extra_files
57+
and self.embedded_index == other.embedded_index
58+
and self.external_path == other.external_path)
59+
60+
def __hash__(self):
61+
if self._hash is None:
62+
self._hash = hash((
63+
self.partition,
64+
self.bucket,
65+
self.level,
66+
self.file_name,
67+
self.extra_files,
68+
self.embedded_index,
69+
self.external_path,
70+
))
71+
return self._hash
72+
73+
def identifier(self):
74+
"""Build a unique Identifier for this file entry.
75+
76+
Returns:
77+
An Identifier instance.
78+
"""
79+
extra_files = (tuple(self.file.extra_files)
80+
if self.file.extra_files else ())
81+
return FileEntry.Identifier(
82+
partition=self.partition,
83+
bucket=self.bucket,
84+
level=self.file.level,
85+
file_name=self.file.file_name,
86+
extra_files=extra_files,
87+
embedded_index=self.file.embedded_index,
88+
external_path=self.file.external_path,
89+
)
90+
91+
@staticmethod
92+
def merge_entries(entries):
93+
"""Merge file entries: ADD and DELETE of the same file cancel each other.
94+
95+
- ADD: if identifier already in map, raise error; otherwise add to map.
96+
- DELETE: if identifier already in map, remove both (cancel);
97+
otherwise add to map.
98+
99+
Args:
100+
entries: Iterable of FileEntry.
101+
102+
Returns:
103+
List of merged FileEntry values, preserving insertion order.
104+
105+
Raises:
106+
RuntimeError: If trying to add a file that is already in the map.
107+
"""
108+
entry_map = {}
109+
110+
for entry in entries:
111+
entry_identifier = entry.identifier()
112+
if entry.kind == 0: # ADD
113+
if entry_identifier in entry_map:
114+
raise RuntimeError(
115+
"Trying to add file {} which is already added.".format(
116+
entry.file.file_name))
117+
entry_map[entry_identifier] = entry
118+
elif entry.kind == 1: # DELETE
119+
if entry_identifier in entry_map:
120+
del entry_map[entry_identifier]
121+
else:
122+
entry_map[entry_identifier] = entry
123+
else:
124+
raise RuntimeError(
125+
"Unknown entry kind: {}".format(entry.kind))
126+
127+
return list(entry_map.values())

paimon-python/pypaimon/manifest/schema/manifest_entry.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020

2121
from pypaimon.manifest.schema.data_file_meta import (DATA_FILE_META_SCHEMA,
2222
DataFileMeta)
23+
from pypaimon.manifest.schema.file_entry import FileEntry
2324
from pypaimon.table.row.generic_row import GenericRow
2425

2526

2627
@dataclass
27-
class ManifestEntry:
28+
class ManifestEntry(FileEntry):
2829
kind: int
2930
partition: GenericRow
3031
bucket: int

paimon-python/pypaimon/schema/data_types.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,16 @@ def __init__(self, type: str, nullable: bool = True):
7373
super().__init__(nullable)
7474
self.type = type
7575

76+
def __eq__(self, other):
77+
if self is other:
78+
return True
79+
if not isinstance(other, AtomicType):
80+
return False
81+
return self.type == other.type and self.nullable == other.nullable
82+
83+
def __hash__(self):
84+
return hash((self.type, self.nullable))
85+
7686
def to_dict(self) -> str:
7787
if not self.nullable:
7888
return self.type + " NOT NULL"
@@ -95,6 +105,16 @@ def __init__(self, nullable: bool, element_type: DataType):
95105
super().__init__(nullable)
96106
self.element = element_type
97107

108+
def __eq__(self, other):
109+
if self is other:
110+
return True
111+
if not isinstance(other, ArrayType):
112+
return False
113+
return self.element == other.element and self.nullable == other.nullable
114+
115+
def __hash__(self):
116+
return hash((self.element, self.nullable))
117+
98118
def to_dict(self) -> Dict[str, Any]:
99119
return {
100120
"type": "ARRAY" + (" NOT NULL" if not self.nullable else ""),
@@ -119,6 +139,16 @@ def __init__(self, nullable: bool, element_type: DataType):
119139
super().__init__(nullable)
120140
self.element = element_type
121141

142+
def __eq__(self, other):
143+
if self is other:
144+
return True
145+
if not isinstance(other, MultisetType):
146+
return False
147+
return self.element == other.element and self.nullable == other.nullable
148+
149+
def __hash__(self):
150+
return hash((self.element, self.nullable))
151+
122152
def to_dict(self) -> Dict[str, Any]:
123153
return {
124154
"type": "MULTISET{}{}".format('<' + str(self.element) + '>' if self.element else '',
@@ -150,6 +180,18 @@ def __init__(
150180
self.key = key_type
151181
self.value = value_type
152182

183+
def __eq__(self, other):
184+
if self is other:
185+
return True
186+
if not isinstance(other, MapType):
187+
return False
188+
return (self.key == other.key
189+
and self.value == other.value
190+
and self.nullable == other.nullable)
191+
192+
def __hash__(self):
193+
return hash((self.key, self.value, self.nullable))
194+
153195
def to_dict(self) -> Dict[str, Any]:
154196
return {
155197
"type": "MAP<{}, {}>".format(self.key, self.value),
@@ -199,6 +241,21 @@ def __init__(
199241
def from_dict(cls, data: Dict[str, Any]) -> "DataField":
200242
return DataTypeParser.parse_data_field(data)
201243

244+
def __eq__(self, other):
245+
if self is other:
246+
return True
247+
if not isinstance(other, DataField):
248+
return False
249+
return (self.id == other.id
250+
and self.name == other.name
251+
and self.type == other.type
252+
and self.description == other.description
253+
and self.default_value == other.default_value)
254+
255+
def __hash__(self):
256+
return hash((self.id, self.name, self.type,
257+
self.description, self.default_value))
258+
202259
def to_dict(self) -> Dict[str, Any]:
203260
result = {
204261
self.FIELD_ID: self.id,
@@ -223,6 +280,16 @@ def __init__(self, nullable: bool, fields: List[DataField]):
223280
super().__init__(nullable)
224281
self.fields = fields or []
225282

283+
def __eq__(self, other):
284+
if self is other:
285+
return True
286+
if not isinstance(other, RowType):
287+
return False
288+
return self.fields == other.fields and self.nullable == other.nullable
289+
290+
def __hash__(self):
291+
return hash((tuple(self.fields), self.nullable))
292+
226293
def to_dict(self) -> Dict[str, Any]:
227294
return {
228295
"type": "ROW" + ("" if self.nullable else " NOT NULL"),
@@ -587,7 +654,7 @@ def to_avro_type(field_type: pyarrow.DataType, field_name: str,
587654
parent_name: str = "record") -> Union[str, Dict[str, Any]]:
588655
if pyarrow.types.is_integer(field_type):
589656
if (pyarrow.types.is_signed_integer(field_type) and field_type.bit_width <= 32) or \
590-
(pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32):
657+
(pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32):
591658
return "int"
592659
else:
593660
return "long"

paimon-python/pypaimon/table/row/generic_row.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ def get_row_kind(self) -> RowKind:
5151
def __len__(self) -> int:
5252
return len(self.values)
5353

54+
def __eq__(self, other):
55+
if self is other:
56+
return True
57+
if not isinstance(other, GenericRow):
58+
return False
59+
return self.values == other.values and self.row_kind == other.row_kind
60+
61+
def __hash__(self):
62+
return hash((tuple(self.values), tuple(self.fields), self.row_kind))
63+
5464
def __str__(self):
5565
field_strs = [f"{field.name}={repr(value)}" for field, value in zip(self.fields, self.values)]
5666
return f"GenericRow(row_kind={self.row_kind.name}, {', '.join(field_strs)})"

0 commit comments

Comments
 (0)