Skip to content

Commit 319836b

Browse files
authored
Merge pull request #610 from alex268/master
Added MessageCommitter interface and implementation
2 parents 877ca3f + 3828672 commit 319836b

17 files changed

+405
-237
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
package tech.ydb.topic.description;
22

3+
import tech.ydb.topic.read.impl.OffsetsRangeImpl;
4+
35
/**
46
* @author Nikolay Perfilov
57
*/
68
public interface OffsetsRange {
79
long getStart();
810

911
long getEnd();
12+
13+
static OffsetsRange of(long start) {
14+
return new OffsetsRangeImpl(start, start + 1);
15+
}
16+
17+
static OffsetsRange of(long start, long end) {
18+
return new OffsetsRangeImpl(start, end);
19+
}
1020
}

topic/src/main/java/tech/ydb/topic/read/Message.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.CompletableFuture;
77

88
import tech.ydb.topic.description.MetadataItem;
9+
import tech.ydb.topic.description.OffsetsRange;
910

1011
/**
1112
* @author Nikolay Perfilov
@@ -52,6 +53,13 @@ public interface Message {
5253
*/
5354
Instant getWrittenAt();
5455

56+
/**
57+
* Returns message offsets range for committing
58+
*
59+
* @return Message offsets range for committing
60+
*/
61+
OffsetsRange getRangeToCommit();
62+
5563
/**
5664
* @return message metadata items
5765
*/
@@ -65,6 +73,7 @@ public interface Message {
6573
/**
6674
* @return Partition offsets of this message
6775
*/
76+
@Deprecated
6877
PartitionOffsets getPartitionOffsets();
6978

7079
/**
@@ -75,6 +84,15 @@ public interface Message {
7584
*
7685
* @return CompletableFuture that will be completed when commit confirmation from server will be received
7786
*/
78-
CompletableFuture<Void> commit();
87+
default CompletableFuture<Void> commit() {
88+
return getCommitter().commit(getRangeToCommit());
89+
}
7990

91+
/**
92+
* The committer for this message. The committer is linked to the current partition reading session and is active
93+
* when this session is alive. The commits on nonactive committer will return failed {@link CompletableFuture }
94+
*
95+
* @return committer instance for this message
96+
*/
97+
MessageCommitter getCommitter();
8098
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package tech.ydb.topic.read;
2+
3+
import java.util.List;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
import tech.ydb.topic.description.OffsetsRange;
7+
8+
/**
9+
*
10+
* @author Aleksandr Gorshenin
11+
*/
12+
public interface MessageCommitter {
13+
14+
CompletableFuture<Void> commit(OffsetsRange range);
15+
16+
void commitRanges(List<OffsetsRange> ranges);
17+
}

topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*
1717
* @author Nikolay Perfilov
1818
*/
19+
@Deprecated
1920
public interface TransactionMessageAccumulator extends MessageAccumulator {
2021
CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction transaction,
2122
UpdateOffsetsInTransactionSettings settings);

topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import java.util.List;
44
import java.util.concurrent.CompletableFuture;
55

6+
import tech.ydb.topic.description.OffsetsRange;
67
import tech.ydb.topic.read.Message;
8+
import tech.ydb.topic.read.MessageCommitter;
79
import tech.ydb.topic.read.PartitionOffsets;
810
import tech.ydb.topic.read.PartitionSession;
911

@@ -27,11 +29,19 @@ public interface DataReceivedEvent {
2729
*/
2830
PartitionSession getPartitionSession();
2931

32+
/**
33+
* Returns offsets range for committing of this event
34+
*
35+
* @return Offsets range for committing of this event
36+
*/
37+
OffsetsRange getRangeToCommit();
38+
3039
/**
3140
* Returns partition offsets of this message
3241
*
3342
* @return Partition offsets of this message
3443
*/
44+
@Deprecated
3545
PartitionOffsets getPartitionOffsets();
3646

3747
/**
@@ -42,6 +52,15 @@ public interface DataReceivedEvent {
4252
*
4353
* @return a CompletableFuture that will be completed when commit confirmation from server will be received
4454
*/
45-
CompletableFuture<Void> commit();
55+
default CompletableFuture<Void> commit() {
56+
return getCommitter().commit(getRangeToCommit());
57+
}
4658

59+
/**
60+
* The committer for this event. The committer is linked to the current partition reading session and is active
61+
* when this session is alive. The commits on nonactive committer will return failed {@link CompletableFuture }
62+
*
63+
* @return committer instance for this message
64+
*/
65+
MessageCommitter getCommitter();
4766
}

topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,50 +8,50 @@
88

99
import tech.ydb.topic.read.DeferredCommitter;
1010
import tech.ydb.topic.read.Message;
11+
import tech.ydb.topic.read.MessageCommitter;
12+
import tech.ydb.topic.read.PartitionSession;
1113
import tech.ydb.topic.read.events.DataReceivedEvent;
12-
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
1314

1415
/**
1516
* @author Nikolay Perfilov
1617
*/
1718
public class DeferredCommitterImpl implements DeferredCommitter {
1819
private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);
1920

20-
private final Map<ReadPartitionSession, DisjointOffsetRangeSet> rangesBySession = new ConcurrentHashMap<>();
21+
private final Map<MessageCommitter, DisjointOffsetRangeSet> rangesBySession = new ConcurrentHashMap<>();
2122

22-
private RuntimeException wrapExceptionWithSession(ReadPartitionSession session, RuntimeException ex) {
23-
String errorMessage = "[" + session.getId() + "] Error adding new offset range to " +
24-
"DeferredCommitter for " + session.getPartition() + ": " + ex.getMessage();
25-
logger.error(errorMessage);
26-
return new RuntimeException(errorMessage, ex);
23+
private RuntimeException wrapExceptionWithSession(PartitionSession session, RuntimeException ex) {
24+
String msg = "Error adding new offset range to DeferredCommitter for " + session + ": " + ex.getMessage();
25+
logger.error(msg);
26+
return new RuntimeException(msg, ex);
2727
}
2828

2929
@Override
3030
public void add(Message message) {
31-
ReadPartitionSession session = ((MessageImpl) message).getPartitionSessionImpl();
32-
DisjointOffsetRangeSet range = rangesBySession.computeIfAbsent(session, s -> new DisjointOffsetRangeSet());
31+
MessageCommitter committer = message.getCommitter();
32+
DisjointOffsetRangeSet range = rangesBySession.computeIfAbsent(committer, s -> new DisjointOffsetRangeSet());
3333
try {
34-
range.add(message.getPartitionOffsets().getOffsets());
34+
range.add(message.getRangeToCommit());
3535
} catch (RuntimeException exception) {
36-
throw wrapExceptionWithSession(session, exception);
36+
throw wrapExceptionWithSession(message.getPartitionSession(), exception);
3737
}
3838
}
3939

4040
@Override
4141
public void add(DataReceivedEvent event) {
42-
ReadPartitionSession session = ((DataReceivedEventImpl) event).getPartitionSessionImpl();
43-
DisjointOffsetRangeSet range = rangesBySession.computeIfAbsent(session, s -> new DisjointOffsetRangeSet());
42+
MessageCommitter committer = event.getCommitter();
43+
DisjointOffsetRangeSet range = rangesBySession.computeIfAbsent(committer, s -> new DisjointOffsetRangeSet());
4444
try {
45-
range.add(event.getPartitionOffsets().getOffsets());
45+
range.add(event.getRangeToCommit());
4646
} catch (RuntimeException exception) {
47-
throw wrapExceptionWithSession(session, exception);
47+
throw wrapExceptionWithSession(event.getPartitionSession(), exception);
4848
}
4949
}
5050

5151
@Override
5252
public void commit() {
53-
rangesBySession.forEach((session, ranges) -> {
54-
session.commit(ranges.getRangesAndClear());
53+
rangesBySession.forEach((committer, ranges) -> {
54+
committer.commitRanges(ranges.getRangesAndClear());
5555
});
5656
}
5757
}

topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package tech.ydb.topic.read.impl;
22

33
import java.util.ArrayList;
4-
import java.util.Collection;
54
import java.util.List;
65
import java.util.Map;
76
import java.util.NavigableMap;
@@ -17,12 +16,10 @@ public class DisjointOffsetRangeSet {
1716
private final NavigableMap<Long, OffsetsRangeImpl> ranges = new TreeMap<>();
1817
private final ReentrantLock rangesLock = new ReentrantLock();
1918

20-
public void add(Collection<OffsetsRange> ranges) {
19+
public void add(OffsetsRange range) {
2120
rangesLock.lock();
2221
try {
23-
for (OffsetsRange range: ranges) {
24-
addImpl(range);
25-
}
22+
addImpl(range);
2623
} finally {
2724
rangesLock.unlock();
2825
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package tech.ydb.topic.read.impl;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.NavigableMap;
7+
import java.util.TreeMap;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.locks.ReentrantLock;
10+
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import tech.ydb.topic.description.OffsetsRange;
15+
import tech.ydb.topic.read.MessageCommitter;
16+
17+
/**
18+
*
19+
* @author Aleksandr Gorshenin
20+
*/
21+
class MessageCommitterImpl implements MessageCommitter {
22+
private static final Logger logger = LoggerFactory.getLogger(ReaderImpl.class);
23+
24+
private final ReadPartitionSession session;
25+
26+
private final NavigableMap<Long, CompletableFuture<Void>> commitFutures = new TreeMap<>();
27+
private final ReentrantLock commitFuturesLock = new ReentrantLock();
28+
29+
private volatile long lastCommittedOffset;
30+
31+
MessageCommitterImpl(ReadPartitionSession session, long lastCommittedOffset) {
32+
this.session = session;
33+
this.lastCommittedOffset = lastCommittedOffset;
34+
}
35+
36+
private RuntimeException partitionIsClosedException() {
37+
return new RuntimeException("" + session.getPartition() + " is already stopped");
38+
}
39+
40+
public void confirmCommit(long committedOffset) {
41+
if (committedOffset <= lastCommittedOffset) { // never happens
42+
logger.error("{} received commit response. Committed offset: {} which is less than previous " +
43+
"committed offset: {}.", session, committedOffset, lastCommittedOffset);
44+
return;
45+
}
46+
47+
commitFuturesLock.lock();
48+
try {
49+
Map<Long, CompletableFuture<Void>> confirmed = commitFutures.headMap(committedOffset, true);
50+
51+
logger.debug("{} received commit response. Committed offset: {}. "
52+
+ "Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures", session,
53+
committedOffset, lastCommittedOffset, committedOffset - lastCommittedOffset, confirmed.size());
54+
55+
lastCommittedOffset = committedOffset;
56+
confirmed.values().forEach(future -> future.complete(null));
57+
confirmed.clear();
58+
} finally {
59+
commitFuturesLock.unlock();
60+
}
61+
}
62+
63+
@Override
64+
public CompletableFuture<Void> commit(OffsetsRange range) {
65+
CompletableFuture<Void> future = new CompletableFuture<>();
66+
logger.debug("{} Offset range {} is requested to be committed. Last committed offset is {} (commit lag is {})",
67+
session, range, lastCommittedOffset, range.getStart() - lastCommittedOffset);
68+
69+
commitFuturesLock.lock();
70+
try {
71+
if (session.commitOffsets(Collections.singletonList(range))) {
72+
commitFutures.put(range.getEnd(), future);
73+
} else {
74+
logger.info("{} Offset range {} is requested to be committed, but partition session is already stopped",
75+
session, range);
76+
future.completeExceptionally(partitionIsClosedException());
77+
}
78+
} finally {
79+
commitFuturesLock.unlock();
80+
}
81+
82+
return future;
83+
}
84+
85+
@Override
86+
public void commitRanges(List<OffsetsRange> ranges) {
87+
session.commitOffsets(ranges);
88+
}
89+
90+
public void failPendingCommits() {
91+
commitFuturesLock.lock();
92+
try {
93+
logger.info("{} for {} is stopping. Failing {} commit futures...", session,
94+
session.getPartition().getPath(), commitFutures.size());
95+
commitFutures.values().forEach(f -> f.completeExceptionally(partitionIsClosedException()));
96+
commitFutures.clear();
97+
} finally {
98+
commitFuturesLock.unlock();
99+
}
100+
}
101+
}

topic/src/main/java/tech/ydb/topic/read/impl/MessageDecoder.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public MessageDecoder(long maxBufferSize, Executor decompressionExecutor, CodecR
3030
this.codecRegistry = codecRegistry;
3131
}
3232

33-
public void decode(String fullId, Batch batch, Runnable readyHandler) {
34-
decodingQueue.offer(new DecodeTask(fullId, batch, readyHandler));
33+
public void decode(String traceID, Batch batch, Runnable readyHandler) {
34+
decodingQueue.offer(new DecodeTask(traceID, batch, readyHandler));
3535
tryToDecodeNextBatch();
3636
}
3737

@@ -79,12 +79,12 @@ private long getUncompressedSize(Batch batch) {
7979
}
8080

8181
private class DecodeTask implements Runnable {
82-
private final String fullId;
82+
private final String traceID;
8383
private final Batch batch;
8484
private final Runnable readyHandler;
8585

86-
DecodeTask(String fullId, Batch batch, Runnable readyHandler) {
87-
this.fullId = fullId;
86+
DecodeTask(String traceID, Batch batch, Runnable readyHandler) {
87+
this.traceID = traceID;
8888
this.batch = batch;
8989
this.readyHandler = readyHandler;
9090
}
@@ -96,21 +96,21 @@ public Batch getBatch() {
9696
@Override
9797
public void run() {
9898
if (logger.isTraceEnabled()) {
99-
logger.trace("[{}] Started decoding batch", fullId);
99+
logger.trace("[{}] Started decoding batch", traceID);
100100
}
101101

102102
batch.getMessages().forEach(message -> {
103103
try {
104104
message.setData(Encoder.decode(batch.getCodec(), message.getData(), codecRegistry));
105105
} catch (IOException exception) {
106106
message.setException(exception);
107-
logger.warn("[{}] Exception was thrown while decoding a message: ", fullId, exception);
107+
logger.warn("[{}] Exception was thrown while decoding a message: ", traceID, exception);
108108
}
109109
});
110110
batch.markAsReady();
111111

112112
if (logger.isTraceEnabled()) {
113-
logger.trace("[{}] Finished decoding batch", fullId);
113+
logger.trace("[{}] Finished decoding batch", traceID);
114114
}
115115

116116
readyHandler.run();

0 commit comments

Comments
 (0)