Skip to content

Commit 6d7f0dc

Browse files
Merge branch 'dapr:main' into javi-azure-cleanup
2 parents c380f8d + 5094800 commit 6d7f0dc

File tree

4 files changed

+76
-37
lines changed

4 files changed

+76
-37
lines changed

.sdkmanrc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Enable auto-env through the sdkman_auto_env config
2+
# Add key=value pairs of SDKs to use below
3+
java=11.0.27-tem

client/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ plugins {
1010
}
1111

1212
group 'io.dapr'
13-
version = '1.5.6'
13+
version = '1.5.7'
1414
archivesBaseName = 'durabletask-client'
1515

1616
def grpcVersion = '1.69.0'

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import java.util.logging.Logger;
2121

2222
/**
23-
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
23+
* Task hub worker that connects to a sidecar process over gRPC to execute
24+
* orchestrator and activity events.
2425
*/
2526
public final class DurableTaskGrpcWorker implements AutoCloseable {
2627

@@ -39,6 +40,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3940
private final TaskHubSidecarServiceBlockingStub sidecarClient;
4041
private final boolean isExecutorServiceManaged;
4142
private volatile boolean isNormalShutdown = false;
43+
private Thread workerThread;
4244

4345
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
4446
this.orchestrationFactories.putAll(builder.orchestrationFactories);
@@ -66,43 +68,58 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6668

6769
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
6870
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
69-
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
71+
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
72+
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
7073
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
7174
this.isExecutorServiceManaged = builder.executorService == null;
7275
}
7376

7477
/**
75-
* Establishes a gRPC connection to the sidecar and starts processing work-items in the background.
78+
* Establishes a gRPC connection to the sidecar and starts processing work-items
79+
* in the background.
7680
* <p>
77-
* This method retries continuously to establish a connection to the sidecar. If a connection fails,
78-
* a warning log message will be written and a new connection attempt will be made. This process
79-
* continues until either a connection succeeds or the process receives an interrupt signal.
81+
* This method retries continuously to establish a connection to the sidecar. If
82+
* a connection fails,
83+
* a warning log message will be written and a new connection attempt will be
84+
* made. This process
85+
* continues until either a connection succeeds or the process receives an
86+
* interrupt signal.
8087
*/
8188
public void start() {
82-
new Thread(this::startAndBlock).start();
89+
this.workerThread = new Thread(this::startAndBlock);
90+
this.workerThread.start();
8391
}
8492

8593
/**
86-
* Closes the internally managed gRPC channel and executor service, if one exists.
94+
* Closes the internally managed gRPC channel and executor service, if one
95+
* exists.
8796
* <p>
88-
* Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied,
97+
* Only the internally managed GRPC Channel and Executor services are closed. If
98+
* any of them are supplied,
8999
* it is the responsibility of the supplier to take care of them.
90100
*/
91101
public void close() {
102+
this.workerThread.interrupt();
92103
this.isNormalShutdown = true;
93104
this.shutDownWorkerPool();
94105
this.closeSideCarChannel();
95106
}
96107

