Skip to content

Commit 1dfa8f7

Browse files
fix(s3stream): avoid StreamMetadataManager add callback when retry (#… (#2661)
fix(s3stream): avoid StreamMetadataManager add callback when retry (#2659) Co-authored-by: lifepuzzlefun <[email protected]>
1 parent f473d7d commit 1dfa8f7

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ public Set<Long> getStreamSetObjectIds() {
138138
public CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset, long endOffset, int limit) {
139139
// TODO: cache the object list for next search
140140
CompletableFuture<InRangeObjects> cf = new CompletableFuture<>();
141-
exec(() -> fetch0(cf, streamId, startOffset, endOffset, limit), cf, LOGGER, "fetchObjects");
141+
exec(() -> fetch0(cf, streamId, startOffset, endOffset, limit, false), cf, LOGGER, "fetchObjects");
142142
return cf;
143143
}
144144

145145
private void fetch0(CompletableFuture<InRangeObjects> cf, long streamId,
146-
long startOffset, long endOffset, int limit) {
146+
long startOffset, long endOffset, int limit, boolean retryFetch) {
147147
Image image = getImage();
148148
try {
149149
final S3StreamsMetadataImage streamsImage = image.streamsMetadata();
@@ -179,9 +179,11 @@ private void fetch0(CompletableFuture<InRangeObjects> cf, long streamId,
179179
streamId, startOffset, endOffset, limit, rst.objects().size(), rst.endOffset());
180180

181181
CompletableFuture<Void> pendingCf = pendingFetch();
182-
pendingCf.thenAccept(nil -> fetch0(cf, streamId, startOffset, endOffset, limit));
183-
cf.whenComplete((r, ex) ->
184-
LOGGER.info("[FetchObjects],[COMPLETE_PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit));
182+
pendingCf.thenAccept(nil -> fetch0(cf, streamId, startOffset, endOffset, limit, true));
183+
if (!retryFetch) {
184+
cf.whenComplete((r, ex) ->
185+
LOGGER.info("[FetchObjects],[COMPLETE_PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit));
186+
}
185187
}).exceptionally(ex -> {
186188
cf.completeExceptionally(ex);
187189
return null;

0 commit comments

Comments
 (0)