Skip to content

Commit 0b7d1b7

Browse files
authored
Allow Data Converter code to escape deadlock detection (#1723)
Issue #1301
1 parent c0b3f6b commit 0b7d1b7

File tree

15 files changed

+438
-146
lines changed

15 files changed

+438
-146
lines changed

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
id 'net.ltgt.errorprone' version '3.0.1' apply false
1010
id 'org.cadixdev.licenser' version '0.6.1'
1111
id 'com.palantir.git-version' version "${palantirGitVersionVersion}" apply false
12-
id 'io.github.gradle-nexus.publish-plugin' version '1.2.0'
12+
id 'io.github.gradle-nexus.publish-plugin' version '1.3.0'
1313
id 'com.diffplug.spotless' version '6.17.0' apply false
1414
id 'com.github.nbaztec.coveralls-jacoco' version "1.2.15" apply false
1515

@@ -30,7 +30,7 @@ allprojects {
3030

3131
ext {
3232
// Platforms
33-
grpcVersion = '1.53.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
33+
grpcVersion = '1.54.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3434
jacksonVersion = '2.14.2' // [2.9.0,)
3535
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
3636
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)
@@ -52,7 +52,7 @@ ext {
5252

5353
jsonPathVersion = '2.7.0' // compileOnly
5454

55-
cronUtilsVersion = '9.2.0' // for test server only
55+
cronUtilsVersion = '9.2.1' // for test server only
5656

5757
// Spring Boot 3 requires Java 17, java-sdk builds against 2.x version because we support Java 8.
5858
// We do test compatibility with Spring Boot 3 in integration tests.

temporal-sdk/src/test/java/io/temporal/client/functional/common/TestWorkflows.java renamed to temporal-sdk/src/main/java/io/temporal/internal/common/NonIdempotentHandle.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818
* limitations under the License.
1919
*/
2020

21-
package io.temporal.client.functional.common;
21+
package io.temporal.internal.common;
2222

23-
import io.temporal.workflow.WorkflowInterface;
24-
import io.temporal.workflow.WorkflowMethod;
25-
26-
public class TestWorkflows {
27-
@WorkflowInterface
28-
public interface PrimitiveWorkflow {
29-
@WorkflowMethod
30-
void execute();
31-
}
23+
/**
24+
* This interface signifies that the implementation of {@link #close()} method may and likely is not
25+
* idempotent, which is in agreement with {@link AutoCloseable#close()} contract. It also narrows
26+
* {@link AutoCloseable#close()} contract to not throw any checked exceptions, making it more
27+
* convenient to use in try-with-resources blocks.
28+
*/
29+
public interface NonIdempotentHandle extends AutoCloseable {
30+
@Override
31+
void close();
3232
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
3939
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
4040
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
41+
import io.temporal.internal.WorkflowThreadMarker;
4142
import io.temporal.internal.common.ActivityOptionUtils;
43+
import io.temporal.internal.common.NonIdempotentHandle;
4244
import io.temporal.internal.common.SearchAttributesUtil;
4345
import io.temporal.internal.logging.ReplayAwareLogger;
4446
import io.temporal.serviceclient.CheckedExceptionWrapper;
@@ -395,14 +397,6 @@ public static <R> R executeActivity(
395397
return result.get();
396398
}
397399

398-
private static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
399-
return getRootWorkflowContext().getWorkflowOutboundInterceptor();
400-
}
401-
402-
static SyncWorkflowContext getRootWorkflowContext() {
403-
return DeterministicRunnerImpl.currentThreadInternal().getWorkflowContext();
404-
}
405-
406400
public static void await(String reason, Supplier<Boolean> unblockCondition)
407401
throws DestroyWorkflowThreadError {
408402
getWorkflowOutboundInterceptor().await(reason, unblockCondition);
@@ -464,19 +458,12 @@ public static Throwable unwrap(Throwable e) {
464458
return CheckedExceptionWrapper.unwrap(e);
465459
}
466460

467-
/** Prohibit instantiation */
468-
private WorkflowInternal() {}
469-
470461
/** Returns false if not under workflow code. */
471462
public static boolean isReplaying() {
472463
Optional<WorkflowThread> thread = DeterministicRunnerImpl.currentThreadInternalIfPresent();
473464
return thread.isPresent() && getRootWorkflowContext().isReplaying();
474465
}
475466

476-
public static WorkflowInfo getWorkflowInfo() {
477-
return new WorkflowInfoImpl(getRootWorkflowContext().getReplayContext());
478-
}
479-
480467
public static <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
481468
Payload memo = getRootWorkflowContext().getReplayContext().getMemo(key);
482469
if (memo == null) {
@@ -520,6 +507,24 @@ public static void sleep(Duration duration) {
520507
getWorkflowOutboundInterceptor().sleep(duration);
521508
}
522509

510+
public static boolean isWorkflowThread() {
511+
return WorkflowThreadMarker.isWorkflowThread();
512+
}
513+
514+
public static <T> T deadlockDetectorOff(Functions.Func<T> func) {
515+
if (isWorkflowThread()) {
516+
try (NonIdempotentHandle ignored = getWorkflowThread().lockDeadlockDetector()) {
517+
return func.apply();
518+
}
519+
} else {
520+
return func.apply();
521+
}
522+
}
523+
524+
public static WorkflowInfo getWorkflowInfo() {
525+
return new WorkflowInfoImpl(getRootWorkflowContext().getReplayContext());
526+
}
527+
523528
public static Scope getMetricsScope() {
524529
return getRootWorkflowContext().getMetricsScope();
525530
}
@@ -614,4 +619,19 @@ public static Optional<Exception> getPreviousRunFailure() {
614619
// converter
615620
.map(f -> getDataConverter().failureToException(f));
616621
}
622+
623+
private static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
624+
return getRootWorkflowContext().getWorkflowOutboundInterceptor();
625+
}
626+
627+
static SyncWorkflowContext getRootWorkflowContext() {
628+
return DeterministicRunnerImpl.currentThreadInternal().getWorkflowContext();
629+
}
630+
631+
private static WorkflowThread getWorkflowThread() {
632+
return DeterministicRunnerImpl.currentThreadInternal();
633+
}
634+
635+
/** Prohibit instantiation */
636+
private WorkflowInternal() {}
617637
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThread.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal;
2424

2525
import io.temporal.failure.CanceledFailure;
26+
import io.temporal.internal.common.NonIdempotentHandle;
2627
import io.temporal.workflow.CancellationScope;
2728
import java.util.Optional;
2829
import java.util.concurrent.Future;
@@ -90,6 +91,13 @@ static WorkflowThread newThread(Runnable runnable, boolean detached, String name
9091
*/
9192
boolean runUntilBlocked(long deadlockDetectionTimeoutMs);
9293

94+
/**
95+
* Disables deadlock detector on this thread
96+
*
97+
* @return a handle that must be used to unlock the deadlock detector back
98+
*/
99+
NonIdempotentHandle lockDeadlockDetector();
100+
93101
Throwable getUnhandledException();
94102

95103
boolean isDone();

0 commit comments

Comments
 (0)