Skip to content

Commit 721f723

Browse files
committed
[core] Introduce ClearConsumersProcedure to clear consumers
Change-Id: I05d02f5c48c82d633436260f1ca5f6d0e9ddfcc4
1 parent 5dfc597 commit 721f723

File tree

7 files changed

+499
-0
lines changed

7 files changed

+499
-0
lines changed

paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,28 @@ public void expire(LocalDateTime expireDateTime) {
107107
}
108108
}
109109

110+
/** Clear consumers. */
111+
public void clearConsumers(List<String> consumerIds, Boolean clearUnspecified) {
112+
try {
113+
listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX)
114+
.forEach(
115+
status -> {
116+
String consumerName =
117+
status.getPath()
118+
.getName()
119+
.substring(CONSUMER_PREFIX.length());
120+
if (consumerIds == null
121+
|| (!clearUnspecified && consumerIds.contains(consumerName))
122+
|| (clearUnspecified
123+
&& !consumerIds.contains(consumerName))) {
124+
fileIO.deleteQuietly(status.getPath());
125+
}
126+
});
127+
} catch (IOException e) {
128+
throw new RuntimeException(e);
129+
}
130+
}
131+
110132
/** Get all consumer. */
111133
public Map<String, Long> consumers() throws IOException {
112134
Map<String, Long> consumers = new HashMap<>();
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.procedure;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.consumer.ConsumerManager;
24+
import org.apache.paimon.table.FileStoreTable;
25+
26+
import org.apache.flink.table.procedure.ProcedureContext;
27+
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Optional;
32+
33+
/**
34+
* Clear consumers procedure. Usage:
35+
*
36+
* <pre><code>
37+
* -- clear all consumers except the specified consumer in the table
38+
* CALL sys.clear_consumers('tableId', 'consumerIds', true)
39+
*
40+
* -- clear all specified consumers in the table
41+
* CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
42+
*
43+
* -- clear all consumers in the table
44+
* CALL sys.clear_unspecified_consumers('tableId')
45+
* </code></pre>
46+
*/
47+
public class ClearConsumersProcedure extends ProcedureBase {
48+
49+
public static final String IDENTIFIER = "clear_consumers";
50+
51+
public String[] call(
52+
ProcedureContext procedureContext,
53+
String tableId,
54+
String consumerIds,
55+
Boolean clearUnspecified)
56+
throws Catalog.TableNotExistException {
57+
FileStoreTable fileStoreTable =
58+
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
59+
ConsumerManager consumerManager =
60+
new ConsumerManager(
61+
fileStoreTable.fileIO(),
62+
fileStoreTable.location(),
63+
fileStoreTable.snapshotManager().branch());
64+
List<String> specifiedConsumerIds =
65+
Optional.of(consumerIds)
66+
.map(s -> Arrays.asList(s.split(",")))
67+
.orElse(Collections.emptyList());
68+
consumerManager.clearConsumers(
69+
specifiedConsumerIds, Optional.of(clearUnspecified).orElse(false));
70+
71+
return new String[] {"Success"};
72+
}
73+
74+
public String[] call(ProcedureContext procedureContext, String tableId, String consumerIds)
75+
throws Catalog.TableNotExistException {
76+
return call(procedureContext, tableId, consumerIds, false);
77+
}
78+
79+
public String[] call(ProcedureContext procedureContext, String tableId)
80+
throws Catalog.TableNotExistException {
81+
FileStoreTable fileStoreTable =
82+
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
83+
ConsumerManager consumerManager =
84+
new ConsumerManager(
85+
fileStoreTable.fileIO(),
86+
fileStoreTable.location(),
87+
fileStoreTable.snapshotManager().branch());
88+
consumerManager.clearConsumers(null, null);
89+
90+
return new String[] {"Success"};
91+
}
92+
93+
@Override
94+
public String identifier() {
95+
return IDENTIFIER;
96+
}
97+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.procedure;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.consumer.ConsumerManager;
24+
import org.apache.paimon.table.FileStoreTable;
25+
26+
import org.apache.flink.table.annotation.ArgumentHint;
27+
import org.apache.flink.table.annotation.DataTypeHint;
28+
import org.apache.flink.table.annotation.ProcedureHint;
29+
import org.apache.flink.table.procedure.ProcedureContext;
30+
31+
import java.util.Arrays;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.Optional;
35+
36+
/**
37+
* Clear consumers procedure. Usage:
38+
*
39+
* <pre><code>
40+
* -- clear all consumers except the specified consumer in the table
41+
* CALL sys.clear_consumers('tableId', 'consumerIds', true)
42+
*
43+
* -- clear all specified consumers in the table
44+
* CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
45+
*
46+
* -- clear all consumers in the table
47+
* CALL sys.clear_unspecified_consumers('tableId')
48+
* </code></pre>
49+
*/
50+
public class ClearConsumersProcedure extends ProcedureBase {
51+
52+
public static final String IDENTIFIER = "clear_consumers";
53+
54+
@ProcedureHint(
55+
argument = {
56+
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
57+
@ArgumentHint(
58+
name = "consumer_ids",
59+
type = @DataTypeHint("STRING"),
60+
isOptional = true),
61+
@ArgumentHint(
62+
name = "clear_unspecified",
63+
type = @DataTypeHint("BOOLEAN"),
64+
isOptional = true)
65+
})
66+
public String[] call(
67+
ProcedureContext procedureContext,
68+
String tableId,
69+
String consumerIds,
70+
Boolean clearUnspecified)
71+
throws Catalog.TableNotExistException {
72+
FileStoreTable fileStoreTable =
73+
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
74+
ConsumerManager consumerManager =
75+
new ConsumerManager(
76+
fileStoreTable.fileIO(),
77+
fileStoreTable.location(),
78+
fileStoreTable.snapshotManager().branch());
79+
if (consumerIds != null) {
80+
List<String> specifiedConsumerIds =
81+
Optional.of(consumerIds)
82+
.map(s -> Arrays.asList(s.split(",")))
83+
.orElse(Collections.emptyList());
84+
consumerManager.clearConsumers(
85+
specifiedConsumerIds, Optional.ofNullable(clearUnspecified).orElse(false));
86+
} else {
87+
consumerManager.clearConsumers(null, null);
88+
}
89+
90+
return new String[] {"Success"};
91+
}
92+
93+
@Override
94+
public String identifier() {
95+
return IDENTIFIER;
96+
}
97+
}

paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,4 @@ org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
8282
org.apache.paimon.flink.procedure.CloneProcedure
8383
org.apache.paimon.flink.procedure.CompactManifestProcedure
8484
org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
85+
org.apache.paimon.flink.procedure.ClearConsumersProcedure

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,4 +312,159 @@ public void testResetBranchConsumer(String invoker) throws Exception {
312312
Optional<Consumer> consumer3 = consumerManager.consumer("myid");
313313
assertThat(consumer3).isNotPresent();
314314
}
315+
316+
@ParameterizedTest
317+
@Timeout(120)
318+
@ValueSource(strings = {"procedure_indexed", "procedure_named"})
319+
public void testClearConsumers(String invoker) throws Exception {
320+
init(warehouse);
321+
322+
RowType rowType =
323+
RowType.of(
324+
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
325+
new String[] {"pk1", "col1"});
326+
FileStoreTable table =
327+
createFileStoreTable(
328+
rowType,
329+
Collections.emptyList(),
330+
Collections.singletonList("pk1"),
331+
Collections.emptyList(),
332+
Collections.emptyMap());
333+
334+
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
335+
write = writeBuilder.newWrite();
336+
commit = writeBuilder.newCommit();
337+
338+
// 3 snapshots
339+
writeData(rowData(1L, BinaryString.fromString("Hi")));
340+
writeData(rowData(2L, BinaryString.fromString("Hello")));
341+
writeData(rowData(3L, BinaryString.fromString("Paimon")));
342+
343+
// use consumer streaming read table
344+
BlockingIterator<Row, Row> iterator1 =
345+
testStreamingRead(
346+
"SELECT * FROM `"
347+
+ tableName
348+
+ "` /*+ OPTIONS('consumer-id'='myid1','consumer.expiration-time'='3h') */",
349+
Arrays.asList(
350+
changelogRow("+I", 1L, "Hi"),
351+
changelogRow("+I", 2L, "Hello"),
352+
changelogRow("+I", 3L, "Paimon")));
353+
354+
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
355+
while (!consumerManager.consumer("myid1").isPresent()) {
356+
Thread.sleep(1000);
357+
}
358+
iterator1.close();
359+
360+
// use consumer streaming read table
361+
BlockingIterator<Row, Row> iterator2 =
362+
testStreamingRead(
363+
"SELECT * FROM `"
364+
+ tableName
365+
+ "` /*+ OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */",
366+
Arrays.asList(
367+
changelogRow("+I", 1L, "Hi"),
368+
changelogRow("+I", 2L, "Hello"),
369+
changelogRow("+I", 3L, "Paimon")));
370+
371+
while (!consumerManager.consumer("myid2").isPresent()) {
372+
Thread.sleep(1000);
373+
}
374+
iterator2.close();
375+
376+
// use consumer streaming read table
377+
BlockingIterator<Row, Row> iterator3 =
378+
testStreamingRead(
379+
"SELECT * FROM `"
380+
+ tableName
381+
+ "` /*+ OPTIONS('consumer-id'='myid3','consumer.expiration-time'='3h') */",
382+
Arrays.asList(
383+
changelogRow("+I", 1L, "Hi"),
384+
changelogRow("+I", 2L, "Hello"),
385+
changelogRow("+I", 3L, "Paimon")));
386+
387+
while (!consumerManager.consumer("myid3").isPresent()) {
388+
Thread.sleep(1000);
389+
}
390+
iterator3.close();
391+
392+
Optional<Consumer> consumer1 = consumerManager.consumer("myid1");
393+
Optional<Consumer> consumer2 = consumerManager.consumer("myid2");
394+
Optional<Consumer> consumer3 = consumerManager.consumer("myid3");
395+
assertThat(consumer1).isPresent();
396+
assertThat(consumer2).isPresent();
397+
assertThat(consumer3).isPresent();
398+
399+
// clear all consumers except the specified consumer in the table
400+
switch (invoker) {
401+
case "procedure_indexed":
402+
executeSQL(
403+
String.format(
404+
"CALL sys.clear_consumers('%s.%s', 'myid1,myid2', true)",
405+
database, tableName));
406+
break;
407+
case "procedure_named":
408+
executeSQL(
409+
String.format(
410+
"CALL sys.clear_consumers(`table` => '%s.%s', consumer_ids => 'myid1,myid2', clear_unspecified => cast(true as BOOLEAN))",
411+
database, tableName));
412+
break;
413+
default:
414+
throw new UnsupportedOperationException(invoker);
415+
}
416+
417+
Optional<Consumer> consumer4 = consumerManager.consumer("myid1");
418+
Optional<Consumer> consumer5 = consumerManager.consumer("myid2");
419+
Optional<Consumer> consumer6 = consumerManager.consumer("myid3");
420+
assertThat(consumer4).isPresent();
421+
assertThat(consumer5).isPresent();
422+
assertThat(consumer6).isNotPresent();
423+
424+
// clear all specified consumers in the table
425+
switch (invoker) {
426+
case "procedure_indexed":
427+
executeSQL(
428+
String.format(
429+
"CALL sys.clear_consumers('%s.%s', 'myid1')", database, tableName));
430+
break;
431+
case "procedure_named":
432+
executeSQL(
433+
String.format(
434+
"CALL sys.clear_consumers(`table` => '%s.%s', consumer_ids => 'myid1')",
435+
database, tableName));
436+
break;
437+
default:
438+
throw new UnsupportedOperationException(invoker);
439+
}
440+
441+
Optional<Consumer> consumer7 = consumerManager.consumer("myid1");
442+
Optional<Consumer> consumer8 = consumerManager.consumer("myid2");
443+
Optional<Consumer> consumer9 = consumerManager.consumer("myid3");
444+
assertThat(consumer7).isNotPresent();
445+
assertThat(consumer8).isPresent();
446+
assertThat(consumer9).isNotPresent();
447+
448+
// clear all consumers in the table
449+
switch (invoker) {
450+
case "procedure_indexed":
451+
executeSQL(String.format("CALL sys.clear_consumers('%s.%s')", database, tableName));
452+
break;
453+
case "procedure_named":
454+
executeSQL(
455+
String.format(
456+
"CALL sys.clear_consumers(`table` => '%s.%s')",
457+
database, tableName));
458+
break;
459+
default:
460+
throw new UnsupportedOperationException(invoker);
461+
}
462+
463+
Optional<Consumer> consumer10 = consumerManager.consumer("myid1");
464+
Optional<Consumer> consumer11 = consumerManager.consumer("myid2");
465+
Optional<Consumer> consumer12 = consumerManager.consumer("myid3");
466+
assertThat(consumer10).isNotPresent();
467+
assertThat(consumer11).isNotPresent();
468+
assertThat(consumer12).isNotPresent();
469+
}
315470
}

0 commit comments

Comments
 (0)