Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePartition;
Expand Down Expand Up @@ -287,6 +288,9 @@ public void updateMetadata(
} else if (t instanceof PartitionNotExistException) {
LOG.warn("Failed to update metadata because the partition does not exist", t);
throw (PartitionNotExistException) t;
} else if (t instanceof TableNotExistException) {
LOG.warn("Failed to update metadata because the table does not exist", t);
throw (TableNotExistException) t;
} else {
throw new FlussRuntimeException("Failed to update metadata", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.messages.MetadataRequest;
Expand Down Expand Up @@ -70,6 +71,24 @@ void testInitializeClusterWithRetries() throws Exception {
.hasMessageContaining("The metadata is stale.");
}

@Test
void testTableNotExistExceptionIsNotWrapped() throws Exception {
    // Gateway that throws TableNotExistException on metadata request.
    final AdminReadOnlyGateway failingGateway = new TestingTableNotExistGateway();
    final RpcClient client =
            RpcClient.create(
                    new Configuration(), TestingClientMetricGroup.newInstance(), false);

    // The exception must propagate to the caller as-is, not wrapped in another exception.
    assertThatThrownBy(
                    () ->
                            MetadataUpdater.tryToInitializeClusterWithRetries(
                                    client, CS_NODE, failingGateway, 3))
            .isInstanceOf(TableNotExistException.class)
            .hasMessageContaining("test_table");
}

private static final class TestingAdminReadOnlyGateway extends TestCoordinatorGateway {

private final int maxRetryCount;
Expand All @@ -95,4 +114,16 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
}
}
}

/**
 * A testing gateway that fails metadata requests with {@link TableNotExistException}. Used to
 * verify that TableNotExistException is propagated without being wrapped.
 */
private static final class TestingTableNotExistGateway extends TestCoordinatorGateway {

