Skip to content

Commit b726422

Browse files
venkatsambath authored and OneSizeFitsQuorum committed
RATIS-2244. Reduce the number of log messages during bootstrap (#1217)
1 parent: 6708ab9; commit: b726422

File tree

5 files changed: 44 additions (+44) and 22 deletions (-22).

ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -213,7 +213,7 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) {
213213
final Throwable exception = e;
214214
final String key = client.getId() + "-" + request.getCallId() + "-" + exception;
215215
final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}", suffix, client.getId(), request, exception);
216-
BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
216+
BatchLogger.print(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
217217
handleException(pending, request, e);
218218
return null;
219219
});

ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ default TimeDuration getBatchDuration() {
4545

4646
private static final class UniqueId {
4747
private final Key key;
48-
private final String name;
48+
private final Object name;
4949

50-
private UniqueId(Key key, String name) {
50+
private UniqueId(Key key, Object name) {
5151
this.key = Objects.requireNonNull(key, "key == null");
5252
this.name = name;
5353
}
@@ -99,15 +99,15 @@ private synchronized boolean tryStartBatch(Consumer<String> op) {
9999
private static final TimeoutExecutor SCHEDULER = TimeoutExecutor.getInstance();
100100
private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE = new ConcurrentHashMap<>();
101101

102-
public static void warn(Key key, String name, Consumer<String> op) {
103-
warn(key, name, op, key.getBatchDuration(), true);
102+
public static void print(Key key, Object name, Consumer<String> op) {
103+
print(key, name, op, key.getBatchDuration(), true);
104104
}
105105

106-
public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration) {
107-
warn(key, name, op, batchDuration, true);
106+
public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration) {
107+
print(key, name, op, batchDuration, true);
108108
}
109109

110-
public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) {
110+
public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) {
111111
if (!shouldBatch || batchDuration.isNonPositive()) {
112112
op.accept("");
113113
return;

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class GrpcLogAppender extends LogAppenderBase {
6666

6767
private enum BatchLogKey implements BatchLogger.Key {
6868
RESET_CLIENT,
69+
INCONSISTENCY_REPLY,
6970
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
7071
}
7172

@@ -217,7 +218,7 @@ private void resetClient(AppendEntriesRequest request, Event event) {
217218
.orElseGet(f::getMatchIndex);
218219
if (event.isError() && request == null) {
219220
final long followerNextIndex = f.getNextIndex();
220-
BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix ->
221+
BatchLogger.print(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix ->
221222
LOG.warn("{}: Follower failed (request=null, errorCount={}); keep nextIndex ({}) unchanged and retry.{}",
222223
this, errorCount, followerNextIndex, suffix), logMessageBatchDuration);
223224
return;
@@ -516,8 +517,9 @@ private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto re
516517
break;
517518
case INCONSISTENCY:
518519
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
519-
LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={}",
520-
this, reply.getResult(), reply.getNextIndex(), errorCount, request);
520+
BatchLogger.print(BatchLogKey.INCONSISTENCY_REPLY, getFollower().getName() + "_" + reply.getNextIndex(),
521+
suffix -> LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={} {}",
522+
this, reply.getResult(), reply.getNextIndex(), errorCount, request, suffix));
521523
final long requestFirstIndex = request != null? request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX;
522524
updateNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex()));
523525
break;
@@ -537,7 +539,7 @@ public void onError(Throwable t) {
537539
LOG.info("{} is already stopped", GrpcLogAppender.this);
538540
return;
539541
}
540-
BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name,
542+
BatchLogger.print(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name,
541543
suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" + suffix, t),
542544
logMessageBatchDuration, t instanceof StatusRuntimeException);
543545
grpcServerMetrics.onRequestRetry(); // Update try counter

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
3030
import org.apache.ratis.proto.RaftProtos.*;
3131
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
32+
import org.apache.ratis.util.BatchLogger;
33+
import org.apache.ratis.util.MemoizedSupplier;
3234
import org.apache.ratis.util.ProtoUtils;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
@@ -43,6 +45,11 @@
4345
class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
4446
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
4547

48+
private enum BatchLogKey implements BatchLogger.Key {
49+
COMPLETED_REQUEST,
50+
COMPLETED_REPLY
51+
}
52+
4653
static class PendingServerRequest<REQUEST> {
4754
private final REQUEST request;
4855
private final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -62,6 +69,7 @@ CompletableFuture<Void> getFuture() {
6269

6370
abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
6471
private final RaftServer.Op op;
72+
private final Supplier<String> nameSupplier;
6573
private final StreamObserver<REPLY> responseObserver;
6674
/** For ordered {@link #onNext(Object)} requests. */
6775
private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>();
@@ -72,9 +80,14 @@ abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObse
7280

7381
ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY> responseObserver) {
7482
this.op = op;
83+
this.nameSupplier = MemoizedSupplier.valueOf(() -> getId() + "_" + op);
7584
this.responseObserver = responseObserver;
7685
}
7786

87+
String getName() {
88+
return nameSupplier.get();
89+
}
90+
7891
private String getPreviousRequestString() {
7992
return Optional.ofNullable(previousOnNext.get())
8093
.map(PendingServerRequest::getRequest)
@@ -154,9 +167,12 @@ public void onNext(REQUEST request) {
154167
@Override
155168
public void onCompleted() {
156169
if (isClosed.compareAndSet(false, true)) {
157-
LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString());
170+
BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(),
171+
suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}",
172+
getId(), op, getPreviousRequestString(), suffix));
158173
requestFuture.get().thenAccept(reply -> {
159-
LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
174+
BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
175+
suffix -> LOG.info("{}: Completed {}, lastReply: {} {}", getId(), op, reply, suffix));
160176
responseObserver.onCompleted();
161177
});
162178
}

ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.ratis.server.protocol.TermIndex;
3737
import org.apache.ratis.server.raftlog.LogProtoUtils;
3838
import org.apache.ratis.server.util.ServerStringUtils;
39+
import org.apache.ratis.util.BatchLogger;
3940
import org.apache.ratis.util.CodeInjectionForTesting;
4041
import org.apache.ratis.util.LifeCycle;
4142
import org.apache.ratis.util.Preconditions;
@@ -59,6 +60,11 @@
5960
class SnapshotInstallationHandler {
6061
static final Logger LOG = LoggerFactory.getLogger(SnapshotInstallationHandler.class);
6162

63+
private enum BatchLogKey implements BatchLogger.Key {
64+
INSTALL_SNAPSHOT_REQUEST,
65+
INSTALL_SNAPSHOT_REPLY
66+
}
67+
6268
static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0, INVALID_LOG_INDEX);
6369

6470
private final RaftServerImpl server;
@@ -93,21 +99,19 @@ long getInProgressInstallSnapshotIndex() {
9399
}
94100

95101
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
96-
if (LOG.isInfoEnabled()) {
97-
LOG.info("{}: receive installSnapshot: {}", getMemberId(),
98-
ServerStringUtils.toInstallSnapshotRequestString(request));
99-
}
102+
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(),
103+
suffix -> LOG.info("{}: receive installSnapshot: {} {}",
104+
getMemberId(), ServerStringUtils.toInstallSnapshotRequestString(request), suffix));
100105
final InstallSnapshotReplyProto reply;
101106
try {
102107
reply = installSnapshotImpl(request);
103108
} catch (Exception e) {
104109
LOG.error("{}: installSnapshot failed", getMemberId(), e);
105110
throw e;
106111
}
107-
if (LOG.isInfoEnabled()) {
108-
LOG.info("{}: reply installSnapshot: {}", getMemberId(),
109-
ServerStringUtils.toInstallSnapshotReplyString(reply));
110-
}
112+
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, getMemberId(),
113+
suffix -> LOG.info("{}: reply installSnapshot: {} {}",
114+
getMemberId(), ServerStringUtils.toInstallSnapshotReplyString(reply), suffix));
111115
return reply;
112116
}
113117

0 commit comments

Comments (0)