Skip to content

Commit 94336fa

Browse files
authored
Make RestrictionTrackers.getProgress unblocking until initial progress successfully returned (#36750)
* Make RestrictionTrackers.getProgress unblocking * comments * address comments - add 1 min blocking time * Add log * only change behavior when initial progress never evaluated * simplify tests * changed to waitUntilBlocking
1 parent 6a82448 commit 94336fa

File tree

2 files changed

+165
-19
lines changed

2 files changed

+165
-19
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
*/
1818
package org.apache.beam.sdk.fn.splittabledofn;
1919

20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.locks.ReentrantLock;
2022
import javax.annotation.concurrent.ThreadSafe;
2123
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
2224
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
2325
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
26+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2427

2528
/** Support utilities for interacting with {@link RestrictionTracker RestrictionTrackers}. */
2629
@SuppressWarnings({
@@ -45,6 +48,8 @@ public interface ClaimObserver<PositionT> {
4548
private static class RestrictionTrackerObserver<RestrictionT, PositionT>
4649
extends RestrictionTracker<RestrictionT, PositionT> {
4750
protected final RestrictionTracker<RestrictionT, PositionT> delegate;
51+
protected ReentrantLock lock = new ReentrantLock();
52+
protected volatile boolean hasInitialProgress = false;
4853
private final ClaimObserver<PositionT> claimObserver;
4954

5055
protected RestrictionTrackerObserver(
@@ -55,44 +60,76 @@ protected RestrictionTrackerObserver(
5560
}
5661

5762
@Override
58-
public synchronized boolean tryClaim(PositionT position) {
59-
if (delegate.tryClaim(position)) {
60-
claimObserver.onClaimed(position);
61-
return true;
62-
} else {
63-
claimObserver.onClaimFailed(position);
64-
return false;
63+
public boolean tryClaim(PositionT position) {
64+
lock.lock();
65+
try {
66+
if (delegate.tryClaim(position)) {
67+
claimObserver.onClaimed(position);
68+
return true;
69+
} else {
70+
claimObserver.onClaimFailed(position);
71+
return false;
72+
}
73+
} finally {
74+
lock.unlock();
6575
}
6676
}
6777

6878
@Override
69-
public synchronized RestrictionT currentRestriction() {
70-
return delegate.currentRestriction();
79+
public RestrictionT currentRestriction() {
80+
lock.lock();
81+
try {
82+
return delegate.currentRestriction();
83+
} finally {
84+
lock.unlock();
85+
}
7186
}
7287

7388
@Override
74-
public synchronized SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
75-
return delegate.trySplit(fractionOfRemainder);
89+
public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
90+
lock.lock();
91+
try {
92+
SplitResult<RestrictionT> result = delegate.trySplit(fractionOfRemainder);
93+
return result;
94+
} finally {
95+
lock.unlock();
96+
}
7697
}
7798

7899
@Override
79-
public synchronized void checkDone() throws IllegalStateException {
80-
delegate.checkDone();
100+
public void checkDone() throws IllegalStateException {
101+
lock.lock();
102+
try {
103+
delegate.checkDone();
104+
} finally {
105+
lock.unlock();
106+
}
81107
}
82108

83109
@Override
84110
public IsBounded isBounded() {
85111
return delegate.isBounded();
86112
}
113+
114+
/** Evaluate progress if requested. */
115+
protected Progress getProgressBlocking() {
116+
lock.lock();
117+
try {
118+
return ((HasProgress) delegate).getProgress();
119+
} finally {
120+
lock.unlock();
121+
}
122+
}
87123
}
88124

89125
/**
90126
* A {@link RestrictionTracker} which forwards all calls to the delegate progress reporting {@link
91127
* RestrictionTracker}.
92128
*/
93129
@ThreadSafe
94-
private static class RestrictionTrackerObserverWithProgress<RestrictionT, PositionT>
130+
static class RestrictionTrackerObserverWithProgress<RestrictionT, PositionT>
95131
extends RestrictionTrackerObserver<RestrictionT, PositionT> implements HasProgress {
132+
private static final int FIRST_PROGRESS_TIMEOUT_SEC = 60;
96133

97134
protected RestrictionTrackerObserverWithProgress(
98135
RestrictionTracker<RestrictionT, PositionT> delegate,
@@ -101,8 +138,33 @@ protected RestrictionTrackerObserverWithProgress(
101138
}
102139

103140
@Override
104-
public synchronized Progress getProgress() {
105-
return ((HasProgress) delegate).getProgress();
141+
public Progress getProgress() {
142+
return getProgress(FIRST_PROGRESS_TIMEOUT_SEC);
143+
}
144+
145+
@VisibleForTesting
146+
Progress getProgress(int timeOutSec) {
147+
if (!hasInitialProgress) {
148+
Progress progress = Progress.NONE;
149+
try {
150+
// lock can be held long by long-running tryClaim/trySplit. We tolerate this scenario
151+
// by returning zero progress when initial progress never evaluated before due to lock
152+
// timeout.
153+
if (lock.tryLock(timeOutSec, TimeUnit.SECONDS)) {
154+
try {
155+
progress = getProgressBlocking();
156+
hasInitialProgress = true;
157+
} finally {
158+
lock.unlock();
159+
}
160+
}
161+
} catch (InterruptedException e) {
162+
Thread.currentThread().interrupt();
163+
}
164+
return progress;
165+
} else {
166+
return getProgressBlocking();
167+
}
106168
}
107169
}
108170

sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.concurrent.TimeUnit;
2728
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
2829
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
2930
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
3031
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
32+
import org.junit.Rule;
3133
import org.junit.Test;
34+
import org.junit.rules.Timeout;
3235
import org.junit.runner.RunWith;
3336
import org.junit.runners.JUnit4;
3437

@@ -38,6 +41,8 @@
3841
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
3942
})
4043
public class RestrictionTrackersTest {
44+
@Rule public Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
45+
4146
@Test
4247
public void testObservingClaims() {
4348
RestrictionTracker<String, String> observedTracker =
@@ -95,14 +100,37 @@ public void onClaimFailed(String position) {
95100

96101
private static class RestrictionTrackerWithProgress extends RestrictionTracker<Object, Object>
97102
implements HasProgress {
103+
private boolean blockTryClaim;
104+
private boolean blockTrySplit;
105+
private boolean isBlocked;
106+
public static final Progress REPORT_PROGRESS = Progress.from(2.0, 3.0);
107+
108+
public RestrictionTrackerWithProgress() {
109+
this(false, false);
110+
}
111+
112+
public RestrictionTrackerWithProgress(boolean blockTryClaim, boolean blockTrySplit) {
113+
this.blockTryClaim = blockTryClaim;
114+
this.blockTrySplit = blockTrySplit;
115+
this.isBlocked = false;
116+
}
98117

99118
@Override
100119
public Progress getProgress() {
101-
return RestrictionTracker.Progress.from(2.0, 3.0);
120+
return REPORT_PROGRESS;
102121
}
103122

104123
@Override
105-
public boolean tryClaim(Object position) {
124+
public synchronized boolean tryClaim(Object position) {
125+
while (blockTryClaim) {
126+
isBlocked = true;
127+
try {
128+
wait();
129+
} catch (InterruptedException e) {
130+
Thread.currentThread().interrupt();
131+
}
132+
}
133+
isBlocked = false;
106134
return false;
107135
}
108136

@@ -112,7 +140,16 @@ public Object currentRestriction() {
112140
}
113141

114142
@Override
115-
public SplitResult<Object> trySplit(double fractionOfRemainder) {
143+
public synchronized SplitResult<Object> trySplit(double fractionOfRemainder) {
144+
while (blockTrySplit) {
145+
isBlocked = true;
146+
try {
147+
wait();
148+
} catch (InterruptedException e) {
149+
Thread.currentThread().interrupt();
150+
}
151+
}
152+
isBlocked = false;
116153
return null;
117154
}
118155

@@ -123,6 +160,19 @@ public void checkDone() throws IllegalStateException {}
123160
public IsBounded isBounded() {
124161
return IsBounded.BOUNDED;
125162
}
163+
164+
public synchronized void releaseLock() {
165+
blockTrySplit = false;
166+
blockTryClaim = false;
167+
notifyAll();
168+
}
169+
170+
/** Wait until RestrictionTracker becomes blocking or unblocking. */
171+
public void waitUntilBlocking(boolean blocking) throws InterruptedException {
172+
while (isBlocked != blocking) {
173+
Thread.sleep(1);
174+
}
175+
}
126176
}
127177

128178
@Test
@@ -131,4 +181,38 @@ public void testClaimObserversMaintainBacklogInterfaces() {
131181
RestrictionTrackers.observe(new RestrictionTrackerWithProgress(), null);
132182
assertThat(hasSize, instanceOf(HasProgress.class));
133183
}
184+
185+
@Test
186+
public void testClaimObserversProgressNonBlockingOnTryClaim() throws InterruptedException {
187+
RestrictionTrackerWithProgress withProgress = new RestrictionTrackerWithProgress(true, false);
188+
RestrictionTracker<Object, Object> tracker =
189+
RestrictionTrackers.observe(withProgress, new RestrictionTrackers.NoopClaimObserver<>());
190+
Thread blocking = new Thread(() -> tracker.tryClaim(new Object()));
191+
blocking.start();
192+
withProgress.waitUntilBlocking(true);
193+
RestrictionTracker.Progress progress =
194+
((RestrictionTrackers.RestrictionTrackerObserverWithProgress) tracker).getProgress(1);
195+
assertEquals(RestrictionTracker.Progress.NONE, progress);
196+
withProgress.releaseLock();
197+
withProgress.waitUntilBlocking(false);
198+
progress = ((HasProgress) tracker).getProgress();
199+
assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
200+
}
201+
202+
@Test
203+
public void testClaimObserversProgressNonBlockingOnTrySplit() throws InterruptedException {
204+
RestrictionTrackerWithProgress withProgress = new RestrictionTrackerWithProgress(false, true);
205+
RestrictionTracker<Object, Object> tracker =
206+
RestrictionTrackers.observe(withProgress, new RestrictionTrackers.NoopClaimObserver<>());
207+
Thread blocking = new Thread(() -> tracker.trySplit(0.5));
208+
blocking.start();
209+
withProgress.waitUntilBlocking(true);
210+
RestrictionTracker.Progress progress =
211+
((RestrictionTrackers.RestrictionTrackerObserverWithProgress) tracker).getProgress(1);
212+
assertEquals(RestrictionTracker.Progress.NONE, progress);
213+
withProgress.releaseLock();
214+
withProgress.waitUntilBlocking(false);
215+
progress = ((HasProgress) tracker).getProgress();
216+
assertEquals(RestrictionTrackerWithProgress.REPORT_PROGRESS, progress);
217+
}
134218
}

0 commit comments

Comments
 (0)