Skip to content

Commit a651b0b

Browse files
committed
Add context to rpc audit log to output necessary context
1 parent e62fe7c commit a651b0b

File tree

3 files changed

+45
-2
lines changed

3 files changed

+45
-2
lines changed

common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,33 @@
1818
package org.apache.uniffle.common.audit;
1919

2020
import java.io.Closeable;
21+
import java.util.Optional;
2122

2223
import org.slf4j.Logger;
2324

2425
import org.apache.uniffle.common.rpc.StatusCode;
2526

2627
/** Context for rpc audit logging. */
2728
public abstract class RpcAuditContext implements Closeable {
29+
private static final ThreadLocal<RpcAuditContext> RPC_AUDIT_CONTEXT_THREAD_LOCAL =
30+
new ThreadLocal<>();
2831
private final Logger log;
2932
private String command;
3033
private String statusCode;
3134
private String args;
3235
private String returnValue;
36+
private String context;
3337
private String from;
3438
private long creationTimeNs;
3539
protected long executionTimeNs;
3640

3741
public RpcAuditContext(Logger log) {
3842
this.log = log;
43+
RPC_AUDIT_CONTEXT_THREAD_LOCAL.set(this);
44+
}
45+
46+
public static final Optional<RpcAuditContext> getRpcAuditContext() {
47+
return Optional.ofNullable(RPC_AUDIT_CONTEXT_THREAD_LOCAL.get());
3948
}
4049

4150
protected abstract String content();
@@ -119,6 +128,21 @@ public RpcAuditContext withFrom(String from) {
119128
return this;
120129
}
121130

131+
/**
132+
* Sets the context field. The first call sets the value; each subsequent call appends its part to the existing context, separated by ", ".
133+
*
134+
* @param contextPart the new context part
135+
* @return this {@link RpcAuditContext} instance
136+
*/
137+
public RpcAuditContext withContext(String contextPart) {
138+
if (context == null) {
139+
context = contextPart;
140+
} else {
141+
this.context += ", " + contextPart;
142+
}
143+
return this;
144+
}
145+
122146
@Override
123147
public void close() {
124148
if (log == null) {
@@ -140,6 +164,9 @@ public String toString() {
140164
if (returnValue != null) {
141165
line += String.format("\treturn{%s}", returnValue);
142166
}
167+
if (context != null) {
168+
line += String.format("\tcontext{%s}", context);
169+
}
143170
return line;
144171
}
145172
}

server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,8 @@ public void reportShuffleResult(
806806
appId,
807807
shuffleId);
808808
}
809+
auditContext.withContext("updatedBlockCount=" + updatedBlockCount);
810+
auditContext.withContext("expectedBlockCount=" + expectedBlockCount);
809811
} catch (Exception e) {
810812
status = StatusCode.INTERNAL_ERROR;
811813
msg = "error happened when report shuffle result, check shuffle server for detail";
@@ -878,6 +880,7 @@ public void getShuffleResult(
878880
}
879881

880882
auditContext.withStatusCode(status);
883+
auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size());
881884
reply =
882885
GetShuffleResultResponse.newBuilder()
883886
.setStatus(status.toProto())
@@ -906,7 +909,7 @@ public void getShuffleResultForMultiPart(
906909

907910
auditContext.withAppId(appId).withShuffleId(shuffleId);
908911
auditContext.withArgs(
909-
"partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" + blockIdLayout);
912+
"partitionsList=" + partitionsList + ", blockIdLayout=" + blockIdLayout);
910913

911914
StatusCode status = verifyRequest(appId);
912915
if (status != StatusCode.SUCCESS) {
@@ -947,6 +950,7 @@ public void getShuffleResultForMultiPart(
947950
LOG.error("Error happened when get shuffle result for {}", requestInfo, e);
948951
}
949952

953+
auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size());
950954
auditContext.withStatusCode(status);
951955
reply =
952956
GetShuffleResultForMultiPartResponse.newBuilder()

server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.uniffle.common.ShuffleIndexResult;
5959
import org.apache.uniffle.common.ShufflePartitionedBlock;
6060
import org.apache.uniffle.common.ShufflePartitionedData;
61+
import org.apache.uniffle.common.audit.RpcAuditContext;
6162
import org.apache.uniffle.common.config.RssBaseConf;
6263
import org.apache.uniffle.common.exception.FileNotFoundException;
6364
import org.apache.uniffle.common.exception.InvalidRequestException;
@@ -641,9 +642,20 @@ public byte[] getFinishedBlockIds(
641642
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
642643
for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
643644
Set<Integer> requestPartitions = entry.getValue();
644-
Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
645+
int bitmapIndex = entry.getKey();
646+
Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
645647
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
648+
RpcAuditContext.getRpcAuditContext()
649+
.ifPresent(
650+
context ->
651+
context.withContext(
652+
String.format(
653+
"bitmap[%d].<size,byte>=<%d,%d>",
654+
bitmapIndex, bitmap.getLongCardinality(), bitmap.getLongSizeInBytes())));
646655
}
656+
RpcAuditContext.getRpcAuditContext()
657+
.ifPresent(
658+
context -> context.withContext("partitionBlockCount=" + res.getLongCardinality()));
647659

648660
if (res.getLongCardinality() != expectedBlockNumber) {
649661
throw new RssException(

0 commit comments

Comments
 (0)