Skip to content

Commit 8c581cd

Browse files
authored
[Spark] Add vacuum, upgrade, generate, clone command APIs to Delta Connect (Scala client + server) (delta-io#4516)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> - Adds the following new APIs to the Scala Delta Connect client: `clone`, `vacuum`, `generate` and `upgradeTableProtocol`. - Adds the proto definitions for the aforementioned APIs. - Adds the server-side implementation of the aforementioned APIs. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> New tests in `DeltaConnectPlannerSuite` (server-side changes) and in `DeltaTableSuite` (Note: these tests are set to `ignore` until delta-io#4468 is merged). ## Does this PR introduce _any_ user-facing changes? 
<!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> Yes. Adds the `clone`, `vacuum`, `generate` and `upgradeTableProtocol` APIs to the Scala Delta Connect client.
1 parent 972d2bf commit 8c581cd

File tree

10 files changed

+936
-35
lines changed

10 files changed

+936
-35
lines changed

python/delta/connect/proto/commands_pb2.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232

3333
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
34-
b'\n\x1c\x64\x65lta/connect/commands.proto\x12\rdelta.connect\x1a\x18\x64\x65lta/connect/base.proto"\\\n\x0c\x44\x65ltaCommand\x12<\n\x0b\x63lone_table\x18\x01 \x01(\x0b\x32\x19.delta.connect.CloneTableH\x00R\ncloneTableB\x0e\n\x0c\x63ommand_type"\xec\x02\n\nCloneTable\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12\x16\n\x06target\x18\x02 \x01(\tR\x06target\x12\x1a\n\x07version\x18\x03 \x01(\x05H\x00R\x07version\x12\x1e\n\ttimestamp\x18\x04 \x01(\tH\x00R\ttimestamp\x12\x1d\n\nis_shallow\x18\x05 \x01(\x08R\tisShallow\x12\x18\n\x07replace\x18\x06 \x01(\x08R\x07replace\x12I\n\nproperties\x18\x07 \x03(\x0b\x32).delta.connect.CloneTable.PropertiesEntryR\nproperties\x1a=\n\x0fPropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x16\n\x14version_or_timestampB\x1a\n\x16io.delta.connect.protoP\x01\x62\x06proto3'
34+
b'\n\x1c\x64\x65lta/connect/commands.proto\x12\rdelta.connect\x1a\x18\x64\x65lta/connect/base.proto"\xb1\x02\n\x0c\x44\x65ltaCommand\x12<\n\x0b\x63lone_table\x18\x01 \x01(\x0b\x32\x19.delta.connect.CloneTableH\x00R\ncloneTable\x12?\n\x0cvacuum_table\x18\x02 \x01(\x0b\x32\x1a.delta.connect.VacuumTableH\x00R\x0bvacuumTable\x12[\n\x16upgrade_table_protocol\x18\x03 \x01(\x0b\x32#.delta.connect.UpgradeTableProtocolH\x00R\x14upgradeTableProtocol\x12\x35\n\x08generate\x18\x04 \x01(\x0b\x32\x17.delta.connect.GenerateH\x00R\x08generateB\x0e\n\x0c\x63ommand_type"\xec\x02\n\nCloneTable\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12\x16\n\x06target\x18\x02 \x01(\tR\x06target\x12\x1a\n\x07version\x18\x03 \x01(\x05H\x00R\x07version\x12\x1e\n\ttimestamp\x18\x04 \x01(\tH\x00R\ttimestamp\x12\x1d\n\nis_shallow\x18\x05 \x01(\x08R\tisShallow\x12\x18\n\x07replace\x18\x06 \x01(\x08R\x07replace\x12I\n\nproperties\x18\x07 \x03(\x0b\x32).delta.connect.CloneTable.PropertiesEntryR\nproperties\x1a=\n\x0fPropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x16\n\x14version_or_timestamp"\x80\x01\n\x0bVacuumTable\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12,\n\x0fretention_hours\x18\x02 \x01(\x01H\x00R\x0eretentionHours\x88\x01\x01\x42\x12\n\x10_retention_hours"\x95\x01\n\x14UpgradeTableProtocol\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12%\n\x0ereader_version\x18\x02 \x01(\x05R\rreaderVersion\x12%\n\x0ewriter_version\x18\x03 \x01(\x05R\rwriterVersion"O\n\x08Generate\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12\x12\n\x04mode\x18\x02 \x01(\tR\x04modeB\x1a\n\x16io.delta.connect.protoP\x01\x62\x06proto3'
3535
)
3636

