Skip to content

Commit 8966c9b

Browse files
turboFei authored and
SteNicholas committed
[CELEBORN-2208] Log the partition reader wait time if it exceeds the threshold
### What changes were proposed in this pull request? Log the partition reader wait time if it exceeds the threshold. ### Why are the changes needed? Currently, the task shuffle read wait time can be very long, yet there is no task log to indicate the slowness. <img width="1702" height="130" alt="image" src="https://github.com/user-attachments/assets/47973563-13c7-4178-8954-3d3a23181a02" /> <img width="1104" height="425" alt="image" src="https://github.com/user-attachments/assets/864448a8-de6b-47da-bb54-75b7b2f8a0c4" /> ### Does this PR resolve a correctness bug? No. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Code review. Closes #3544 from turboFei/log_time. Authored-by: Wang, Fei <[email protected]> Signed-off-by: SteNicholas <[email protected]>
1 parent 5d6ce21 commit 8966c9b

File tree

5 files changed

+89
-6
lines changed

5 files changed

+89
-6
lines changed

client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class DfsPartitionReader implements PartitionReader {
7575
private TransportClient client;
7676
private PbStreamHandler streamHandler;
7777
private MetricsCallback metricsCallback;
78+
private long partitionReaderWaitLogThreshold;
7879
private FileSystem hadoopFs;
7980

8081
private Path dataFilePath;
@@ -100,6 +101,7 @@ public DfsPartitionReader(
100101
results = new LinkedBlockingQueue<>();
101102

102103
this.metricsCallback = metricsCallback;
104+
this.partitionReaderWaitLogThreshold = conf.clientPartitionReaderWaitLogThreshold();
103105
this.location = location;
104106
if (location.getStorageInfo() != null
105107
&& location.getStorageInfo().getType() == StorageInfo.Type.S3) {
@@ -290,14 +292,36 @@ public ByteBuf next() throws Exception {
290292
});
291293
}
292294
try {
295+
long totalWaitTimeMs = 0;
296+
long lastLogTimeMs = 0;
297+
293298
while (chunk == null) {
294299
checkException();
295300
Long startFetchWait = System.nanoTime();
296301
chunk = results.poll(500, TimeUnit.MILLISECONDS);
297-
metricsCallback.incReadTime(
298-
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
302+
long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait);
303+
metricsCallback.incReadTime(waitTimeMs);
304+
totalWaitTimeMs += waitTimeMs;
305+
// Log when wait time exceeds another threshold since last log
306+
if (chunk == null && totalWaitTimeMs >= lastLogTimeMs + partitionReaderWaitLogThreshold) {
307+
lastLogTimeMs = totalWaitTimeMs;
308+
logger.info(
309+
"Waiting for data from partition {}/{} for {}ms",
310+
location.getFileName(),
311+
location.hostAndPorts(),
312+
totalWaitTimeMs);
313+
}
314+
299315
logger.debug("poll result with result size: {}", results.size());
300316
}
317+
318+
if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
319+
logger.info(
320+
"Finished waiting for data from partition {}/{} after {}ms",
321+
location.getFileName(),
322+
location.hostAndPorts(),
323+
totalWaitTimeMs);
324+
}
301325
} catch (Exception e) {
302326
logger.error("PartitionReader thread interrupted while fetching data.");
303327
throw e;

client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class LocalPartitionReader implements PartitionReader {
6969
private PbStreamHandler streamHandler;
7070
private TransportClient client;
7171
private MetricsCallback metricsCallback;
72+
private long partitionReaderWaitLogThreshold;
7273
private int startChunkIndex;
7374
private int endChunkIndex;
7475

@@ -98,6 +99,7 @@ public LocalPartitionReader(
9899
results = new LinkedBlockingQueue<>();
99100
this.location = location;
100101
this.metricsCallback = metricsCallback;
102+
this.partitionReaderWaitLogThreshold = conf.clientPartitionReaderWaitLogThreshold();
101103
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
102104
try {
103105
client = clientFactory.createClient(location.getHost(), location.getFetchPort(), 0);
@@ -219,14 +221,35 @@ public ByteBuf next() throws IOException, InterruptedException {
219221
}
220222
ByteBuf chunk = null;
221223
try {
224+
long totalWaitTimeMs = 0;
225+
long lastLogTimeMs = 0;
226+
222227
while (chunk == null) {
223228
checkException();
224229
Long startFetchWait = System.nanoTime();
225230
chunk = results.poll(100, TimeUnit.MILLISECONDS);
226-
metricsCallback.incReadTime(
227-
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
231+
long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait);
232+
metricsCallback.incReadTime(waitTimeMs);
233+
totalWaitTimeMs += waitTimeMs;
234+
// Log when wait time exceeds another threshold since last log
235+
if (chunk == null && totalWaitTimeMs >= lastLogTimeMs + partitionReaderWaitLogThreshold) {
236+
lastLogTimeMs = totalWaitTimeMs;
237+
logger.info(
238+
"Waiting for data from partition {}/{} for {}ms",
239+
location.getFileName(),
240+
location.hostAndPorts(),
241+
totalWaitTimeMs);
242+
}
228243
logger.debug("Poll result with result size: {}", results.size());
229244
}
245+
246+
if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
247+
logger.info(
248+
"Finished waiting for data from partition {}/{} after {}ms",
249+
location.getFileName(),
250+
location.hostAndPorts(),
251+
totalWaitTimeMs);
252+
}
230253
} catch (InterruptedException e) {
231254
logger.error("PartitionReader thread interrupted while fetching data.");
232255
throw e;

client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class WorkerPartitionReader implements PartitionReader {
5454
private PbStreamHandler streamHandler;
5555
private TransportClient client;
5656
private MetricsCallback metricsCallback;
57+
private long partitionReaderWaitLogThreshold;
5758

5859
private int lastReturnedChunkId = -1;
5960
private int returnedChunks;
@@ -101,6 +102,7 @@ public class WorkerPartitionReader implements PartitionReader {
101102
pollChunkWaitTime = conf.clientFetchPollChunkWaitTime();
102103
inflightRequestCount = 0;
103104
this.metricsCallback = metricsCallback;
105+
this.partitionReaderWaitLogThreshold = conf.clientPartitionReaderWaitLogThreshold();
104106
// only add the buffer to results queue if this reader is not closed.
105107
callback =
106108
new ChunkReceivedCallback() {
@@ -191,12 +193,34 @@ public ByteBuf next() throws IOException, InterruptedException {
191193
}
192194
Pair<Integer, ByteBuf> chunk = null;
193195
try {
196+
long totalWaitTimeMs = 0;
197+
long lastLogTimeMs = 0;
198+
194199
while (chunk == null) {
195200
checkException();
196201
Long startFetchWait = System.nanoTime();
197202
chunk = results.poll(pollChunkWaitTime, TimeUnit.MILLISECONDS);
198-
metricsCallback.incReadTime(
199-
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
203+
long waitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait);
204+
metricsCallback.incReadTime(waitTimeMs);
205+
totalWaitTimeMs += waitTimeMs;
206+
// Log when wait time exceeds another threshold since last log
207+
if (chunk == null && totalWaitTimeMs >= lastLogTimeMs + partitionReaderWaitLogThreshold) {
208+
lastLogTimeMs = totalWaitTimeMs;
209+
logger.info(
210+
"Waiting for data from partition {}/{} for {}ms",
211+
location.getFileName(),
212+
location.hostAndPorts(),
213+
totalWaitTimeMs);
214+
}
215+
logger.debug("poll result with result size: {}", results.size());
216+
}
217+
218+
if (totalWaitTimeMs >= partitionReaderWaitLogThreshold) {
219+
logger.info(
220+
"Finished waiting for data from partition {}/{} after {}ms",
221+
location.getFileName(),
222+
location.hostAndPorts(),
223+
totalWaitTimeMs);
200224
}
201225
} catch (InterruptedException e) {
202226
logger.error("PartitionReader thread interrupted while polling data.");

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
10251025
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
10261026
def isPartitionReaderCheckpointEnabled: Boolean =
10271027
get(PARTITION_READER_CHECKPOINT_ENABLED)
1028+
def clientPartitionReaderWaitLogThreshold: Long =
1029+
get(PARTITION_READER_WAIT_LOG_THRESHOLD)
10281030

10291031
def clientFetchMaxRetriesForEachReplica: Int = get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
10301032
def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
@@ -5034,6 +5036,15 @@ object CelebornConf extends Logging {
50345036
.booleanConf
50355037
.createWithDefault(false)
50365038

5039+
val PARTITION_READER_WAIT_LOG_THRESHOLD: ConfigEntry[Long] =
5040+
buildConf("celeborn.client.partition.reader.waitLog.threshold")
5041+
.categories("client")
5042+
.version("0.6.2")
5043+
.doc("The threshold in milliseconds for logging partition read wait time. " +
5044+
"Log messages will be generated when wait time exceeds multiples of this threshold.")
5045+
.timeConf(TimeUnit.MILLISECONDS)
5046+
.createWithDefaultString("60s")
5047+
50375048
val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
50385049
buildConf("celeborn.client.fetch.maxReqsInFlight")
50395050
.withAlternative("celeborn.fetch.maxReqsInFlight")

docs/configuration/client.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ license: |
5252
| celeborn.client.inputStream.creation.window | 16 | false | Window size that CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario where multiple Partitions are read | 0.5.1 | |
5353
| celeborn.client.mr.pushData.max | 32m | false | Max size for a push data sent from mr client. | 0.4.0 | |
5454
| celeborn.client.partition.reader.checkpoint.enabled | false | false | Whether or not checkpoint reads when re-creating a partition reader. Setting to true minimizes the amount of unnecessary reads during partition read retries | 0.6.0 | |
55+
| celeborn.client.partition.reader.waitLog.threshold | 60s | false | The threshold in milliseconds for logging partition read wait time. Log messages will be generated when wait time exceeds multiples of this threshold. | 0.6.2 | |
5556
| celeborn.client.push.buffer.initial.size | 8k | false | | 0.3.0 | celeborn.push.buffer.initial.size |
5657
| celeborn.client.push.buffer.max.size | 64k | false | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 | celeborn.push.buffer.max.size |
5758
| celeborn.client.push.excludeWorkerOnFailure.enabled | false | false | Whether to enable shuffle client-side push exclude workers on failures. | 0.3.0 | |

0 commit comments

Comments
 (0)