RecordAccumulator.java
@@ -108,15 +108,21 @@ public final class RecordAccumulator {
private final IdempotenceManager idempotenceManager;
private final Clock clock;

/**
* An upper bound on the time to report success or failure of record delivery after a
* record has been appended into a WriteBatch.
*/
private final long batchDeliveryTimeoutMs;

// TODO add retryBackoffMs to retry the produce request upon receiving an error.
// TODO add deliveryTimeoutMs to report success or failure on record delivery.
// TODO add nextBatchExpiryTimeMs
// TODO add nextBatchExpiryTimeMs. Trace by https://github.com/alibaba/fluss/pull/300

RecordAccumulator(
Configuration conf,
IdempotenceManager idempotenceManager,
WriterMetricGroup writerMetricGroup,
Clock clock) {
Clock clock,
long batchDeliveryTimeoutMs) {
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
@@ -136,6 +142,7 @@ public final class RecordAccumulator {
this.nodesDrainIndex = new HashMap<>();
this.idempotenceManager = idempotenceManager;
this.clock = clock;
this.batchDeliveryTimeoutMs = batchDeliveryTimeoutMs;
registerMetrics(writerMetricGroup);
}

@@ -294,6 +301,31 @@ public void reEnqueue(WriteBatch batch) {
}
}

public List<WriteBatch> expiredBatches(long now) {
List<WriteBatch> expiredBatches = new ArrayList<>();
for (BucketAndWriteBatches bucketAndWriteBatch : writeBatches.values()) {
for (Deque<WriteBatch> deque : bucketAndWriteBatch.batches.values()) {
// expire the batches in the order of sending.
synchronized (deque) {
while (!deque.isEmpty()) {
WriteBatch batch = deque.getFirst();
if (batch.hasReachedDeliveryTimeout(batchDeliveryTimeoutMs, now)) {
deque.poll();
expiredBatches.add(batch);
} else {
break;
}
}
}
}
}
return expiredBatches;
}

public long getDeliveryTimeoutMs() {
return batchDeliveryTimeoutMs;
}

/** Get the deque for the given table-bucket, creating it if necessary. */
private Deque<WriteBatch> getOrCreateDeque(
TableBucket tableBucket, PhysicalTablePath physicalTablePath) {
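
The new expiredBatches scan relies on each bucket deque holding batches in send order, so the head is always the oldest batch and the loop can stop at the first batch that has not yet timed out. Below is a minimal, self-contained sketch of that head-of-queue expiry; SketchBatch and expireFromHead are hypothetical stand-ins that mirror only the createdMs field and the inclusive timeout comparison from the code above.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// SketchBatch is a hypothetical stand-in for WriteBatch; it mirrors only the
// createdMs field and the inclusive timeout check used in the diff above.
final class SketchBatch {
    final long createdMs;

    SketchBatch(long createdMs) {
        this.createdMs = createdMs;
    }

    boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
        return deliveryTimeoutMs <= now - createdMs;
    }
}

public class ExpirySketch {
    // Batches are appended at the tail, so the head is always the oldest;
    // stop at the first batch that has not yet expired.
    static List<SketchBatch> expireFromHead(Deque<SketchBatch> deque, long timeoutMs, long now) {
        List<SketchBatch> expired = new ArrayList<>();
        while (!deque.isEmpty() && deque.peekFirst().hasReachedDeliveryTimeout(timeoutMs, now)) {
            expired.add(deque.pollFirst());
        }
        return expired;
    }

    public static void main(String[] args) {
        Deque<SketchBatch> deque = new ArrayDeque<>();
        deque.add(new SketchBatch(0L));   // oldest batch, created at t=0
        deque.add(new SketchBatch(500L)); // newer batch, created at t=500
        // With a 1000 ms delivery timeout at t=1200, only the head batch expires.
        System.out.println(expireFromHead(deque, 1000L, 1200L).size()); // prints 1
    }
}

The real accumulator performs this scan under synchronized (deque) for each bucket; the sketch omits the locking.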
Sender.java
@@ -24,6 +24,7 @@
import com.alibaba.fluss.exception.InvalidMetadataException;
import com.alibaba.fluss.exception.OutOfOrderSequenceException;
import com.alibaba.fluss.exception.RetriableException;
import com.alibaba.fluss.exception.TimeoutException;
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
@@ -37,6 +38,7 @@
import com.alibaba.fluss.rpc.messages.PutKvResponse;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.rpc.protocol.Errors;
import com.alibaba.fluss.utils.clock.Clock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +48,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -81,6 +84,9 @@ public class Sender implements Runnable {
/** the number of times to retry a failed write batch before giving up. */
private final int retries;

/** the clock instance used for getting the current time. */
private final Clock clock;

/** true while the sender thread is still running. */
private volatile boolean running;

@@ -111,6 +117,7 @@ public Sender(
int retries,
MetadataUpdater metadataUpdater,
IdempotenceManager idempotenceManager,
Clock clock,
WriterMetricGroup writerMetricGroup) {
this.accumulator = accumulator;
this.maxRequestSize = maxRequestSize;
@@ -124,6 +131,7 @@ public Sender(
checkNotNull(metadataUpdater.getCoordinatorServer());

this.idempotenceManager = idempotenceManager;
this.clock = clock;
this.writerMetricGroup = writerMetricGroup;

// TODO add retry logic when sending fails. See FLUSS-56364375
@@ -175,7 +183,8 @@ public void runOnce() throws Exception {
}

// do send.
sendWriteData();
long currentTimeMillis = clock.milliseconds();
sendWriteData(currentTimeMillis);
}

public boolean isRunning() {
@@ -188,7 +197,7 @@ private void addToInflightBatches(Map<Integer, List<WriteBatch>> batches) {
}
}

private void sendWriteData() throws Exception {
private void sendWriteData(long now) throws Exception {
// get the list of buckets with data ready to send.
ReadyCheckResult readyCheckResult = accumulator.ready(metadataUpdater.getCluster());

@@ -216,7 +225,9 @@ private void sendWriteData() throws Exception {
if (!batches.isEmpty()) {
addToInflightBatches(batches);

// TODO add logic for batch expire.
// check and expire batches if they have reached the batch delivery timeout. This avoids
// the client waiting for a long time while a batch retries forever.
checkAndExpireBatches(now);
Review comment (project member): What happens if a batch is expired here but is still in sendWriteRequests(batches)? Will the expired batch still be sent to the servers? And what happens if those batches then fail or succeed in the response?
sendWriteRequests(batches);

@@ -239,9 +250,8 @@ private void failBatch(WriteBatch batch, Exception exception, boolean adjustBatchSequences) {
if (idempotenceManager.idempotenceEnabled()) {
try {
// This call can throw an exception in the rare case that there's an invalid
// state
// transition attempted. Catch these so as not to interfere with the rest of the
// logic.
// state transition attempted. Catch these so as not to interfere with the rest
// of the logic.
idempotenceManager.handleFailedBatch(batch, exception, adjustBatchSequences);
} catch (Exception e) {
LOG.debug(
@@ -280,6 +290,64 @@ private void maybeRemoveAndDeallocateBatch(WriteBatch batch) {
accumulator.deallocate(batch);
}

private void checkAndExpireBatches(long now) {
List<WriteBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<WriteBatch> expiredBatches = accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
if (!expiredBatches.isEmpty()) {
LOG.trace("Client found expired batches: {}", expiredBatches);
}

for (WriteBatch batch : expiredBatches) {
failBatch(
batch,
new TimeoutException(
String.format(
"Expiring %s records for %s : %s ms has passed since batch creation.",
batch.recordCount,
batch.tableBucket(),
now - batch.getCreatedMs())),
false);
}
}

/** Get the in-flight batches that have reached the batch delivery timeout. */
@VisibleForTesting
List<WriteBatch> getExpiredInflightBatches(long now) {
List<WriteBatch> expiredBatches = new ArrayList<>();
for (Iterator<Map.Entry<TableBucket, List<WriteBatch>>> batchIt =
inFlightBatches.entrySet().iterator();
batchIt.hasNext(); ) {
Map.Entry<TableBucket, List<WriteBatch>> entry = batchIt.next();
List<WriteBatch> tbFlightBatches = entry.getValue();
if (tbFlightBatches != null) {
Iterator<WriteBatch> iter = tbFlightBatches.iterator();
while (iter.hasNext()) {
WriteBatch batch = iter.next();
if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
iter.remove();
if (!batch.isDone()) {
expiredBatches.add(batch);
} else {
throw new IllegalStateException(
batch.tableBucket()
+ " batch created at "
+ batch.getCreatedMs()
+ " gets unexpected final state "
+ batch.getFinalState());
}
} else {
break;
}
}
if (tbFlightBatches.isEmpty()) {
batchIt.remove();
}
}
}
return expiredBatches;
}

private void maybeRemoveFromInflightBatches(WriteBatch batch) {
synchronized (inFlightBatchesLock) {
List<WriteBatch> batches = inFlightBatches.get(batch.tableBucket());
@@ -355,12 +423,11 @@ private void sendProduceLogRequestAndHandleResponse(
ProduceLogRequest request,
long tableId,
Map<TableBucket, WriteBatch> recordsByBucket) {
long startTime = System.currentTimeMillis();
long startTime = clock.milliseconds();
gateway.produceLog(request)
.whenComplete(
(produceLogResponse, e) -> {
writerMetricGroup.setSendLatencyInMs(
System.currentTimeMillis() - startTime);
writerMetricGroup.setSendLatencyInMs(clock.milliseconds() - startTime);
if (e != null) {
handleWriteRequestException(e, recordsByBucket);
} else {
@@ -375,12 +442,11 @@ private void sendPutKvRequestAndHandleResponse(
PutKvRequest request,
long tableId,
Map<TableBucket, WriteBatch> recordsByBucket) {
long startTime = System.currentTimeMillis();
long startTime = clock.milliseconds();
gateway.putKv(request)
.whenComplete(
(putKvResponse, e) -> {
writerMetricGroup.setSendLatencyInMs(
System.currentTimeMillis() - startTime);
writerMetricGroup.setSendLatencyInMs(clock.milliseconds() - startTime);
if (e != null) {
handleWriteRequestException(e, recordsByBucket);
} else {
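
Replacing System.currentTimeMillis() with the injected Clock, both in runOnce and in the latency measurements above, makes the expiry path deterministic to test: a test can hand the Sender a controllable clock and jump past the delivery timeout instead of sleeping. Below is a sketch of such a clock; ManualTestClock is a hypothetical helper, not an existing Fluss class, and it only supplies the milliseconds() method that the Sender actually calls (in a real test it would implement com.alibaba.fluss.utils.clock.Clock).

// ManualTestClock is hypothetical; it only needs to supply milliseconds(),
// the single Clock method the Sender uses above.
public class ManualTestClock {
    private long nowMs;

    public ManualTestClock(long startMs) {
        this.nowMs = startMs;
    }

    public long milliseconds() {
        return nowMs;
    }

    public void advance(long deltaMs) {
        nowMs += deltaMs;
    }

    public static void main(String[] args) {
        ManualTestClock clock = new ManualTestClock(0L);
        clock.advance(31_000L); // jump past a 30s delivery timeout without sleeping
        System.out.println(clock.milliseconds()); // prints 31000
    }
}

In a test, this clock would be passed to the Sender constructor added in this diff; a batch appended before advance(...) should then complete exceptionally with the TimeoutException raised in checkAndExpireBatches.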
WriteBatch.java
@@ -140,6 +140,18 @@ public RequestFuture getRequestFuture() {
return requestFuture;
}

long getCreatedMs() {
return createdMs;
}

boolean hasReachedDeliveryTimeout(long batchDeliveryTimeoutMs, long now) {
return batchDeliveryTimeoutMs <= now - this.createdMs;
}

FinalState getFinalState() {
return finalState.get();
}

public long waitedTimeMs(long nowMs) {
return Math.max(0, nowMs - createdMs);
}
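
hasReachedDeliveryTimeout uses an inclusive comparison (deliveryTimeoutMs <= now - createdMs), so a batch expires exactly at the timeout boundary, and the subtraction form cannot overflow even when the timeout is configured very large, unlike createdMs + deliveryTimeoutMs <= now. A small standalone check with illustrative values (not Fluss defaults):

// Standalone boundary check for the inclusive comparison above; the
// timestamps are illustrative values, not Fluss defaults.
public class TimeoutBoundarySketch {
    static boolean hasReachedDeliveryTimeout(long createdMs, long deliveryTimeoutMs, long now) {
        return deliveryTimeoutMs <= now - createdMs;
    }

    public static void main(String[] args) {
        long createdMs = 1_000L;
        long deliveryTimeoutMs = 30_000L;
        System.out.println(hasReachedDeliveryTimeout(createdMs, deliveryTimeoutMs, 30_999L)); // false
        System.out.println(hasReachedDeliveryTimeout(createdMs, deliveryTimeoutMs, 31_000L)); // true: exactly at the bound
        // With deliveryTimeoutMs = Long.MAX_VALUE the subtraction form still works,
        // whereas createdMs + deliveryTimeoutMs would wrap negative.
        System.out.println(hasReachedDeliveryTimeout(createdMs, Long.MAX_VALUE, 31_000L)); // false
    }
}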
WriterClient.java
@@ -31,6 +31,7 @@
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.utils.CopyOnWriteMap;
import com.alibaba.fluss.utils.clock.Clock;
import com.alibaba.fluss.utils.clock.SystemClock;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;

@@ -97,10 +98,16 @@ public WriterClient(

short acks = configureAcks(idempotenceManager.idempotenceEnabled());
int retries = configureRetries(idempotenceManager.idempotenceEnabled());
long batchDeliveryTimeoutMs = configureBatchDeliveryTimeout(conf);
SystemClock clock = SystemClock.getInstance();
this.accumulator =
new RecordAccumulator(
conf, idempotenceManager, writerMetricGroup, SystemClock.getInstance());
this.sender = newSender(acks, retries);
conf,
idempotenceManager,
writerMetricGroup,
clock,
batchDeliveryTimeoutMs);
this.sender = newSender(acks, retries, clock);
this.ioThreadPool = createThreadPool();
ioThreadPool.submit(sender);
} catch (Throwable t) {
@@ -249,7 +256,27 @@ private int configureRetries(boolean idempotenceEnabled) {
return retries;
}

private Sender newSender(short acks, int retries) {
private long configureBatchDeliveryTimeout(Configuration conf) {
long batchDeliveryTimeoutMs =
conf.get(ConfigOptions.CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT).toMillis();
long batchTimeoutMs = conf.get(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT).toMillis();
long requestTimeoutMs = conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis();
long batchAndRequestTimeoutMs = Math.min(batchTimeoutMs + requestTimeoutMs, Long.MAX_VALUE);

if (batchDeliveryTimeoutMs < batchAndRequestTimeoutMs) {
throw new IllegalConfigurationException(
"The value of "
+ ConfigOptions.CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT.key()
+ " should be greater than or equal to "
+ ConfigOptions.CLIENT_REQUEST_TIMEOUT.key()
+ " + "
+ ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key()
+ ".");
}
return batchDeliveryTimeoutMs;
}

private Sender newSender(short acks, int retries, Clock clock) {
return new Sender(
accumulator,
(int) conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis(),
@@ -258,6 +285,7 @@ private Sender newSender(short acks, int retries) {
retries,
metadataUpdater,
idempotenceManager,
clock,
writerMetricGroup);
}
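
configureBatchDeliveryTimeout above enforces that the overall delivery deadline covers at least one full accumulator wait plus one request round-trip: CLIENT_WRITER_BATCH_DELIVERY_TIMEOUT >= CLIENT_WRITER_BATCH_TIMEOUT + CLIENT_REQUEST_TIMEOUT. Below is a worked re-statement of that arithmetic; all three durations are made-up example values, not Fluss defaults.

import java.time.Duration;

// Illustrative re-statement of the constraint in configureBatchDeliveryTimeout;
// the durations below are made-up example values, not Fluss defaults.
public class DeliveryTimeoutConfigSketch {
    public static void main(String[] args) {
        Duration batchTimeout = Duration.ofMillis(100);    // how long a batch may wait in the accumulator
        Duration requestTimeout = Duration.ofSeconds(30);  // how long a single write request may take
        Duration deliveryTimeout = Duration.ofSeconds(60); // overall per-batch delivery deadline

        long floorMs = batchTimeout.toMillis() + requestTimeout.toMillis(); // 30_100 ms
        if (deliveryTimeout.toMillis() < floorMs) {
            throw new IllegalArgumentException(
                    "batch delivery timeout must be >= batch timeout + request timeout");
        }
        System.out.println("valid: " + deliveryTimeout.toMillis() + " ms >= " + floorMs + " ms");
    }
}

In this example, a 60s delivery timeout leaves roughly 29.9s of budget for retries after the first attempt's batch wait and request round-trip.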
