Skip to content

Commit b92c97d

Browse files
Workflow-friendly concurrency primitives (#2133)
Workflow-friendly concurrency primitives
1 parent bbf2de7 commit b92c97d

File tree

8 files changed

+1108
-2
lines changed

8 files changed

+1108
-2
lines changed

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public static <E> WorkflowQueue<E> newWorkflowQueue(int capacity) {
109109
return new WorkflowQueueImpl<>(capacity);
110110
}
111111

112+
public static WorkflowLock newWorkflowLock() {
113+
return new WorkflowLockImpl();
114+
}
115+
116+
public static WorkflowSemaphore newWorkflowSemaphore(int permits) {
117+
return new WorkflowSemaphoreImpl(permits);
118+
}
119+
112120
public static <E> CompletablePromise<E> newCompletablePromise() {
113121
return new CompletablePromiseImpl<>();
114122
}
@@ -479,13 +487,13 @@ public static <R> R executeActivity(
479487

480488
public static void await(String reason, Supplier<Boolean> unblockCondition)
481489
throws DestroyWorkflowThreadError {
482-
assertNotReadOnly("await");
490+
assertNotReadOnly(reason);
483491
getWorkflowOutboundInterceptor().await(reason, unblockCondition);
484492
}
485493

486494
public static boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition)
487495
throws DestroyWorkflowThreadError {
488-
assertNotReadOnly("await with timeout");
496+
assertNotReadOnly(reason);
489497
return getWorkflowOutboundInterceptor().await(timeout, reason, unblockCondition);
490498
}
491499

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.sync;
22+
23+
import static io.temporal.internal.sync.WorkflowInternal.assertNotReadOnly;
24+
25+
import com.google.common.base.Preconditions;
26+
import io.temporal.workflow.CancellationScope;
27+
import io.temporal.workflow.WorkflowLock;
28+
import java.time.Duration;
29+
30+
class WorkflowLockImpl implements WorkflowLock {
31+
private boolean locked = false;
32+
33+
@Override
34+
public void lock() {
35+
WorkflowInternal.await(
36+
"WorkflowLock.lock",
37+
() -> {
38+
CancellationScope.throwCanceled();
39+
return !locked;
40+
});
41+
locked = true;
42+
}
43+
44+
@Override
45+
public boolean tryLock() {
46+
assertNotReadOnly("WorkflowLock.tryLock");
47+
if (!locked) {
48+
locked = true;
49+
return true;
50+
}
51+
return false;
52+
}
53+
54+
@Override
55+
public boolean tryLock(Duration timeout) {
56+
boolean unlocked =
57+
WorkflowInternal.await(
58+
timeout,
59+
"WorkflowLock.tryLock",
60+
() -> {
61+
CancellationScope.throwCanceled();
62+
return !locked;
63+
});
64+
if (unlocked) {
65+
locked = true;
66+
return true;
67+
}
68+
return false;
69+
}
70+
71+
@Override
72+
public void unlock() {
73+
assertNotReadOnly("WorkflowLock.unlock");
74+
Preconditions.checkState(locked, "WorkflowLock.unlock called when not locked");
75+
locked = false;
76+
}
77+
78+
@Override
79+
public boolean isHeld() {
80+
return locked;
81+
}
82+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.sync;
22+
23+
import static io.temporal.internal.sync.WorkflowInternal.assertNotReadOnly;
24+
25+
import com.google.common.base.Preconditions;
26+
import io.temporal.workflow.CancellationScope;
27+
import io.temporal.workflow.WorkflowSemaphore;
28+
import java.time.Duration;
29+
30+
class WorkflowSemaphoreImpl implements WorkflowSemaphore {
31+
private int currentPermits;
32+
33+
public WorkflowSemaphoreImpl(int permits) {
34+
this.currentPermits = permits;
35+
}
36+
37+
@Override
38+
public void acquire() {
39+
acquire(1);
40+
}
41+
42+
@Override
43+
public void acquire(int permits) {
44+
Preconditions.checkArgument(
45+
permits >= 0, "WorkflowSemaphore.acquire called with negative permits");
46+
WorkflowInternal.await(
47+
"WorkflowSemaphore.acquire",
48+
() -> {
49+
CancellationScope.throwCanceled();
50+
return currentPermits >= permits;
51+
});
52+
currentPermits -= permits;
53+
}
54+
55+
@Override
56+
public boolean tryAcquire() {
57+
return tryAcquire(1);
58+
}
59+
60+
@Override
61+
public boolean tryAcquire(Duration timeout) {
62+
return tryAcquire(1, timeout);
63+
}
64+
65+
@Override
66+
public boolean tryAcquire(int permits) {
67+
assertNotReadOnly("WorkflowSemaphore.tryAcquire");
68+
Preconditions.checkArgument(
69+
permits >= 0, "WorkflowSemaphore.tryAcquire called with negative permits");
70+
if (currentPermits >= permits) {
71+
currentPermits -= permits;
72+
return true;
73+
}
74+
return false;
75+
}
76+
77+
@Override
78+
public boolean tryAcquire(int permits, Duration timeout) {
79+
Preconditions.checkArgument(
80+
permits >= 0, "WorkflowSemaphore.tryAcquire called with negative permits");
81+
boolean acquired =
82+
WorkflowInternal.await(
83+
timeout,
84+
"WorkflowSemaphore.tryAcquire",
85+
() -> {
86+
CancellationScope.throwCanceled();
87+
return currentPermits >= permits;
88+
});
89+
if (acquired) {
90+
currentPermits -= permits;
91+
}
92+
return acquired;
93+
}
94+
95+
@Override
96+
public void release() {
97+
release(1);
98+
}
99+
100+
@Override
101+
public void release(int permits) {
102+
assertNotReadOnly("WorkflowSemaphore.release");
103+
Preconditions.checkArgument(
104+
permits >= 0, "WorkflowSemaphore.release called with negative permits");
105+
currentPermits += permits;
106+
}
107+
}

temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,27 @@ public static <E> Promise<E> newFailedPromise(Exception failure) {
507507
return WorkflowInternal.newFailedPromise(failure);
508508
}
509509

510+
/**
511+
* Creates a {@link WorkflowLock} implementation that can be used from workflow code.
512+
*
513+
* @apiNote The lock returned is not reentrant. If a workflow thread tries to acquire a lock that
514+
* it already holds, the call will block indefinitely.
515+
* @return new instance of {@link WorkflowLock}
516+
*/
517+
public static WorkflowLock newWorkflowLock() {
518+
return WorkflowInternal.newWorkflowLock();
519+
}
520+
521+
/**
522+
* Creates a {@link WorkflowSemaphore} implementation that can be used from workflow code.
523+
*
524+
* @param permits the given number of permits for the semaphore.
525+
* @return new instance of {@link WorkflowSemaphore}
526+
*/
527+
public static WorkflowSemaphore newWorkflowSemaphore(int permits) {
528+
return WorkflowInternal.newWorkflowSemaphore(permits);
529+
}
530+
510531
/**
511532
* Registers an implementation object. The object must implement at least one interface annotated
512533
* with {@link WorkflowInterface}. All its methods annotated with @{@link SignalMethod}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow;
22+
23+
import java.time.Duration;
24+
25+
/**
26+
* Workflow lock is an alternative to {@link java.util.concurrent.locks.Lock} that is deterministic
27+
* and compatible with Temporal's concurrency model. API is designed to be used in a workflow code
28+
* only. It is not allowed to be used in an activity code.
29+
*
30+
* <p>In Temporal concurrency model, only one thread in a workflow code can execute at a time.
31+
*/
32+
public interface WorkflowLock {
33+
/**
34+
* Acquires the lock.
35+
*
36+
* @throws io.temporal.failure.CanceledFailure if thread (or current {@link CancellationScope} was
37+
* canceled).
38+
*/
39+
void lock();
40+
41+
/**
42+
* Acquires the lock only if it is free at the time of invocation.
43+
*
44+
* @return true if the lock was acquired and false otherwise
45+
*/
46+
boolean tryLock();
47+
48+
/**
49+
* Acquires the lock if it is free within the given waiting time.
50+
*
51+
* @throws io.temporal.failure.CanceledFailure if thread (or current {@link CancellationScope} was
52+
* canceled).
53+
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was
54+
* acquired.
55+
*/
56+
boolean tryLock(Duration timeout);
57+
58+
/** Releases the lock. */
59+
void unlock();
60+
61+
/**
62+
* Checks if a lock is held.
63+
*
64+
* @return true if the lock is held and false otherwise.
65+
*/
66+
boolean isHeld();
67+
}

0 commit comments

Comments
 (0)