    @Override
    public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
        // Fail through the returned future rather than throwing synchronously, so the test
        // exercises the same asynchronous error-unwrapping path that real RPC failures take.
        // NOTE(review): if the code under test relies on a synchronous throw, revert to throw.
        CompletableFuture<MetadataResponse> response = new CompletableFuture<>();
        response.completeExceptionally(
                new TableNotExistException("Table 'test_table' does not exist."));
        return response;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.record.LogRecord;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;

Expand All @@ -43,8 +45,9 @@ public interface LakeWriter<WriteResult> extends Closeable {
/**
 * Completes the writing process and returns the write result.
 *
 * @return the write result, or null if no data was written (empty write scenario)
 * @throws IOException if an I/O error occurs
 */
@Nullable
WriteResult complete() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
collectTableAllBucketWriteResult(tableId);

if (committableWriteResults != null) {
// Check if any result is cancelled (table was dropped)
boolean isCancelled =
committableWriteResults.stream().anyMatch(TableBucketWriteResult::isCancelled);
if (isCancelled) {
LOG.info(
"Skipping commit for dropped table {}, table path {}.",
tableId,
tableBucketWriteResult.tablePath());
collectedTableBucketWriteResults.remove(tableId);
return;
}
try {
CommitResult commitResult =
commitWriteResults(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.event;

import org.apache.flink.api.connector.source.SourceEvent;

import java.util.Objects;

/**
* SourceEvent used to notify TieringSourceReader that a table has been dropped or recreated, and
* all pending splits for this table should be skipped.
*/
public class TieringTableDroppedEvent implements SourceEvent {

private static final long serialVersionUID = 1L;

private final long tableId;

public TieringTableDroppedEvent(long tableId) {
this.tableId = tableId;
}

public long getTableId() {
return tableId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TieringTableDroppedEvent)) {
return false;
}
TieringTableDroppedEvent that = (TieringTableDroppedEvent) o;
return tableId == that.tableId;
}

@Override
public int hashCode() {
return Objects.hashCode(tableId);
}

@Override
public String toString() {
return "TieringTableDroppedEvent{" + "tableId=" + tableId + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
* that the write result is for, the end log offset of tiering, the total number of write results in
* one round of tiering. It'll be passed to downstream committer operator to collect all the write
* results of a table and do commit.
*
* <p>When {@code cancelled} is {@code true}, it indicates this result originates from a tiering
* round that was aborted (e.g., the table was dropped). In this case the {@link WriteResult} will
* always be {@code null} and the downstream committer should skip the commit and instead report the
* cancellation back to the coordinator.
*/
public class TableBucketWriteResult<WriteResult> implements Serializable {

Expand Down Expand Up @@ -57,21 +62,26 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
// for the round of tiering is finished
private final int numberOfWriteResults;

// indicates whether this result is from a cancelled tiering (e.g., table was dropped)
private final boolean cancelled;

/**
 * Creates a write result for one bucket of a tiering round.
 *
 * @param cancelled whether this result comes from a cancelled tiering round (e.g. the table was
 *     dropped); when {@code true} the {@code writeResult} is always {@code null}
 */
public TableBucketWriteResult(
        TablePath tablePath,
        TableBucket tableBucket,
        @Nullable String partitionName,
        @Nullable WriteResult writeResult,
        long logEndOffset,
        long maxTimestamp,
        int numberOfWriteResults,
        boolean cancelled) {
    this.tablePath = tablePath;
    this.tableBucket = tableBucket;
    this.partitionName = partitionName;
    this.writeResult = writeResult;
    this.logEndOffset = logEndOffset;
    this.maxTimestamp = maxTimestamp;
    this.numberOfWriteResults = numberOfWriteResults;
    this.cancelled = cancelled;
}

/**
 * Convenience constructor for the common non-cancelled case; keeps callers that predate the
 * {@code cancelled} flag source-compatible.
 */
public TableBucketWriteResult(
        TablePath tablePath,
        TableBucket tableBucket,
        @Nullable String partitionName,
        @Nullable WriteResult writeResult,
        long logEndOffset,
        long maxTimestamp,
        int numberOfWriteResults) {
    this(
            tablePath,
            tableBucket,
            partitionName,
            writeResult,
            logEndOffset,
            maxTimestamp,
            numberOfWriteResults,
            false);
}

public TablePath tablePath() {
Expand Down Expand Up @@ -103,4 +113,8 @@ public long logEndOffset() {
public long maxTimestamp() {
return maxTimestamp;
}

/**
 * Whether this result originates from a cancelled tiering round (e.g. the table was dropped); in
 * that case the write result is always {@code null} and the downstream commit should be skipped.
 */
public boolean isCancelled() {
    return cancelled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu
// serialize number of write results
out.writeInt(tableBucketWriteResult.numberOfWriteResults());

// serialize cancelled flag
out.writeBoolean(tableBucketWriteResult.isCancelled());

final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
Expand Down Expand Up @@ -136,13 +139,18 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
long maxTimestamp = in.readLong();
// deserialize number of write results
int numberOfWriteResults = in.readInt();

// deserialize cancelled flag
boolean cancelled = in.readBoolean();

return new TableBucketWriteResult<>(
tablePath,
tableBucket,
partitionName,
writeResult,
logEndOffset,
maxTimestamp,
numberOfWriteResults);
numberOfWriteResults,
cancelled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,41 +55,61 @@ public TieringSourceFetcherManager(
}

/** Schedules the tiering-max-duration handling for the given table on the split reader. */
public void markTableReachTieringMaxDuration(long tableId) {
    LOG.info("Enqueueing handleTableReachTieringMaxDuration task for table {}", tableId);
    final Consumer<TieringSplitReader<WriteResult>> task =
            splitReader -> {
                LOG.debug(
                        "Executing handleTableReachTieringMaxDuration in split reader for table {}",
                        tableId);
                splitReader.handleTableReachTieringMaxDuration(tableId);
            };
    enqueueTaskForTable(tableId, task, "handleTableReachTieringMaxDuration");
}

/** Schedules the table-dropped handling for the given table on the split reader. */
public void markTableDropped(long tableId) {
    LOG.info("Enqueueing handleTableDropped task for table {}", tableId);
    final Consumer<TieringSplitReader<WriteResult>> task =
            splitReader -> {
                LOG.debug("Executing handleTableDropped in split reader for table {}", tableId);
                splitReader.handleTableDropped(tableId);
            };
    enqueueTaskForTable(tableId, task, "handleTableDropped");
}

/**
 * Enqueues the given reader action on every running split fetcher, creating and starting a new
 * fetcher first when none is currently active.
 *
 * @param tableId the table the action applies to (used for logging only)
 * @param action the operation to run against the {@link TieringSplitReader}
 * @param actionDesc human-readable action name for log messages
 */
private void enqueueTaskForTable(
        long tableId, Consumer<TieringSplitReader<WriteResult>> action, String actionDesc) {
    if (!fetchers.isEmpty()) {
        // The fetcher thread is still running. This should be the majority of the cases.
        LOG.info("Fetchers are active, enqueueing {} task for table {}", actionDesc, tableId);
        fetchers.values().forEach(f -> enqueueReaderTask(f, action));
    } else {
        LOG.info(
                "No active fetchers, creating new fetcher and enqueueing {} task for table {}",
                actionDesc,
                tableId);
        // The fetcher is only needed in this branch, so declare it here instead of hoisting a
        // mutable variable to method scope.
        final SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
                createSplitFetcher();
        enqueueReaderTask(splitFetcher, action);
        startFetcher(splitFetcher);
    }
}

private void enqueueMarkTableReachTieringMaxDurationTask(
@SuppressWarnings("unchecked")
private void enqueueReaderTask(
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
long reachTieringDeadlineTable) {
Consumer<TieringSplitReader<WriteResult>> action) {
splitFetcher.enqueueTask(
new SplitFetcherTask() {
@Override
public boolean run() {
((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
.handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
action.accept(
(TieringSplitReader<WriteResult>) splitFetcher.getSplitReader());
return true;
}

@Override
public void wakeUp() {
// do nothing
}
public void wakeUp() {}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.Connection;
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
import org.apache.fluss.lake.writer.LakeTieringFactory;
Expand Down Expand Up @@ -125,9 +126,15 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
TieringReachMaxDurationEvent reachMaxDurationEvent =
(TieringReachMaxDurationEvent) sourceEvent;
long tableId = reachMaxDurationEvent.getTableId();
LOG.info("Received reach max duration for table {}", tableId);
LOG.info("Received reach max duration event for table {}", tableId);
((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
.markTableReachTieringMaxDuration(tableId);
} else if (sourceEvent instanceof TieringTableDroppedEvent) {
TieringTableDroppedEvent tableDroppedEvent = (TieringTableDroppedEvent) sourceEvent;
long tableId = tableDroppedEvent.getTableId();
LOG.info("Received table dropped event for table {}", tableId);
((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
.markTableDropped(tableId);
}
}

Expand Down
Loading
Loading