
Commit 62db319

[lake] Tiering service support commit by time (apache#2185)
1 parent 3c7f502 commit 62db319

File tree

27 files changed: +1589 −316 lines


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@ public JobClient build() throws Exception {
            tieringSourceBuilder.withPollTieringTableIntervalMs(
                    flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
        }
+
        TieringSource<?> tieringSource = tieringSourceBuilder.build();
        DataStreamSource<?> source =
                env.fromSource(
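
For orientation only: the hunk above ends mid-statement inside LakeTieringJobBuilder.build(). A minimal sketch of how such a source is typically handed to Flink; the watermark strategy and source name below are illustrative assumptions, not the builder's actual arguments (which are outside this hunk):

// Sketch only: completes the env.fromSource(...) call with typical arguments.
// Assumes: import org.apache.flink.api.common.eventtime.WatermarkStrategy;
DataStreamSource<?> source =
        env.fromSource(
                tieringSource,
                WatermarkStrategy.noWatermarks(), // tiering results carry no event time
                "TieringSource");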

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 4 additions & 0 deletions
@@ -178,6 +178,10 @@ private Committable commitWriteResults(
        // empty, means all write result is null, which is a empty commit,
        // return null to skip the empty commit
        if (committableWriteResults.isEmpty()) {
+            LOG.info(
+                    "Commit tiering write results is empty for table {}, table path {}",
+                    tableId,
+                    tablePath);
            return null;
        }
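
A note on this change: with time-based commits, a tiering round that is force-completed because it hit the maximum duration can legitimately carry no new data, so the new log line presumably exists to make the skipped empty commit visible rather than silent.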

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import java.util.Objects;
+
+/**
+ * SourceEvent used to notify TieringSourceReader that a table has reached the maximum tiering
+ * duration and should be force completed.
+ */
+public class TieringReachMaxDurationEvent implements SourceEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long tableId;
+
+    public TieringReachMaxDurationEvent(long tableId) {
+        this.tableId = tableId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof TieringReachMaxDurationEvent)) {
+            return false;
+        }
+        TieringReachMaxDurationEvent that = (TieringReachMaxDurationEvent) o;
+        return tableId == that.tableId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(tableId);
+    }
+
+    @Override
+    public String toString() {
+        return "TieringReachMaxDurationEvent{" + "tableId=" + tableId + '}';
+    }
+}
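
The enumerator side that emits this event lives in another file of the commit and is not shown in this excerpt. A rough sketch, assuming the TieringSourceEnumerator tracks when each table's tiering round started; the method and field names below are hypothetical, only the SplitEnumeratorContext calls are standard Flink API:

// Hypothetical enumerator-side check (not taken from this diff): once a table
// has been tiering longer than the allowed maximum, tell every registered
// reader subtask to force-complete that table's work.
private void notifyIfReachMaxDuration(long tableId, long tieringStartMs, long maxTieringMs) {
    if (System.currentTimeMillis() - tieringStartMs >= maxTieringMs) {
        for (int subtaskId : splitEnumeratorContext.registeredReaders().keySet()) {
            splitEnumeratorContext.sendEventToSourceReader(
                    subtaskId, new TieringReachMaxDurationEvent(tableId));
        }
    }
}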

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 8 additions & 4 deletions
@@ -36,6 +36,8 @@
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;

@@ -78,16 +80,15 @@ public Boundedness getBoundedness() {

    @Override
    public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumerator(
-            SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) throws Exception {
+            SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) {
        return new TieringSourceEnumerator(
                flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
    }

    @Override
    public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<TieringSplit> splitEnumeratorContext,
-            TieringSourceEnumeratorState tieringSourceEnumeratorState)
-            throws Exception {
+            TieringSourceEnumeratorState tieringSourceEnumeratorState) {
        // stateless operator
        return new TieringSourceEnumerator(
                flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);

@@ -107,8 +108,11 @@ public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {
    @Override
    public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createReader(
            SourceReaderContext sourceReaderContext) {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                elementsQueue = new FutureCompletingBlockingQueue<>();
        Connection connection = ConnectionFactory.createConnection(flussConf);
-        return new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory);
+        return new TieringSourceReader<>(
+                elementsQueue, sourceReaderContext, connection, lakeTieringFactory);
    }

    /** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */
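
Design note: the FutureCompletingBlockingQueue created in createReader is the hand-off buffer of Flink's split-reader framework; the fetcher thread pushes RecordsWithSplitIds into it and the reader's main thread drains it. Passing the same queue instance to both the new TieringSourceFetcherManager and the TieringSourceReader (see below) is what the extra constructor argument is for.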
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java

Lines changed: 95 additions & 0 deletions

@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
+import org.apache.fluss.flink.tiering.source.split.TieringSplit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The SplitFetcherManager for tiering source. This class is needed to help notify a table reaches
+ * the max duration of tiering to {@link TieringSplitReader}.
+ */
+public class TieringSourceFetcherManager<WriteResult>
+        extends SingleThreadFetcherManagerAdapter<
+                TableBucketWriteResult<WriteResult>, TieringSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TieringSourceFetcherManager.class);
+
+    public TieringSourceFetcherManager(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                    elementsQueue,
+            Supplier<SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit>>
+                    splitReaderSupplier,
+            Configuration configuration,
+            Consumer<Collection<String>> splitFinishedHook) {
+        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
+    }
+
+    public void markTableReachTieringMaxDuration(long tableId) {
+        if (!fetchers.isEmpty()) {
+            // The fetcher thread is still running. This should be the majority of the cases.
+            LOG.info("fetchers is not empty, marking tiering max duration for table {}", tableId);
+            fetchers.values()
+                    .forEach(
+                            splitFetcher ->
+                                    enqueueMarkTableReachTieringMaxDurationTask(
+                                            splitFetcher, tableId));
+        } else {
+            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
+                    createSplitFetcher();
+            LOG.info(
+                    "fetchers is empty, enqueue marking tiering max duration for table {}",
+                    tableId);
+            enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
+            startFetcher(splitFetcher);
+        }
+    }
+
+    private void enqueueMarkTableReachTieringMaxDurationTask(
+            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
+            long reachTieringDeadlineTable) {
+        splitFetcher.enqueueTask(
+                new SplitFetcherTask() {
+                    @Override
+                    public boolean run() {
+                        ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
+                                .handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
+                        return true;
+                    }
+
+                    @Override
+                    public void wakeUp() {
+                        // do nothing
+                    }
+                });
+    }
+}
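
Design note: markTableReachTieringMaxDuration never calls the split reader directly. It enqueues a SplitFetcherTask so the call executes on the fetcher thread, which keeps TieringSplitReader's internal state confined to a single thread; if no fetcher is currently running, a fresh one is created and started just to run the task.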

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 45 additions & 3 deletions
@@ -18,36 +18,66 @@
 package org.apache.fluss.flink.tiering.source;

 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.Connection;
+import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
+import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
 import org.apache.fluss.lake.writer.LakeTieringFactory;

+import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;

+import static org.apache.fluss.flink.tiering.source.TieringSplitReader.DEFAULT_POLL_TIMEOUT;
+
 /** A {@link SourceReader} that read records from Fluss and write to lake. */
 @Internal
 public final class TieringSourceReader<WriteResult>
-        extends SingleThreadMultiplexSourceReaderBase<
+        extends SingleThreadMultiplexSourceReaderBaseAdapter<
                TableBucketWriteResult<WriteResult>,
                TableBucketWriteResult<WriteResult>,
                TieringSplit,
                TieringSplitState> {

+    private static final Logger LOG = LoggerFactory.getLogger(TieringSourceReader.class);
+
    private final Connection connection;

    public TieringSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                    elementsQueue,
            SourceReaderContext context,
            Connection connection,
            LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
+        this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
+    }
+
+    @VisibleForTesting
+    TieringSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                    elementsQueue,
+            SourceReaderContext context,
+            Connection connection,
+            LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
+            Duration pollTimeout) {
        super(
-                () -> new TieringSplitReader<>(connection, lakeTieringFactory),
+                elementsQueue,
+                new TieringSourceFetcherManager<>(
+                        elementsQueue,
+                        () -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
+                        context.getConfiguration(),
+                        (ignore) -> {}),
                new TableBucketWriteResultEmitter<>(),
                context.getConfiguration(),
                context);

@@ -89,6 +119,18 @@ protected TieringSplit toSplitType(String splitId, TieringSplitState splitState)
        return splitState.toSourceSplit();
    }

+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof TieringReachMaxDurationEvent) {
+            TieringReachMaxDurationEvent reachMaxDurationEvent =
+                    (TieringReachMaxDurationEvent) sourceEvent;
+            long tableId = reachMaxDurationEvent.getTableId();
+            LOG.info("Received reach max duration for table {}", tableId);
+            ((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
+                    .markTableReachTieringMaxDuration(tableId);
+        }
+    }
+
    @Override
    public void close() throws Exception {
        super.close();
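
Putting the pieces together: the enumerator (changed elsewhere in this commit) sends a TieringReachMaxDurationEvent when a table has been tiering longer than the configured maximum; handleSourceEvents above forwards the table id to the TieringSourceFetcherManager, which enqueues a task on the fetcher thread; TieringSplitReader.handleTableReachTieringMaxDuration (also in another file of this commit) then force-completes the pending work for that table so its write results can be committed by time rather than only when all splits finish.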