97108
/**
98-
* Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread.
99-
* This method call blocks indefinitely, or until the current thread is interrupted.
109+
* Establishes a gRPC connection to the sidecar and starts processing work-items
110+
* on the current thread.
111+
* This method call blocks indefinitely, or until the current thread is
112+
* interrupted.
100113
* <p>
101-
* Use can alternatively use the {@link #start} method to run orchestration processing in a background thread.
114+
* Use can alternatively use the {@link #start} method to run orchestration
115+
* processing in a background thread.
102116
* <p>
103-
* This method retries continuously to establish a connection to the sidecar. If a connection fails,
104-
* a warning log message will be written and a new connection attempt will be made. This process
105-
* continues until either a connection succeeds or the process receives an interrupt signal.
117+
* This method retries continuously to establish a connection to the sidecar. If
118+
* a connection fails,
119+
* a warning log message will be written and a new connection attempt will be
120+
* made. This process
121+
* continues until either a connection succeeds or the process receives an
122+
* interrupt signal.
106123
*/
107124
public void startAndBlock() {
108125
logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
@@ -117,7 +134,6 @@ public void startAndBlock() {
117134
this.dataConverter,
118135
logger);
119136

120-
// TODO: How do we interrupt manually?
121137
while (true) {
122138
try {
123139
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
@@ -145,11 +161,17 @@ public void startAndBlock() {
145161
this.sidecarClient.completeOrchestratorTask(response);
146162
} catch (StatusRuntimeException e) {
147163
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
148-
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress());
164+
logger.log(Level.WARNING,
165+
"The sidecar at address {0} is unavailable while completing the orchestrator task.",
166+
this.getSidecarAddress());
149167
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
150-
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress());
168+
logger.log(Level.WARNING,
169+
"Durable Task worker has disconnected from {0} while completing the orchestrator task.",
170+
this.getSidecarAddress());
151171
} else {
152-
logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress());
172+
logger.log(Level.WARNING,
173+
"Unexpected failure completing the orchestrator task at {0}.",
174+
this.getSidecarAddress());
153175
}
154176
}
155177
});
@@ -189,29 +211,35 @@ public void startAndBlock() {
189211
this.sidecarClient.completeActivityTask(responseBuilder.build());
190212
} catch (StatusRuntimeException e) {
191213
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
192-
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress());
214+
logger.log(Level.WARNING,
215+
"The sidecar at address {0} is unavailable while completing the activity task.",
216+
this.getSidecarAddress());
193217
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
194-
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress());
218+
logger.log(Level.WARNING,
219+
"Durable Task worker has disconnected from {0} while completing the activity task.",
220+
this.getSidecarAddress());
195221
} else {
196-
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress());
222+
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
223+
this.getSidecarAddress());
197224
}
198225
}
199226
});
200-
}
201-
else if (requestType == RequestCase.HEALTHPING)
202-
{
227+
} else if (requestType == RequestCase.HEALTHPING) {
203228
// No-op
204229
} else {
205-
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
230+
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.",
231+
requestType);
206232
}
207233
}
208234
} catch (StatusRuntimeException e) {
209235
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
210-
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
236+
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.",
237+
this.getSidecarAddress());
211238
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
212239
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
213240
} else {
214-
logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
241+
logger.log(Level.WARNING,
242+
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
215243
}
216244

217245
// Retry after 5 seconds
@@ -225,7 +253,8 @@ else if (requestType == RequestCase.HEALTHPING)
225253
}
226254

227255
/**
228-
* Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed.
256+
* Stops the current worker's listen loop, preventing any new orchestrator or
257+
* activity events from being processed.
229258
*/
230259
public void stop() {
231260
this.close();
@@ -246,7 +275,8 @@ private void closeSideCarChannel() {
246275
private void shutDownWorkerPool() {
247276
if (this.isExecutorServiceManaged) {
248277
if (!this.isNormalShutdown) {
249-
logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
278+
logger.log(Level.WARNING,
279+
"ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
250280
}
251281

252282
this.workerPool.shutdown();

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,19 +1183,25 @@ private boolean shouldRetry() {
11831183
boolean shouldRetryBasedOnPolicy = this.policy != null ? this.shouldRetryBasedOnPolicy() : true;
11841184
boolean shouldRetryBasedOnHandler = this.handler != null ? this.handler.handle(retryContext) : true;
11851185

1186-
if (this.policy != null) {
1187-
logger.info(() -> String.format("shouldRetryBasedOnPolicy: %s", shouldRetryBasedOnPolicy));
1188-
}
1186+
// Only log when not replaying, so only the current attempt is logged and not all previous attempts.
1187+
if(!this.context.getIsReplaying()) {
1188+
if (this.policy != null) {
1189+
logger.fine(() -> String.format("shouldRetryBasedOnPolicy: %s", shouldRetryBasedOnPolicy));
1190+
}
11891191

1190-
if (this.handler != null) {
1191-
logger.info(() -> String.format("shouldRetryBasedOnHandler: %s", shouldRetryBasedOnHandler));
1192+
if (this.handler != null) {
1193+
logger.fine(() -> String.format("shouldRetryBasedOnHandler: %s", shouldRetryBasedOnHandler));
1194+
}
11921195
}
11931196

11941197
return shouldRetryBasedOnPolicy && shouldRetryBasedOnHandler;
11951198
}
11961199

11971200
private boolean shouldRetryBasedOnPolicy() {
1198-
logger.warning(() -> String.format("Retry Policy: %d retries out of total %d performed ", this.attemptNumber, this.policy.getMaxNumberOfAttempts()));
1201+
// Only log when not replaying, so only the current attempt is logged and not all previous attempts.
1202+
if(!this.context.getIsReplaying()) {
1203+
logger.fine(() -> String.format("Retry Policy: %d retries out of total %d performed ", this.attemptNumber, this.policy.getMaxNumberOfAttempts()));
1204+
}
11991205

12001206
if (this.attemptNumber >= this.policy.getMaxNumberOfAttempts()) {
12011207
// Max number of attempts exceeded

0 commit comments

Comments
 (0)