Skip to content

Commit 0558399

Browse files
zustonJunfan Zhang
andcommitted
[apache#291] feat(client): Record background fetch time for prefetch (apache#2366)
### What changes were proposed in this pull request? Followup apache#2365 . Record the background fetch time to compare with the real read time to get the overlapping time to know more about the prefetch performance ### Why are the changes needed? To know about the performance for online spark jobs ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Needn't --------- Co-authored-by: Junfan Zhang <zhangjunfan@qiyi.com>
1 parent 6fee8e5 commit 0558399

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.atomic.AtomicLong;
2728

2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public abstract class PrefetchableClientReadHandler extends AbstractClientReadHa
4243
private AtomicBoolean abnormalFetchTag;
4344
private AtomicBoolean finishedTag;
4445
private AtomicInteger queueingNumber;
46+
private AtomicLong fetchTime;
4547

4648
public PrefetchableClientReadHandler(Optional<PrefetchOption> prefetchOptional) {
4749
if (prefetchOptional.isPresent()) {
@@ -59,6 +61,7 @@ public PrefetchableClientReadHandler(Optional<PrefetchOption> prefetchOptional)
5961
this.abnormalFetchTag = new AtomicBoolean(false);
6062
this.finishedTag = new AtomicBoolean(false);
6163
this.queueingNumber = new AtomicInteger(0);
64+
this.fetchTime = new AtomicLong(0);
6265
} else {
6366
this.prefetchEnabled = false;
6467
}
@@ -87,6 +90,7 @@ public ShuffleDataResult readShuffleData() {
8790
queueingNumber.incrementAndGet();
8891
prefetchExecutors.submit(
8992
() -> {
93+
long start = System.currentTimeMillis();
9094
try {
9195
if (abnormalFetchTag.get() || finishedTag.get()) {
9296
return;
@@ -101,6 +105,7 @@ public ShuffleDataResult readShuffleData() {
101105
LOG.error("Errors on doing readShuffleData", e);
102106
} finally {
103107
queueingNumber.decrementAndGet();
108+
fetchTime.addAndGet(System.currentTimeMillis() - start);
104109
}
105110
});
106111
}
@@ -138,4 +143,14 @@ public void close() {
138143
prefetchExecutors.shutdown();
139144
}
140145
}
146+
147+
@Override
148+
public void logConsumedBlockInfo() {
149+
LOG.info(
150+
"Metrics for shuffleId[{}], partitionId[{}], background fetch cost {} ms",
151+
shuffleId,
152+
partitionId,
153+
fetchTime);
154+
super.logConsumedBlockInfo();
155+
}
141156
}

0 commit comments

Comments
 (0)