Skip to content

Commit 9a5bf99

Browse files
authored
[core] Introduce ClearConsumersProcedure to clear consumers (#4893)
1 parent ecdf46f commit 9a5bf99

File tree

11 files changed

+726
-0
lines changed

11 files changed

+726
-0
lines changed

docs/content/flink/procedures.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,31 @@ All available procedures are listed below.
388388
</td>
389389
<td>CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))</td>
390390
</tr>
391+
<tr>
392+
<td>clear_consumers</td>
393+
<td>
394+
-- Use named argument<br/>
395+
CALL [catalog.]sys.clear_consumers(`table` => 'identifier', including_consumers => 'includingConsumers', excluding_consumers => 'excludingConsumers') <br/><br/>
396+
-- Use indexed argument<br/>
397+
-- clear all consumers in the table
398+
CALL [catalog.]sys.clear_consumers('identifier')
399+
-- clear some consumers in the table (accept regular expression)<br/>
400+
CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers')<br/><br/>
401+
-- exclude some consumers (accept regular expression)<br/>
402+
CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers', 'excludingConsumers')
403+
</td>
404+
<td>
405+
To reset or delete consumer. Arguments:
406+
<li>identifier: the target table identifier. Cannot be empty.</li>
407+
<li>includingConsumers: consumers to be cleared.</li>
408+
<li>excludingConsumers: consumers which not to be cleared.</li>
409+
</td>
410+
<td>CALL sys.clear_consumers(`table` => 'default.T')<br/><br/>
411+
CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*')<br/><br/>
412+
CALL sys.clear_consumers(table => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')<br/><br/>
413+
CALL sys.clear_consumers(table => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*')
414+
</td>
415+
</tr>
391416
<tr>
392417
<td>rollback_to</td>
393418
<td>

docs/content/spark/procedures.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,25 @@ This section introduce all available spark procedures about paimon.
338338
-- delete consumer<br/>
339339
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
340340
</td>
341+
</tr>
342+
<tr>
343+
<td>clear_consumers</td>
344+
<td>
345+
To clear consumers. Arguments:
346+
<li>identifier: the target table identifier. Cannot be empty.</li>
347+
<li>includingConsumers: consumers to be cleared.</li>
348+
<li>excludingConsumers: consumers which not to be cleared.</li>
349+
</td>
350+
<td>
351+
-- clear all consumers in the table<br/>
352+
CALL sys.clear_consumers(table => 'default.T')<br/><br/>
353+
-- clear some consumers in the table (accept regular expression)<br/>
354+
CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*')<br/><br/>
355+
-- clear all consumers except excludingConsumers in the table (accept regular expression)<br/>
356+
CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')<br/><br/>
357+
-- clear all consumers with includingConsumers and excludingConsumers (accept regular expression)<br/>
358+
CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*')
359+
</td>
341360
</tr>
342361
<tr>
343362
<td>mark_partition_done</td>

paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Map;
3333
import java.util.Optional;
3434
import java.util.OptionalLong;
35+
import java.util.regex.Pattern;
3536
import java.util.stream.Collectors;
3637

3738
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -107,6 +108,35 @@ public void expire(LocalDateTime expireDateTime) {
107108
}
108109
}
109110

111+
/** Clear consumers. */
112+
public void clearConsumers(Pattern includingPattern, Pattern excludingPattern) {
113+
try {
114+
listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX)
115+
.forEach(
116+
fileStatus -> {
117+
String consumerName =
118+
fileStatus
119+
.getPath()
120+
.getName()
121+
.substring(CONSUMER_PREFIX.length());
122+
boolean shouldClear =
123+
includingPattern.matcher(consumerName).matches();
124+
if (excludingPattern != null) {
125+
shouldClear =
126+
shouldClear
127+
&& !excludingPattern
128+
.matcher(consumerName)
129+
.matches();
130+
}
131+
if (shouldClear) {
132+
fileIO.deleteQuietly(fileStatus.getPath());
133+
}
134+
});
135+
} catch (IOException e) {
136+
throw new RuntimeException(e);
137+
}
138+
}
139+
110140
/** Get all consumer. */
111141
public Map<String, Long> consumers() throws IOException {
112142
Map<String, Long> consumers = new HashMap<>();
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.procedure;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.consumer.ConsumerManager;
24+
import org.apache.paimon.table.FileStoreTable;
25+
import org.apache.paimon.utils.StringUtils;
26+
27+
import org.apache.flink.table.procedure.ProcedureContext;
28+
29+
import java.util.regex.Pattern;
30+
31+
/**
32+
* Clear consumers procedure. Usage:
33+
*
34+
* <pre><code>
35+
* -- NOTE: use '' as placeholder for optional arguments
36+
*
37+
* -- clear all consumers in the table
38+
* CALL sys.clear_consumers('tableId')
39+
*
40+
* -- clear some consumers in the table (accept regular expression)
41+
* CALL sys.clear_consumers('tableId', 'includingConsumers')
42+
*
43+
* -- exclude some consumers (accept regular expression)
44+
* CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers')
45+
* </code></pre>
46+
*/
47+
public class ClearConsumersProcedure extends ProcedureBase {
48+
49+
public static final String IDENTIFIER = "clear_consumers";
50+
51+
public String[] call(
52+
ProcedureContext procedureContext,
53+
String tableId,
54+
String includingConsumers,
55+
String excludingConsumers)
56+
throws Catalog.TableNotExistException {
57+
FileStoreTable fileStoreTable =
58+
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
59+
ConsumerManager consumerManager =
60+
new ConsumerManager(
61+
fileStoreTable.fileIO(),
62+
fileStoreTable.location(),
63+
fileStoreTable.snapshotManager().branch());
64+
65+
Pattern includingPattern =
66+
StringUtils.isNullOrWhitespaceOnly(includingConsumers)
67+
? Pattern.compile(".*")
68+
: Pattern.compile(includingConsumers);
69+
Pattern excludingPattern =
70+
StringUtils.isNullOrWhitespaceOnly(excludingConsumers)
71+
? null
72+
: Pattern.compile(excludingConsumers);
73+
consumerManager.clearConsumers(includingPattern, excludingPattern);
74+
75+
return new String[] {"Success"};
76+
}
77+
78+
public String[] call(
79+
ProcedureContext procedureContext, String tableId, String includingConsumers)
80+
throws Catalog.TableNotExistException {
81+
return call(procedureContext, tableId, includingConsumers, null);
82+
}
83+
84+
public String[] call(ProcedureContext procedureContext, String tableId)
85+
throws Catalog.TableNotExistException {
86+
return call(procedureContext, tableId, null, null);
87+
}
88+
89+
@Override
90+
public String identifier() {
91+
return IDENTIFIER;
92+
}
93+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.action;
20+
21+
import org.apache.paimon.consumer.ConsumerManager;
22+
import org.apache.paimon.table.FileStoreTable;
23+
import org.apache.paimon.utils.StringUtils;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.Map;
28+
import java.util.regex.Pattern;
29+
30+
/** Clear consumers action for Flink. */
31+
public class ClearConsumerAction extends TableActionBase {
32+
33+
private String includingConsumers;
34+
private String excludingConsumers;
35+
36+
protected ClearConsumerAction(
37+
String databaseName, String tableName, Map<String, String> catalogConfig) {
38+
super(databaseName, tableName, catalogConfig);
39+
}
40+
41+
public ClearConsumerAction withIncludingConsumers(@Nullable String includingConsumers) {
42+
this.includingConsumers = includingConsumers;
43+
return this;
44+
}
45+
46+
public ClearConsumerAction withExcludingConsumers(@Nullable String excludingConsumers) {
47+
this.excludingConsumers = excludingConsumers;
48+
return this;
49+
}
50+
51+
@Override
52+
public void run() throws Exception {
53+
FileStoreTable dataTable = (FileStoreTable) table;
54+
ConsumerManager consumerManager =
55+
new ConsumerManager(
56+
dataTable.fileIO(),
57+
dataTable.location(),
58+
dataTable.snapshotManager().branch());
59+
60+
Pattern includingPattern =
61+
StringUtils.isNullOrWhitespaceOnly(includingConsumers)
62+
? Pattern.compile(".*")
63+
: Pattern.compile(includingConsumers);
64+
Pattern excludingPattern =
65+
StringUtils.isNullOrWhitespaceOnly(excludingConsumers)
66+
? null
67+
: Pattern.compile(excludingConsumers);
68+
consumerManager.clearConsumers(includingPattern, excludingPattern);
69+
}
70+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.action;
20+
21+
import java.util.Optional;
22+
23+
/** Factory to create {@link ClearConsumerAction}. */
24+
public class ClearConsumerActionFactory implements ActionFactory {
25+
26+
public static final String IDENTIFIER = "clear_consumers";
27+
28+
private static final String INCLUDING_CONSUMERS = "including_consumers";
29+
private static final String EXCLUDING_CONSUMERS = "excluding_consumers";
30+
31+
@Override
32+
public String identifier() {
33+
return IDENTIFIER;
34+
}
35+
36+
@Override
37+
public Optional<Action> create(MultipleParameterToolAdapter params) {
38+
ClearConsumerAction action =
39+
new ClearConsumerAction(
40+
params.getRequired(DATABASE),
41+
params.getRequired(TABLE),
42+
catalogConfigMap(params));
43+
44+
if (params.has(INCLUDING_CONSUMERS)) {
45+
action.withIncludingConsumers(params.get(INCLUDING_CONSUMERS));
46+
}
47+
48+
if (params.has(EXCLUDING_CONSUMERS)) {
49+
action.withExcludingConsumers(params.get(EXCLUDING_CONSUMERS));
50+
}
51+
52+
return Optional.of(action);
53+
}
54+
55+
@Override
56+
public void printHelp() {
57+
System.out.println(
58+
"Action \"clear_consumers\" clear consumers with including consumers and excluding consumers.");
59+
System.out.println();
60+
61+
System.out.println("Syntax:");
62+
System.out.println(
63+
" clear_consumers --warehouse <warehouse_path> --database <database_name> "
64+
+ "--table <table_name> [--including_consumers <including_pattern> --excluding_consumers <excluding_pattern>]");
65+
66+
System.out.println();
67+
System.out.println("Note:");
68+
System.out.println(
69+
" use '' as placeholder for including_consumers if you want to clear all consumers except excludingConsumers in the table.");
70+
System.out.println();
71+
}
72+
}

0 commit comments

Comments
 (0)