3737
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -41,10 +41,16 @@
4141
DESCRIPTOR._serialized_options = b"\n\026io.delta.connect.protoP\001"
4242
_CLONETABLE_PROPERTIESENTRY._options = None
4343
_CLONETABLE_PROPERTIESENTRY._serialized_options = b"8\001"
44-
_DELTACOMMAND._serialized_start = 73
45-
_DELTACOMMAND._serialized_end = 165
46-
_CLONETABLE._serialized_start = 168
47-
_CLONETABLE._serialized_end = 532
48-
_CLONETABLE_PROPERTIESENTRY._serialized_start = 447
49-
_CLONETABLE_PROPERTIESENTRY._serialized_end = 508
44+
_DELTACOMMAND._serialized_start = 74
45+
_DELTACOMMAND._serialized_end = 379
46+
_CLONETABLE._serialized_start = 382
47+
_CLONETABLE._serialized_end = 746
48+
_CLONETABLE_PROPERTIESENTRY._serialized_start = 661
49+
_CLONETABLE_PROPERTIESENTRY._serialized_end = 722
50+
_VACUUMTABLE._serialized_start = 749
51+
_VACUUMTABLE._serialized_end = 877
52+
_UPGRADETABLEPROTOCOL._serialized_start = 880
53+
_UPGRADETABLEPROTOCOL._serialized_end = 1029
54+
_GENERATE._serialized_start = 1031
55+
_GENERATE._serialized_end = 1110
5056
# @@protoc_insertion_point(module_scope)

python/delta/connect/proto/commands_pb2.pyi

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,28 +52,63 @@ class DeltaCommand(google.protobuf.message.Message):
5252
DESCRIPTOR: google.protobuf.descriptor.Descriptor
5353

5454
CLONE_TABLE_FIELD_NUMBER: builtins.int
55+
VACUUM_TABLE_FIELD_NUMBER: builtins.int
56+
UPGRADE_TABLE_PROTOCOL_FIELD_NUMBER: builtins.int
57+
GENERATE_FIELD_NUMBER: builtins.int
5558
@property
5659
def clone_table(self) -> global___CloneTable: ...
60+
@property
61+
def vacuum_table(self) -> global___VacuumTable: ...
62+
@property
63+
def upgrade_table_protocol(self) -> global___UpgradeTableProtocol: ...
64+
@property
65+
def generate(self) -> global___Generate: ...
5766
def __init__(
5867
self,
5968
*,
6069
clone_table: global___CloneTable | None = ...,
70+
vacuum_table: global___VacuumTable | None = ...,
71+
upgrade_table_protocol: global___UpgradeTableProtocol | None = ...,
72+
generate: global___Generate | None = ...,
6173
) -> None: ...
6274
def HasField(
6375
self,
6476
field_name: typing_extensions.Literal[
65-
"clone_table", b"clone_table", "command_type", b"command_type"
77+
"clone_table",
78+
b"clone_table",
79+
"command_type",
80+
b"command_type",
81+
"generate",
82+
b"generate",
83+
"upgrade_table_protocol",
84+
b"upgrade_table_protocol",
85+
"vacuum_table",
86+
b"vacuum_table",
6687
],
6788
) -> builtins.bool: ...
6889
def ClearField(
6990
self,
7091
field_name: typing_extensions.Literal[
71-
"clone_table", b"clone_table", "command_type", b"command_type"
92+
"clone_table",
93+
b"clone_table",
94+
"command_type",
95+
b"command_type",
96+
"generate",
97+
b"generate",
98+
"upgrade_table_protocol",
99+
b"upgrade_table_protocol",
100+
"vacuum_table",
101+
b"vacuum_table",
72102
],
73103
) -> None: ...
74104
def WhichOneof(
75105
self, oneof_group: typing_extensions.Literal["command_type", b"command_type"]
76-
) -> typing_extensions.Literal["clone_table"] | None: ...
106+
) -> (
107+
typing_extensions.Literal[
108+
"clone_table", "vacuum_table", "upgrade_table_protocol", "generate"
109+
]
110+
| None
111+
): ...
77112

78113
global___DeltaCommand = DeltaCommand
79114

@@ -177,3 +212,119 @@ class CloneTable(google.protobuf.message.Message):
177212
) -> typing_extensions.Literal["version", "timestamp"] | None: ...
178213

