Skip to content

Commit b976b51

Browse files
authored
Change ExecutionStateSampler.ExecutionState to support a scopedActivate method. From benchmarks this is no additional overhead and it is easier to use. (#36646)
1 parent 8c62649 commit b976b51

File tree

7 files changed

+114
-88
lines changed

7 files changed

+114
-88
lines changed

sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,23 @@ public void testTinyBundleHarnessStateSampler(HarnessStateTracker state, Blackho
169169
state.tracker.reset();
170170
}
171171

172+
@Benchmark
173+
@Threads(512)
174+
public void testTinyBundleHarnessStateSamplerScoped(HarnessStateTracker state, Blackhole bh)
175+
throws Exception {
176+
state.tracker.start("processBundleId");
177+
for (int i = 0; i < 3; ) {
178+
try (AutoCloseable s1 = state.state1.scopedActivate();
179+
AutoCloseable s2 = state.state2.scopedActivate();
180+
AutoCloseable s3 = state.state3.scopedActivate()) {
181+
// trivial code that is being sampled for this state
182+
i += 1;
183+
bh.consume(i);
184+
}
185+
}
186+
state.tracker.reset();
187+
}
188+
172189
@Benchmark
173190
@Threads(16)
174191
public void testLargeBundleRunnersCoreStateSampler(

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,16 @@ public ExecutionStateSampler(
131131
/** An {@link ExecutionState} represents the current state of an execution thread. */
132132
public interface ExecutionState {
133133

134+
interface ActiveState extends AutoCloseable {}
135+
136+
/**
137+
* Activates this execution state within the {@link ExecutionStateTracker}. The returned
138+
* closable will restore the previously active execution state.
139+
*
140+
* <p>Must only be invoked by the bundle processing thread.
141+
*/
142+
ActiveState scopedActivate();
143+
134144
/**
135145
* Activates this execution state within the {@link ExecutionStateTracker}.
136146
*
@@ -527,6 +537,9 @@ private class ExecutionStateImpl implements ExecutionState {
527537
// Read and written by the bundle processing thread frequently.
528538
private @Nullable ExecutionStateImpl previousState;
529539

540+
@SuppressWarnings("methodref")
541+
private final ActiveState activeState = this::deactivate;
542+
530543
private ExecutionStateImpl(
531544
String shortId,
532545
String ptransformId,
@@ -581,6 +594,12 @@ public void activate() {
581594
numTransitionsLazy.lazySet(numTransitions);
582595
}
583596

597+
@Override
598+
public ActiveState scopedActivate() {
599+
activate();
600+
return activeState;
601+
}
602+
584603
@Override
585604
public void deactivate() {
586605
currentState = previousState;

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,11 +450,8 @@ public <T> void addIncomingTimerEndpoint(
450450
pTransformId, pTransform.getUniqueName());
451451
FnDataReceiver<Timer<T>> wrappedReceiver =
452452
(Timer<T> timer) -> {
453-
executionState.activate();
454-
try {
453+
try (AutoCloseable ignored = executionState.scopedActivate()) {
455454
receiver.accept(timer);
456-
} finally {
457-
executionState.deactivate();
458455
}
459456
};
460457
addTimerEndpoint.accept(

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -371,14 +371,11 @@ public void accept(WindowedValue<T> input) throws Exception {
371371
// Use the ExecutionStateTracker and enter an appropriate state to track the
372372
// Process Bundle Execution time metric and also ensure user counters can get an appropriate
373373
// metrics container.
374-
executionState.activate();
375-
try {
374+
try (ExecutionState.ActiveState a = executionState.scopedActivate()) {
376375
this.delegate.accept(input);
377376
} catch (Exception e) {
378377
logAndRethrow(
379378
e, executionState, executionStateTracker, ptransformId, outputSampler, elementSample);
380-
} finally {
381-
executionState.deactivate();
382379
}
383380
this.sampledByteSizeDistribution.finishLazyUpdate();
384381
}
@@ -461,8 +458,7 @@ public void accept(WindowedValue<T> input) throws Exception {
461458
for (int size = consumerAndMetadatas.size(), i = 0; i < size; ++i) {
462459
ConsumerAndMetadata consumerAndMetadata = consumerAndMetadatas.get(i);
463460
ExecutionState state = consumerAndMetadata.getExecutionState();
464-
state.activate();
465-
try {
461+
try (ExecutionState.ActiveState a = state.scopedActivate()) {
466462
consumerAndMetadata.getConsumer().accept(input);
467463
} catch (Exception e) {
468464
logAndRethrow(
@@ -472,8 +468,6 @@ public void accept(WindowedValue<T> input) throws Exception {
472468
consumerAndMetadata.getPTransformId(),
473469
outputSampler,
474470
elementSample);
475-
} finally {
476-
state.deactivate();
477471
}
478472
this.sampledByteSizeDistribution.finishLazyUpdate();
479473
}

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,8 @@ public void register(
111111

112112
ThrowingRunnable wrapped =
113113
() -> {
114-
executionState.activate();
115-
try {
114+
try (ExecutionState.ActiveState ignored = executionState.scopedActivate()) {
116115
runnable.run();
117-
} finally {
118-
executionState.deactivate();
119116
}
120117
};
121118
runnables.add(wrapped);

sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java

Lines changed: 71 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -163,79 +163,81 @@ public Long answer(InvocationOnMock invocation) throws Throwable {
163163
tracker1.start("bundleId1");
164164
tracker2.start("bundleId2");
165165

166-
state1.activate();
167-
state2.activate();
168-
169-
// Check that the current threads PTransform id is available
170-
assertEquals("ptransformId1", tracker1.getCurrentThreadsPTransformId());
171-
assertEquals("ptransformId2", tracker2.getCurrentThreadsPTransformId());
172-
173-
// Check that the status returns a value as soon as it is activated.
174-
ExecutionStateTrackerStatus activeBundleStatus1 = tracker1.getStatus();
175-
ExecutionStateTrackerStatus activeBundleStatus2 = tracker2.getStatus();
176-
assertEquals("ptransformId1", activeBundleStatus1.getPTransformId());
177-
assertEquals("ptransformId2", activeBundleStatus2.getPTransformId());
178-
assertEquals("ptransformIdName1", activeBundleStatus1.getPTransformUniqueName());
179-
assertEquals("ptransformIdName2", activeBundleStatus2.getPTransformUniqueName());
180-
assertEquals(Thread.currentThread(), activeBundleStatus1.getTrackedThread());
181-
assertEquals(Thread.currentThread(), activeBundleStatus2.getTrackedThread());
182-
assertThat(activeBundleStatus1.getStartTime().getMillis(), equalTo(1L));
183-
assertThat(activeBundleStatus2.getStartTime().getMillis(), equalTo(1L));
184-
assertThat(
185-
activeBundleStatus1.getLastTransitionTime().getMillis(),
186-
// Because we are using lazySet, we aren't guaranteed to see the latest value
187-
// but we should definitely be seeing a value that isn't zero
188-
equalTo(1L));
189-
assertThat(
190-
activeBundleStatus2.getLastTransitionTime().getMillis(),
191-
// Internal implementation has this be equal to the second value we return (2 * 100L)
192-
equalTo(1L));
193-
194-
waitTillActive.countDown();
195-
waitForSamples.await();
166+
ExecutionStateTrackerStatus activeStateStatus1, activeStateStatus2;
167+
try (ExecutionState.ActiveState activeState = state1.scopedActivate()) {
168+
state2.activate();
169+
170+
// Check that the current threads PTransform id is available
171+
assertEquals("ptransformId1", tracker1.getCurrentThreadsPTransformId());
172+
assertEquals("ptransformId2", tracker2.getCurrentThreadsPTransformId());
173+
174+
// Check that the status returns a value as soon as it is activated.
175+
ExecutionStateTrackerStatus activeBundleStatus1 = tracker1.getStatus();
176+
ExecutionStateTrackerStatus activeBundleStatus2 = tracker2.getStatus();
177+
assertEquals("ptransformId1", activeBundleStatus1.getPTransformId());
178+
assertEquals("ptransformId2", activeBundleStatus2.getPTransformId());
179+
assertEquals("ptransformIdName1", activeBundleStatus1.getPTransformUniqueName());
180+
assertEquals("ptransformIdName2", activeBundleStatus2.getPTransformUniqueName());
181+
assertEquals(Thread.currentThread(), activeBundleStatus1.getTrackedThread());
182+
assertEquals(Thread.currentThread(), activeBundleStatus2.getTrackedThread());
183+
assertThat(activeBundleStatus1.getStartTime().getMillis(), equalTo(1L));
184+
assertThat(activeBundleStatus2.getStartTime().getMillis(), equalTo(1L));
185+
assertThat(
186+
activeBundleStatus1.getLastTransitionTime().getMillis(),
187+
// Because we are using lazySet, we aren't guaranteed to see the latest value
188+
// but we should definitely be seeing a value that isn't zero
189+
equalTo(1L));
190+
assertThat(
191+
activeBundleStatus2.getLastTransitionTime().getMillis(),
192+
// Internal implementation has this be equal to the second value we return (2 * 100L)
193+
equalTo(1L));
196194

197-
// Check that the current threads PTransform id is available
198-
assertEquals("ptransformId1", tracker1.getCurrentThreadsPTransformId());
199-
assertEquals("ptransformId2", tracker2.getCurrentThreadsPTransformId());
200-
201-
// Check that we get additional data about the active PTransform.
202-
ExecutionStateTrackerStatus activeStateStatus1 = tracker1.getStatus();
203-
ExecutionStateTrackerStatus activeStateStatus2 = tracker2.getStatus();
204-
assertEquals("ptransformId1", activeStateStatus1.getPTransformId());
205-
assertEquals("ptransformId2", activeStateStatus2.getPTransformId());
206-
assertEquals("ptransformIdName1", activeStateStatus1.getPTransformUniqueName());
207-
assertEquals("ptransformIdName2", activeStateStatus2.getPTransformUniqueName());
208-
assertEquals(Thread.currentThread(), activeStateStatus1.getTrackedThread());
209-
assertEquals(Thread.currentThread(), activeStateStatus2.getTrackedThread());
210-
assertThat(
211-
activeStateStatus1.getLastTransitionTime(),
212-
greaterThan(activeBundleStatus1.getLastTransitionTime()));
213-
assertThat(
214-
activeStateStatus2.getLastTransitionTime(),
215-
greaterThan(activeBundleStatus2.getLastTransitionTime()));
195+
waitTillActive.countDown();
196+
waitForSamples.await();
216197

217-
// Validate intermediate monitoring data
218-
Map<String, ByteString> intermediateResults1 = new HashMap<>();
219-
Map<String, ByteString> intermediateResults2 = new HashMap<>();
220-
tracker1.updateIntermediateMonitoringData(intermediateResults1);
221-
tracker2.updateIntermediateMonitoringData(intermediateResults2);
222-
assertThat(
223-
MonitoringInfoEncodings.decodeInt64Counter(intermediateResults1.get("shortId1")),
224-
// Because we are using lazySet, we aren't guaranteed to see the latest value.
225-
// The CountDownLatch ensures that we will see either the prior value or
226-
// the latest value.
227-
anyOf(equalTo(900L), equalTo(1000L)));
228-
assertThat(
229-
MonitoringInfoEncodings.decodeInt64Counter(intermediateResults2.get("shortId2")),
230-
// Because we are using lazySet, we aren't guaranteed to see the latest value.
231-
// The CountDownLatch ensures that we will see either the prior value or
232-
// the latest value.
233-
anyOf(equalTo(900L), equalTo(1000L)));
198+
// Check that the current threads PTransform id is available
199+
assertEquals("ptransformId1", tracker1.getCurrentThreadsPTransformId());
200+
assertEquals("ptransformId2", tracker2.getCurrentThreadsPTransformId());
201+
202+
// Check that we get additional data about the active PTransform.
203+
activeStateStatus1 = tracker1.getStatus();
204+
activeStateStatus2 = tracker2.getStatus();
205+
assertEquals("ptransformId1", activeStateStatus1.getPTransformId());
206+
assertEquals("ptransformId2", activeStateStatus2.getPTransformId());
207+
assertEquals("ptransformIdName1", activeStateStatus1.getPTransformUniqueName());
208+
assertEquals("ptransformIdName2", activeStateStatus2.getPTransformUniqueName());
209+
assertEquals(Thread.currentThread(), activeStateStatus1.getTrackedThread());
210+
assertEquals(Thread.currentThread(), activeStateStatus2.getTrackedThread());
211+
assertThat(
212+
activeStateStatus1.getLastTransitionTime(),
213+
greaterThan(activeBundleStatus1.getLastTransitionTime()));
214+
assertThat(
215+
activeStateStatus2.getLastTransitionTime(),
216+
greaterThan(activeBundleStatus2.getLastTransitionTime()));
217+
218+
// Validate intermediate monitoring data
219+
Map<String, ByteString> intermediateResults1 = new HashMap<>();
220+
Map<String, ByteString> intermediateResults2 = new HashMap<>();
221+
tracker1.updateIntermediateMonitoringData(intermediateResults1);
222+
tracker2.updateIntermediateMonitoringData(intermediateResults2);
223+
assertThat(
224+
MonitoringInfoEncodings.decodeInt64Counter(intermediateResults1.get("shortId1")),
225+
// Because we are using lazySet, we aren't guaranteed to see the latest value.
226+
// The CountDownLatch ensures that we will see either the prior value or
227+
// the latest value.
228+
anyOf(equalTo(900L), equalTo(1000L)));
229+
assertThat(
230+
MonitoringInfoEncodings.decodeInt64Counter(intermediateResults2.get("shortId2")),
231+
// Because we are using lazySet, we aren't guaranteed to see the latest value.
232+
// The CountDownLatch ensures that we will see either the prior value or
233+
// the latest value.
234+
anyOf(equalTo(900L), equalTo(1000L)));
234235

235-
waitTillIntermediateReport.countDown();
236-
waitForMoreSamples.await();
236+
waitTillIntermediateReport.countDown();
237+
waitForMoreSamples.await();
238+
state2.deactivate();
239+
}
237240
state1.deactivate();
238-
state2.deactivate();
239241

240242
waitTillStatesDeactivated.countDown();
241243
waitForEvenMoreSamples.await();

sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ public StreamObserver<BeamFnApi.LogEntry.List> logging(
206206
// from.
207207
ExecutionStateSampler.ExecutionState errorState =
208208
stateTracker.create("shortId", "errorPtransformId", "errorPtransformIdName", "process");
209-
errorState.activate();
210-
configuredLogger.log(TEST_RECORD_WITH_EXCEPTION);
211-
errorState.deactivate();
209+
try (AutoCloseable activeState = errorState.scopedActivate()) {
210+
configuredLogger.log(TEST_RECORD_WITH_EXCEPTION);
211+
}
212212

213213
// Ensure that configuring a custom formatter on the logging handler will be honored.
214214
for (Handler handler : rootLogger.getHandlers()) {

0 commit comments

Comments
 (0)