-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Fix request processing scheduling #127464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
de2f695
a5fde53
7780ed7
3425c26
b2dd3da
90d297c
733e5c8
8a0dcc6
b193ec8
95ed34f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -265,9 +265,8 @@ void onAfter(DriverCompletionInfo info) { | |
| concurrentRequests.release(); | ||
| } | ||
|
|
||
| if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { | ||
| if (sendingLock.isHeldByCurrentThread()) { | ||
| try { | ||
| sendingLock.lock(); | ||
| var resolutions = resolveShards(pendingRetries); | ||
| for (var entry : resolutions.entrySet()) { | ||
| targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue()); | ||
|
|
@@ -292,8 +291,8 @@ public void onResponse(DataNodeComputeResponse response) { | |
| for (var entry : response.shardLevelFailures().entrySet()) { | ||
| final ShardId shardId = entry.getKey(); | ||
| trackShardLevelFailure(shardId, false, entry.getValue()); | ||
| pendingShardIds.add(shardId); | ||
| maybeScheduleRetry(shardId, false, entry.getValue()); | ||
| pendingShardIds.add(shardId); | ||
| } | ||
| onAfter(response.completionInfo()); | ||
| } | ||
|
|
@@ -302,8 +301,8 @@ public void onResponse(DataNodeComputeResponse response) { | |
| public void onFailure(Exception e, boolean receivedData) { | ||
| for (ShardId shardId : request.shardIds) { | ||
| trackShardLevelFailure(shardId, receivedData, e); | ||
| pendingShardIds.add(shardId); | ||
| maybeScheduleRetry(shardId, receivedData, e); | ||
| pendingShardIds.add(shardId); | ||
| } | ||
| onAfter(DriverCompletionInfo.EMPTY); | ||
| } | ||
|
|
@@ -322,6 +321,9 @@ private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception | |
| if (receivedData == false | ||
| && targetShards.getShard(shardId).remainingNodes.isEmpty() | ||
| && unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) { | ||
| if (pendingRetries.isEmpty() && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { | ||
| sendingLock.lock(); | ||
|
||
| } | ||
| pendingRetries.add(shardId); | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
isHeldByCurrentThreadshould be used for assertions or debugging purposes, not in production code.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. let me find a way to replace it.