Commit 5ee9373

[doc] Add Flink Procedure with named argument to documents (#4167)
1 parent 15d3302 commit 5ee9373

9 files changed, +352 -105 lines changed

docs/content/flink/procedures.md

Lines changed: 109 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ All available procedures are listed below.
5959
<tr>
6060
<td>compact</td>
6161
<td>
62+
-- Use named argument<br/>
63+
CALL [catalog.]sys.compact(
64+
`table` => 'table',
65+
partitions => 'partitions',
66+
order_strategy => 'order_strategy',
67+
order_by => 'order_by',
68+
options => 'options',
69+
`where` => 'where',
70+
partition_idle_time => 'partition_idle_time') <br/><br/>
71+
-- Use indexed argument<br/>
6272
CALL [catalog.]sys.compact('table') <br/><br/>
6373
CALL [catalog.]sys.compact('table', 'partitions') <br/><br/>
6474
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by') <br/><br/>
@@ -86,12 +96,21 @@ All available procedures are listed below.
8696
<tr>
8797
<td>compact_database</td>
8898
<td>
99+
-- Use named argument<br/>
100+
CALL [catalog.]sys.compact_database(
101+
including_databases => 'includingDatabases',
102+
mode => 'mode',
103+
including_tables => 'includingTables',
104+
excluding_tables => 'excludingTables',
105+
table_options => 'tableOptions',
106+
partition_idle_time => 'partitionIdleTime') <br/><br/>
107+
-- Use indexed argument<br/>
89108
CALL [catalog.]sys.compact_database() <br/><br/>
90109
CALL [catalog.]sys.compact_database('includingDatabases') <br/><br/>
91110
CALL [catalog.]sys.compact_database('includingDatabases', 'mode') <br/><br/>
92111
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') <br/><br/>
93112
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') <br/><br/>
94-
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
113+
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') <br/><br/>
95114
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
96115
</td>
97116
<td>
@@ -106,12 +125,23 @@ All available procedures are listed below.
106125
<li>partition_idle_time: performs a full compaction only on partitions that have not received any new data for 'partition_idle_time'; only these partitions are compacted.</li>
107126
</td>
108127
<td>
109-
CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')
128+
CALL sys.compact_database(
129+
including_databases => 'db1|db2',
130+
mode => 'combined',
131+
including_tables => 'table_.*',
132+
excluding_tables => 'ignore',
133+
table_options => 'sink.parallelism=4')
110134
</td>
111135
</tr>
112136
<tr>
113137
<td>create_tag</td>
114138
<td>
139+
-- Use named argument<br/>
140+
-- based on the specified snapshot <br/>
141+
CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName', snapshot_id => snapshotId) <br/>
142+
-- based on the latest snapshot <br/>
143+
CALL [catalog.]sys.create_tag(`table` => 'identifier', tag => 'tagName') <br/><br/>
144+
-- Use indexed argument<br/>
115145
-- based on the specified snapshot <br/>
116146
CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId) <br/>
117147
-- based on the latest snapshot <br/>
@@ -125,13 +155,16 @@ All available procedures are listed below.
125155
<li>time_retained: The maximum time retained for newly created tags.</li>
126156
</td>
127157
<td>
128-
CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')
158+
CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag', snapshot_id => cast(10 as bigint), time_retained => '1 d')
129159
</td>
130160
</tr>
131161
<tr>
132162
<td>create_tag_from_timestamp</td>
133163
<td>
134164
-- Create a tag from the first snapshot whose commit time is greater than the specified timestamp. <br/>
165+
-- Use named argument<br/>
166+
CALL [catalog.]sys.create_tag_from_timestamp(`table` => 'identifier', tag => 'tagName', timestamp => timestamp, time_retained => time_retained) <br/><br/>
167+
-- Use indexed argument<br/>
135168
CALL [catalog.]sys.create_tag_from_timestamp('identifier', 'tagName', timestamp, time_retained)
136169
</td>
137170
<td>
@@ -152,6 +185,9 @@ All available procedures are listed below.
152185
<td>create_tag_from_watermark</td>
153186
<td>
154187
-- Create a tag from the first snapshot whose watermark is greater than the specified timestamp.<br/>
188+
-- Use named argument<br/>
189+
CALL [catalog.]sys.create_tag_from_watermark(`table` => 'identifier', tag => 'tagName', watermark => watermark, time_retained => time_retained) <br/><br/>
190+
-- Use indexed argument<br/>
155191
CALL [catalog.]sys.create_tag_from_watermark('identifier', 'tagName', watermark, time_retained)
156192
</td>
157193
<td>
@@ -171,6 +207,9 @@ All available procedures are listed below.
171207
<tr>
172208
<td>delete_tag</td>
173209
<td>
210+
-- Use named argument<br/>
211+
CALL [catalog.]sys.delete_tag(`table` => 'identifier', tag => 'tagName') <br/><br/>
212+
-- Use indexed argument<br/>
174213
CALL [catalog.]sys.delete_tag('identifier', 'tagName')
175214
</td>
176215
<td>
@@ -179,17 +218,30 @@ All available procedures are listed below.
179218
<li>tagName: name of the tag to be deleted. To delete multiple tags, separate the names with ','.</li>
180219
</td>
181220
<td>
182-
CALL sys.delete_tag('default.T', 'my_tag')
221+
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
183222
</td>
184223
</tr>
185224
<tr>
186225
<td>merge_into</td>
187226
<td>
227+
-- for Flink 1.18<br/>
188228
CALL [catalog.]sys.merge_into('identifier','targetAlias',<br/>
189229
'sourceSqls','sourceTable','mergeCondition',<br/>
190230
'matchedUpsertCondition','matchedUpsertSetting',<br/>
191231
'notMatchedInsertCondition','notMatchedInsertValues',<br/>
192232
'matchedDeleteCondition')<br/><br/>
233+
-- for Flink 1.19 and later <br/>
234+
CALL [catalog.]sys.merge_into(<br/>
235+
target_table => 'identifier',<br/>
236+
target_alias => 'targetAlias',<br/>
237+
source_sqls => 'sourceSqls',<br/>
238+
source_table => 'sourceTable',<br/>
239+
merge_condition => 'mergeCondition',<br/>
240+
matched_upsert_condition => 'matchedUpsertCondition',<br/>
241+
matched_upsert_setting => 'matchedUpsertSetting',<br/>
242+
not_matched_insert_condition => 'notMatchedInsertCondition',<br/>
243+
not_matched_insert_values => 'notMatchedInsertValues',<br/>
244+
matched_delete_condition => 'matchedDeleteCondition') <br/><br/>
193245
</td>
194246
<td>
195247
To perform "MERGE INTO" syntax. See <a href="/how-to/writing-tables#merging-into-table">merge_into action</a> for
@@ -201,6 +253,9 @@ All available procedures are listed below.
201253
-- and if there is no match,<br/>
202254
-- insert the order from<br/>
203255
-- the source table<br/>
256+
-- for Flink 1.18<br/>
257+
CALL [catalog.]sys.merge_into('default.T','','','default.S','T.id=S.order_id','','price=T.price+20','','*','')<br/><br/>
258+
-- for Flink 1.19 and later <br/>
204259
CALL sys.merge_into(<br/>
205260
target_table => 'default.T',<br/>
206261
source_table => 'default.S',<br/>
@@ -212,6 +267,9 @@ All available procedures are listed below.
212267
<tr>
213268
<td>remove_orphan_files</td>
214269
<td>
270+
-- Use named argument<br/>
271+
CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun') <br/><br/>
272+
-- Use indexed argument<br/>
215273
CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
216274
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')<br/><br/>
217275
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')<br/><br/>
@@ -226,15 +284,18 @@ All available procedures are listed below.
226284
<li>dryRun: when true, view only orphan files, don't actually remove files. Default is false.</li>
227285
<li>parallelism: the maximum number of files deleted concurrently. Defaults to the number of processors available to the Java virtual machine.</li>
228286
</td>
229-
<td>CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')<br/><br/>
230-
CALL remove_orphan_files('default.*', '2023-10-31 12:00:00')<br/><br/>
231-
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)<br/><br/>
232-
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', false, '5')
287+
<td>CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00')<br/><br/>
288+
CALL sys.remove_orphan_files(`table` => 'default.*', older_than => '2023-10-31 12:00:00')<br/><br/>
289+
CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)<br/><br/>
290+
CALL sys.remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5')
233291
</td>
234292
</tr>
235293
<tr>
236294
<td>reset_consumer</td>
237295
<td>
296+
-- Use named argument<br/>
297+
CALL [catalog.]sys.reset_consumer(`table` => 'identifier', consumer_id => 'consumerId', next_snapshot_id => nextSnapshotId) <br/><br/>
298+
-- Use indexed argument<br/>
238299
-- reset the new next snapshot id in the consumer<br/>
239300
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)<br/><br/>
240301
-- delete consumer<br/>
@@ -246,11 +307,17 @@ All available procedures are listed below.
246307
<li>consumerId: consumer to be reset or deleted.</li>
247308
<li>nextSnapshotId (Long): the new next snapshot id of the consumer.</li>
248309
</td>
249-
<td>CALL sys.reset_consumer('default.T', 'myid', 10)</td>
310+
<td>CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))</td>
250311
</tr>
251312
<tr>
252313
<td>rollback_to</td>
253314
<td>
315+
-- for Flink 1.18<br/>
316+
-- rollback to a snapshot<br/>
317+
CALL sys.rollback_to('identifier', snapshotId)<br/><br/>
318+
-- rollback to a tag<br/>
319+
CALL sys.rollback_to('identifier', 'tagName')<br/><br/>
320+
-- for Flink 1.19 and later<br/>
254321
-- rollback to a snapshot<br/>
255322
CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
256323
-- rollback to a tag<br/>
@@ -262,11 +329,24 @@ All available procedures are listed below.
262329
<li>snapshotId (Long): id of the snapshot to roll back to.</li>
263330
<li>tagName: name of the tag to roll back to.</li>
264331
</td>
265-
<td>CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)</td>
332+
<td>
333+
-- for Flink 1.18<br/>
334+
CALL sys.rollback_to('default.T', 10)<br/><br/>
335+
-- for Flink 1.19 and later<br/>
336+
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
337+
</td>
266338
</tr>
267339
<tr>
268340
<td>expire_snapshots</td>
269341
<td>
342+
-- Use named argument<br/>
343+
CALL [catalog.]sys.expire_snapshots(<br/>
344+
`table` => 'identifier', <br/>
345+
retain_max => 'retain_max', <br/>
346+
retain_min => 'retain_min', <br/>
347+
older_than => 'older_than', <br/>
348+
max_deletes => 'max_deletes') <br/><br/>
349+
-- Use indexed argument<br/>
270350
-- for Flink 1.18<br/>
271351
CALL sys.expire_snapshots(table, retain_max)<br/><br/>
272352
-- for Flink 1.19 and later<br/>
@@ -329,11 +409,14 @@ All available procedures are listed below.
329409
<li>databaseName : the target database name.</li>
330410
<li>tableName: the target table identifier.</li>
331411
</td>
332-
<td>CALL sys.repair('test_db.T')</td>
412+
<td>CALL sys.repair(`table` => 'test_db.T')</td>
333413
</tr>
334414
<tr>
335415
<td>rewrite_file_index</td>
336416
<td>
417+
-- Use named argument<br/>
418+
CALL sys.rewrite_file_index(&lt;`table` => identifier&gt; [, &lt;partitions => partitions&gt;])<br/><br/>
419+
-- Use indexed argument<br/>
337420
CALL sys.rewrite_file_index(&lt;identifier&gt; [, &lt;partitions&gt;])<br/><br/>
338421
</td>
339422
<td>
@@ -343,13 +426,16 @@ All available procedures are listed below.
343426
</td>
344427
<td>
345428
-- rewrite the file index for the whole table<br/>
346-
CALL sys.rewrite_file_index('test_db.T')<br/><br/>
429+
CALL sys.rewrite_file_index(`table` => 'test_db.T')<br/><br/>
347430
-- rewrite the file index for a specific partition<br/>
348-
CALL sys.rewrite_file_index('test_db.T', 'pt=a')<br/><br/>
431+
CALL sys.rewrite_file_index(`table` => 'test_db.T', partitions => 'pt=a')<br/><br/>
349432
</td>
350433
<tr>
351434
<td>create_branch</td>
352435
<td>
436+
-- Use named argument<br/>
437+
CALL [catalog.]sys.create_branch(`table` => 'identifier', branch => 'branchName', tag => 'tagName')<br/><br/>
438+
-- Use indexed argument<br/>
353439
-- based on the specified tag <br/>
354440
CALL [catalog.]sys.create_branch('identifier', 'branchName', 'tagName') <br/>
355441
-- create empty branch <br/>
@@ -362,13 +448,16 @@ All available procedures are listed below.
362448
<li>tagName: name of the tag which the new branch is based on.</li>
363449
</td>
364450
<td>
365-
CALL sys.create_branch('default.T', 'branch1', 'tag1')<br/><br/>
366-
CALL sys.create_branch('default.T', 'branch1')<br/><br/>
451+
CALL sys.create_branch(`table` => 'default.T', branch => 'branch1', tag => 'tag1')<br/><br/>
452+
CALL sys.create_branch(`table` => 'default.T', branch => 'branch1')<br/><br/>
367453
</td>
368454
</tr>
369455
<tr>
370456
<td>delete_branch</td>
371457
<td>
458+
-- Use named argument<br/>
459+
CALL [catalog.]sys.delete_branch(`table` => 'identifier', branch => 'branchName')<br/><br/>
460+
-- Use indexed argument<br/>
372461
CALL [catalog.]sys.delete_branch('identifier', 'branchName')
373462
</td>
374463
<td>
@@ -377,12 +466,15 @@ All available procedures are listed below.
377466
<li>branchName: name of the branch to be deleted. To delete multiple branches, separate the names with ','.</li>
378467
</td>
379468
<td>
380-
CALL sys.delete_branch('default.T', 'branch1')
469+
CALL sys.delete_branch(`table` => 'default.T', branch => 'branch1')
381470
</td>
382471
</tr>
383472
<tr>
384473
<td>fast_forward</td>
385474
<td>
475+
-- Use named argument<br/>
476+
CALL [catalog.]sys.fast_forward(`table` => 'identifier', branch => 'branchName')<br/><br/>
477+
-- Use indexed argument<br/>
386478
CALL [catalog.]sys.fast_forward('identifier', 'branchName')
387479
</td>
388480
<td>
@@ -391,7 +483,7 @@ All available procedures are listed below.
391483
<li>branchName: name of the branch to be merged.</li>
392484
</td>
393485
<td>
394-
CALL sys.fast_forward('default.T', 'branch1')
486+
CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1')
395487
</td>
396488
</tr>
397489
</tbody>
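
The named-argument form shown in the rows above follows one pattern: `name => value` pairs, with `table` and `where` written in backticks. The following is a minimal Python sketch of that pattern, not part of Paimon or Flink. The helper names `sql_literal` and `render_call` are hypothetical, and casting integers to `bigint` is an assumption modelled on the `create_tag` and `reset_consumer` examples.

```python
# Hypothetical helper, not part of Paimon or Flink: renders a CALL statement
# in the named-argument style used by the procedure table.

# Argument names the documentation writes in backticks.
BACKTICKED = {"table", "where"}


def sql_literal(value):
    """Render a Python value as a SQL literal.

    Assumption: integers are cast to bigint, mirroring the create_tag example.
    """
    if isinstance(value, bool):  # check bool first, since bool is an int subclass
        return "true" if value else "false"
    if isinstance(value, int):
        return f"cast({value} as bigint)"
    escaped = str(value).replace("'", "''")  # double any embedded single quotes
    return f"'{escaped}'"


def render_call(procedure, **named_args):
    """Build `CALL sys.<procedure>(name => value, ...)`, skipping None values."""
    parts = []
    for name, value in named_args.items():
        if value is None:
            continue  # optional argument left out of the call
        key = f"`{name}`" if name in BACKTICKED else name
        parts.append(f"{key} => {sql_literal(value)}")
    return f"CALL sys.{procedure}({', '.join(parts)})"


print(render_call("create_tag", table="default.T", tag="my_tag",
                  snapshot_id=10, time_retained="1 d"))
```

Because arguments are matched by name, their order in the call does not matter, and optional ones can simply be left out.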

docs/content/flink/sql-query.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,21 @@ First, you need to stop the streaming task using this consumer ID, and then exec
204204

205205
Run the following command:
206206

207+
{{< tabs "reset_consumer" >}}
208+
209+
{{< tab "Flink SQL" >}}
210+
211+
```sql
212+
CALL sys.reset_consumer(
213+
`table` => 'database_name.table_name',
214+
consumer_id => 'consumer_id',
215+
next_snapshot_id => <snapshot_id>
216+
);
217+
```
218+
{{< /tab >}}
219+
220+
{{< tab "Flink Action" >}}
221+
207222
```bash
208223
<FLINK_HOME>/bin/flink run \
209224
/path/to/paimon-flink-action-{{< version >}}.jar \
@@ -215,6 +230,9 @@ Run the following command:
215230
[--next_snapshot <next-snapshot-id>] \
216231
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
217232
```
233+
{{< /tab >}}
234+
235+
{{< /tabs >}}
218236
219237
Don't specify the `--next_snapshot` parameter if you want to delete the consumer.
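
The reset-versus-delete rule can be sketched for the SQL procedure as well. The Python helper below (`reset_consumer_sql`, a hypothetical name) emits the named-argument `CALL` and leaves out `next_snapshot_id` when none is given. It assumes that omitting the argument deletes the consumer in the named form too, as it does for the action and the indexed form; check this against your Paimon version.

```python
def reset_consumer_sql(table, consumer_id, next_snapshot_id=None):
    """Build a CALL sys.reset_consumer statement (illustrative sketch only).

    With next_snapshot_id: reset the consumer to that snapshot.
    Without it: assumed to delete the consumer, as with the Flink action.
    """
    args = [f"`table` => '{table}'", f"consumer_id => '{consumer_id}'"]
    if next_snapshot_id is not None:
        # The procedure table documents this argument as a Long, hence the cast.
        args.append(f"next_snapshot_id => cast({int(next_snapshot_id)} as bigint)")
    return "CALL sys.reset_consumer(" + ", ".join(args) + ")"


# Reset the consumer 'myid' to snapshot 10.
print(reset_consumer_sql("default.T", "myid", 10))
# Delete the consumer 'myid' (no next snapshot given).
print(reset_consumer_sql("default.T", "myid"))
```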
220238

docs/content/learn-paimon/understand-files.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,22 @@ is needed in order to reduce the number of small files.
245245
Let's trigger the full-compaction now, and run a dedicated compaction job through `flink run`:
246246

247247
{{< label Batch >}}
248+
249+
{{< tabs "compact" >}}
250+
251+
{{< tab "Flink SQL" >}}
252+
```sql
253+
CALL sys.compact(
254+
`table` => 'database_name.table_name',
255+
partitions => 'partition_name',
256+
order_strategy => 'order_strategy',
257+
order_by => 'order_by',
258+
options => 'paimon_table_dynamic_conf'
259+
);
260+
```
261+
{{< /tab >}}
262+
263+
{{< tab "Flink Action" >}}
248264
```bash
249265
<FLINK_HOME>/bin/flink run \
250266
-D execution.runtime-mode=batch \
@@ -257,15 +273,32 @@ Let's trigger the full-compaction now, and run a dedicated compaction job throug
257273
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
258274
[--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]
259275
```
276+
{{< /tab >}}
277+
278+
{{< /tabs >}}
260279
261280
For example (assuming you are already in the Flink home directory):
262281
282+
{{< tabs "compact example" >}}
283+
284+
{{< tab "Flink SQL" >}}
285+
286+
```sql
287+
CALL sys.compact('T');
288+
```
289+
{{< /tab >}}
290+
291+
{{< tab "Flink Action" >}}
292+
263293
```bash
264294
./bin/flink run \
265295
./lib/paimon-flink-action-{{< version >}}.jar \
266296
compact \
267297
--path file:///tmp/paimon/default.db/T
268298
```
299+
{{< /tab >}}
300+
301+
{{< /tabs >}}
269302
270303
All current table files will be compacted and a new snapshot, namely `snapshot-4`, is
271304
made and contains the following information:
