
Commit 3cb667a

Merge remote-tracking branch 'upstream/master' into disk_ann

* upstream/master: (33 commits)
  - [core] Fix merge adjacent files in DataEvolutionCompactCoordinator
  - [python] Rename list_tag to list_tags
  - [python] add list tag for TagManager (apache#7264)
  - [core][python] Introduce DataFileMeta.nonNullRowIdRange to unify codes
  - [python] with_shard should be evenly distributed for data evolution mode (apache#7271)
  - [core] Remove useless version in Varant
  - [core] Should work with Split in DataTableBatchScan
  - [core] Fix paimon_incremental_query with limit push down (apache#7269)
  - [rest] Improve RestCatalog OpenAPI nonce generation (apache#7270)
  - [cdc] Avoid sending empty schema change events to Schema Evolution (apache#7261)
  - [python] Fix avro write timestamp without timezone wrongly (apache#7259)
  - [doc] add doc for filter by _ROW_ID on data evolution (apache#7262)
  - [fs] Extract jindo dls to separate module (apache#7263)
  - [core] Add listTableDetails method to Catalog interface (apache#7266)
  - [python] Support filter by _ROW_ID for data evolution (apache#7252)
  - [Flink] Add sourceParallelismUpperBound metric for auto-scaling systems (apache#7117)
  - [github] Add whether it is an AI-generated tag in the PR template (apache#7257)
  - [core] Improve HttpClient error response handling (apache#7254)
  - [python] Light refactor: move _is_blob_file check into DataFileMeta (apache#7256)
  - [core] RowIdPredicateVisitor supports converting between statement (apache#7255)
  - ...

2 parents: f42b377 + 305fe02

File tree

142 files changed: +4246 −718 lines changed


.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 9 additions & 0 deletions

```diff
@@ -18,3 +18,12 @@ Linked issue: close #xxx
 ### Documentation
 
 <!-- Does this change introduce a new feature -->
+
+### Generative AI tooling
+
+<!--
+If generative AI tooling has been used in the process of authoring this patch, please include the
+phrase: 'Generated-by: ' followed by the name of the tool and its version.
+If no, write 'No'.
+Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
+-->
```

docs/content/append-table/blob.md

Lines changed: 18 additions & 0 deletions

````diff
@@ -122,6 +122,24 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r
 
 *Required for blob functionality to work correctly.
 
+Specifically, if the storage system of the input BlobDescriptor differs from that used by Paimon,
+you can specify the storage configuration for the input blob descriptor using the prefix
+`blob-descriptor.`. For example, if the source data is stored in a different OSS endpoint,
+you can configure it as below (using flink sql as an example):
+```sql
+CREATE TABLE image_table (
+    id INT,
+    name STRING,
+    image BYTES
+) WITH (
+    'row-tracking.enabled' = 'true',
+    'data-evolution.enabled' = 'true',
+    'blob-field' = 'image',
+    'fs.oss.endpoint' = 'aaa', -- This is for Paimon's own config
+    'blob-descriptor.fs.oss.endpoint' = 'bbb' -- This is for input blob descriptors' config
+);
+```
+
 ## SQL Usage
 
 ### Creating a Table
````

docs/content/maintenance/metrics.md

Lines changed: 6 additions & 0 deletions

```diff
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
         <td>Gauge</td>
         <td>Time difference between reading the data file and file creation.</td>
     </tr>
+    <tr>
+        <td>sourceParallelismUpperBound</td>
+        <td>Flink Source Enumerator</td>
+        <td>Gauge</td>
+        <td>Recommended upper bound of parallelism for auto-scaling systems. Note: This is a recommendation, not a hard limit.</td>
+    </tr>
 </tbody>
</table>
```

docs/content/pypaimon/data-evolution.md

Lines changed: 10 additions & 0 deletions

````diff
@@ -98,6 +98,16 @@ table_commit.close()
 # 'f1': [-1001, 1002]
 ```
 
+## Filter by _ROW_ID
+
+Requires the same [Prerequisites](#prerequisites) (row-tracking and data-evolution enabled). On such tables you can filter by `_ROW_ID` to prune files at scan time. Supported: `equal('_ROW_ID', id)`, `is_in('_ROW_ID', [id1, ...])`, `between('_ROW_ID', low, high)`.
+
+```python
+pb = table.new_read_builder().new_predicate_builder()
+rb = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 0))
+result = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+```
+
 ## Update Columns By Shards
 
 If you want to **compute a derived column** (or **update an existing column based on other columns**) without providing
````
docs/content/pypaimon/manage-tags.md

Lines changed: 61 additions & 0 deletions

````diff
@@ -0,0 +1,61 @@
+---
+title: "Manage Tags"
+weight: 3
+type: docs
+aliases:
+- /pypaimon/manage-tags.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Manage Tags
+
+Just like the Java API of Paimon, you can create a [tag]({{< ref "maintenance/manage-tags" >}}) based on a snapshot. The tag will maintain the manifests and data files of the snapshot.
+A typical usage is creating tags daily, so you can maintain the historical data of each day for batch reading.
+## Create and Delete Tag
+
+You can create a tag with a given name and snapshot ID, and delete a tag with a given name.
+
+```python
+
+table = catalog.get_table('database_name.table_name')
+table.create_tag("tag2", snapshot_id=2)  # create tag2 based on snapshot 2
+table.create_tag("tag2")                 # create tag2 based on latest snapshot
+table.delete_tag("tag2")                 # delete tag2
+```
+
+If snapshot_id is unset, it defaults to the latest snapshot.
+
+## Read Tag
+You can read data from a specific tag.
+```python
+
+table = catalog.get_table('database_name.table_name')
+table.create_tag("tag2", snapshot_id=2)
+
+# Read from tag2 using scan.tag-name option
+table_with_tag = table.copy({"scan.tag-name": "tag2"})
+read_builder = table_with_tag.new_read_builder()
+table_scan = read_builder.new_scan()
+table_read = read_builder.new_read()
+result = table_read.to_arrow(table_scan.plan().splits())
+```
+
+
````
docs/content/pypaimon/python-api.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -258,7 +258,7 @@ predicate_5 = predicate_builder.and_predicates([predicate3, predicate4])
 read_builder = read_builder.with_filter(predicate_5)
 ```
 
-See [Predicate]({{< ref "python-api#predicate" >}}) for all supported filters and building methods.
+See [Predicate]({{< ref "python-api#predicate" >}}) for all supported filters and building methods. Filter by `_ROW_ID`: see [Data Evolution]({{< ref "pypaimon/data-evolution#filter-by-_row_id" >}}).
 
 You can also pushdown projection by `ReadBuilder`:
````
docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions

```diff
@@ -1452,6 +1452,24 @@
         <td>String</td>
         <td>The Variant shredding schema for writing.</td>
     </tr>
+    <tr>
+        <td><h5>visibility-callback.check-interval</h5></td>
+        <td style="word-wrap: break-word;">10 s</td>
+        <td>Duration</td>
+        <td>The interval for checking visibility when visibility-callback enabled.</td>
+    </tr>
+    <tr>
+        <td><h5>visibility-callback.enabled</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable the visibility wait callback that waits for compaction to complete after commit. This is useful for primary key tables with deletion vectors or postpone bucket mode to ensure data visibility, only used for batch mode or bounded stream.</td>
+    </tr>
+    <tr>
+        <td><h5>visibility-callback.timeout</h5></td>
+        <td style="word-wrap: break-word;">30 min</td>
+        <td>Duration</td>
+        <td>The maximum time to wait for compaction to complete when visibility callback is enabled. If the timeout is reached, an exception will be thrown.</td>
+    </tr>
     <tr>
         <td><h5>write-buffer-for-append</h5></td>
         <td style="word-wrap: break-word;">false</td>
```

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 38 additions & 0 deletions

```diff
@@ -90,6 +90,8 @@ public class CoreOptions implements Serializable {
 
     public static final String COLUMNS = "columns";
 
+    public static final String BLOB_DESCRIPTOR_PREFIX = "blob-descriptor.";
+
     public static final ConfigOption<TableType> TYPE =
             key("type")
                     .enumType(TableType.class)
@@ -2204,6 +2206,30 @@ public InlineElement getDescription() {
                     .withDescription(
                             "Whether to try upgrading the data files after overwriting a primary key table.");
 
+    public static final ConfigOption<Boolean> VISIBILITY_CALLBACK_ENABLED =
+            key("visibility-callback.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable the visibility wait callback that waits for compaction to complete "
+                                    + "after commit. This is useful for primary key tables with deletion vectors or "
+                                    + "postpone bucket mode to ensure data visibility, only used for batch mode or bounded stream.");
+
+    public static final ConfigOption<Duration> VISIBILITY_CALLBACK_TIMEOUT =
+            key("visibility-callback.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(30))
+                    .withDescription(
+                            "The maximum time to wait for compaction to complete when visibility callback is enabled. "
+                                    + "If the timeout is reached, an exception will be thrown.");
+
+    public static final ConfigOption<Duration> VISIBILITY_CALLBACK_CHECK_INTERVAL =
+            key("visibility-callback.check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription(
+                            "The interval for checking visibility when visibility-callback enabled.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -3431,6 +3457,18 @@ public boolean overwriteUpgrade() {
         return options.get(OVERWRITE_UPGRADE);
     }
 
+    public boolean visibilityCallbackEnabled() {
+        return options.get(VISIBILITY_CALLBACK_ENABLED);
+    }
+
+    public Duration visibilityCallbackTimeout() {
+        return options.get(VISIBILITY_CALLBACK_TIMEOUT);
+    }
+
+    public Duration visibilityCallbackCheckInterval() {
+        return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
```
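The three `visibility-callback.*` options above describe a commit-time wait loop: poll until compaction has made the committed data visible, checking at `check-interval` granularity and failing once `timeout` is exhausted. A minimal sketch of that polling pattern (illustrative only; `is_visible` is a hypothetical check, not a Paimon API):

```python
import time

def wait_for_visibility(is_visible, timeout_s=30 * 60, check_interval_s=10):
    """Poll is_visible() until it returns True, mirroring the semantics of
    visibility-callback.timeout and visibility-callback.check-interval:
    check periodically, raise once the timeout budget is exhausted."""
    deadline = time.monotonic() + timeout_s
    while not is_visible():
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "compaction did not finish within visibility-callback.timeout")
        time.sleep(check_interval_s)
```

The defaults here (30 minutes, 10 seconds) match the option defaults in the diff; the actual callback lives on the Java commit path.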

paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java

Lines changed: 22 additions & 9 deletions

```diff
@@ -131,19 +131,14 @@ private <T extends RESTResponse> T exec(HttpUriRequestBase request, Class<T> res
                 response -> {
                     String responseBodyStr = RESTUtil.extractResponseBodyAsString(response);
                     if (!RESTUtil.isSuccessful(response)) {
-                        ErrorResponse error;
+                        ErrorResponse error = null;
                         try {
                             error = RESTApi.fromJson(responseBodyStr, ErrorResponse.class);
                         } catch (JsonProcessingException e) {
-                            error =
-                                    new ErrorResponse(
-                                            null,
-                                            null,
-                                            responseBodyStr != null
-                                                    ? responseBodyStr
-                                                    : "response body is null",
-                                            response.getCode());
+                            // ignore exception
                         }
+                        error = buildErrorResponse(error, responseBodyStr, response.getCode());
+
                         errorHandler.accept(error, extractRequestId(response));
                     }
                     if (responseType != null && responseBodyStr != null) {
@@ -228,4 +223,22 @@ private static Header[] getHeaders(
                 .map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
                 .toArray(Header[]::new);
     }
+
+    private static ErrorResponse buildErrorResponse(
+            ErrorResponse error, String responseBodyStr, int errorCode) {
+        if (error == null || error.getMessage() == null || error.getMessage().isEmpty()) {
+            String resourceType =
+                    (error != null && error.getResourceType() != null)
+                            ? error.getResourceType()
+                            : "";
+            String resourceName =
+                    (error != null && error.getResourceName() != null)
+                            ? error.getResourceName()
+                            : "";
+            String message = responseBodyStr != null ? responseBodyStr : "response body is null";
+            int code = (error != null && error.getCode() != null) ? error.getCode() : errorCode;
+            error = new ErrorResponse(resourceType, resourceName, message, code);
+        }
+        return error;
+    }
 }
```
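The new `buildErrorResponse` helper centralizes the fallback: when JSON parsing failed or the parsed error carries no message, an `ErrorResponse` is synthesized from the raw body and HTTP status code. A sketch of that same logic in Python, under the assumption of a simplified `ErrorResponse` record (field names are illustrative, not the Java API):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ErrorResponse:
    # Simplified stand-in for org.apache.paimon.rest ErrorResponse
    resource_type: Optional[str]
    resource_name: Optional[str]
    message: Optional[str]
    code: Optional[int]

def build_error_response(error, body, http_code):
    """If parsing produced nothing usable (error is None or has no message),
    synthesize a response from the raw body and the HTTP status code;
    otherwise keep the parsed error untouched."""
    if error is None or not error.message:
        return ErrorResponse(
            resource_type=(error.resource_type if error and error.resource_type else ""),
            resource_name=(error.resource_name if error and error.resource_name else ""),
            message=body if body is not None else "response body is null",
            code=(error.code if error and error.code is not None else http_code),
        )
    return error
```

Note the two distinct fallbacks: the message falls back to the raw body text, while the code falls back to the transport-level HTTP status.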

paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java

Lines changed: 22 additions & 0 deletions

```diff
@@ -420,6 +420,28 @@ public PagedList<GetTableResponse> listTableDetailsPaged(
         return new PagedList<>(tables, response.getNextPageToken());
     }
 
+    /**
+     * List table details for a database.
+     *
+     * <p>Gets an array of table details for a database. There is no guarantee of a specific
+     * ordering of the elements in the array.
+     *
+     * @param databaseName name of database.
+     * @return a list of table details.
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the database not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
+     *     this database
+     */
+    public List<GetTableResponse> listTableDetails(String databaseName) {
+        return listDataFromPageApi(
+                queryParams ->
+                        client.get(
+                                resourcePaths.tableDetails(databaseName),
+                                queryParams,
+                                ListTableDetailsResponse.class,
+                                restAuthFunction));
+    }
+
     /**
      * List table for a catalog.
      *
```