179214
global___CloneTable = CloneTable
215+
216+
class VacuumTable(google.protobuf.message.Message):
217+
"""Command that deletes files and directories in the table that are not needed by the table for
218+
maintaining older versions up to the given retention threshold.
219+
"""
220+
221+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
222+
223+
TABLE_FIELD_NUMBER: builtins.int
224+
RETENTION_HOURS_FIELD_NUMBER: builtins.int
225+
@property
226+
def table(self) -> delta.connect.proto.base_pb2.DeltaTable:
227+
"""(Required) The Delta table to vacuum."""
228+
retention_hours: builtins.float
229+
"""(Optional) Number of hours to retain history for. If not specified, then the default retention
230+
period will be used.
231+
"""
232+
def __init__(
233+
self,
234+
*,
235+
table: delta.connect.proto.base_pb2.DeltaTable | None = ...,
236+
retention_hours: builtins.float | None = ...,
237+
) -> None: ...
238+
def HasField(
239+
self,
240+
field_name: typing_extensions.Literal[
241+
"_retention_hours",
242+
b"_retention_hours",
243+
"retention_hours",
244+
b"retention_hours",
245+
"table",
246+
b"table",
247+
],
248+
) -> builtins.bool: ...
249+
def ClearField(
250+
self,
251+
field_name: typing_extensions.Literal[
252+
"_retention_hours",
253+
b"_retention_hours",
254+
"retention_hours",
255+
b"retention_hours",
256+
"table",
257+
b"table",
258+
],
259+
) -> None: ...
260+
def WhichOneof(
261+
self, oneof_group: typing_extensions.Literal["_retention_hours", b"_retention_hours"]
262+
) -> typing_extensions.Literal["retention_hours"] | None: ...
263+
264+
global___VacuumTable = VacuumTable
265+
266+
class UpgradeTableProtocol(google.protobuf.message.Message):
267+
"""Command that updates the protocol version of the table so that new features can be used."""
268+
269+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
270+
271+
TABLE_FIELD_NUMBER: builtins.int
272+
READER_VERSION_FIELD_NUMBER: builtins.int
273+
WRITER_VERSION_FIELD_NUMBER: builtins.int
274+
@property
275+
def table(self) -> delta.connect.proto.base_pb2.DeltaTable:
276+
"""(Required) The Delta table to upgrade the protocol of."""
277+
reader_version: builtins.int
278+
"""(Required) The minimum required reader protocol version."""
279+
writer_version: builtins.int
280+
"""(Required) The minimum required writer protocol version."""
281+
def __init__(
282+
self,
283+
*,
284+
table: delta.connect.proto.base_pb2.DeltaTable | None = ...,
285+
reader_version: builtins.int = ...,
286+
writer_version: builtins.int = ...,
287+
) -> None: ...
288+
def HasField(
289+
self, field_name: typing_extensions.Literal["table", b"table"]
290+
) -> builtins.bool: ...
291+
def ClearField(
292+
self,
293+
field_name: typing_extensions.Literal[
294+
"reader_version",
295+
b"reader_version",
296+
"table",
297+
b"table",
298+
"writer_version",
299+
b"writer_version",
300+
],
301+
) -> None: ...
302+
303+
global___UpgradeTableProtocol = UpgradeTableProtocol
304+
305+
class Generate(google.protobuf.message.Message):
306+
"""Command that generates manifest files for a given Delta table."""
307+
308+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
309+
310+
TABLE_FIELD_NUMBER: builtins.int
311+
MODE_FIELD_NUMBER: builtins.int
312+
@property
313+
def table(self) -> delta.connect.proto.base_pb2.DeltaTable:
314+
"""(Required) The Delta table to generate the manifest files for."""
315+
mode: builtins.str
316+
"""(Required) The type of manifest file to be generated."""
317+
def __init__(
318+
self,
319+
*,
320+
table: delta.connect.proto.base_pb2.DeltaTable | None = ...,
321+
mode: builtins.str = ...,
322+
) -> None: ...
323+
def HasField(
324+
self, field_name: typing_extensions.Literal["table", b"table"]
325+
) -> builtins.bool: ...
326+
def ClearField(
327+
self, field_name: typing_extensions.Literal["mode", b"mode", "table", b"table"]
328+
) -> None: ...
329+
330+
global___Generate = Generate

0 commit comments

Comments
 (0)