Skip to content

Commit ecc840c

Browse files
authored
[Java SDK] Fix propagation of metrics set during onTimer processing. (#36576)
Fixes #29099. Added unit test that verifies ProcessBundleHandler plumbing in addition to the existing FnApiRunner test which is at a lower level.
1 parent 949c87f commit ecc840c

File tree

3 files changed

+215
-27
lines changed

3 files changed

+215
-27
lines changed

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,20 @@ public <T> void addIncomingTimerEndpoint(
445445
String timerFamilyId,
446446
org.apache.beam.sdk.coders.Coder<Timer<T>> coder,
447447
FnDataReceiver<Timer<T>> receiver) {
448+
ExecutionStateSampler.ExecutionState executionState =
449+
pCollectionConsumerRegistry.getProcessingExecutionState(
450+
pTransformId, pTransform.getUniqueName());
451+
FnDataReceiver<Timer<T>> wrappedReceiver =
452+
(Timer<T> timer) -> {
453+
executionState.activate();
454+
try {
455+
receiver.accept(timer);
456+
} finally {
457+
executionState.deactivate();
458+
}
459+
};
448460
addTimerEndpoint.accept(
449-
TimerEndpoint.create(pTransformId, timerFamilyId, coder, receiver));
461+
TimerEndpoint.create(pTransformId, timerFamilyId, coder, wrappedReceiver));
450462
}
451463

452464
@Override

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,23 @@ public static ConsumerAndMetadata forConsumer(
9292
public abstract ExecutionStateTracker getExecutionStateTracker();
9393
}
9494

95+
@AutoValue
96+
abstract static class ExecutionStateKey {
97+
public static ExecutionStateKey of(String pTransformId, String pTransformUniqueName) {
98+
return new AutoValue_PCollectionConsumerRegistry_ExecutionStateKey(
99+
pTransformId, pTransformUniqueName);
100+
}
101+
102+
public abstract String getPTransformId();
103+
104+
public abstract String getPTransformUniqueId();
105+
}
106+
95107
private final ExecutionStateTracker stateTracker;
96108
private final ShortIdMap shortIdMap;
97-
private final Map<String, List<ConsumerAndMetadata>> pCollectionIdsToConsumers;
98-
private final Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer;
109+
private final Map<String, List<ConsumerAndMetadata>> pCollectionIdsToConsumers = new HashMap<>();
110+
private final Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer = new HashMap<>();
111+
private final Map<ExecutionStateKey, ExecutionState> executionStates = new HashMap<>();
99112
private final BundleProgressReporter.Registrar bundleProgressReporterRegistrar;
100113
private final ProcessBundleDescriptor processBundleDescriptor;
101114
private final RehydratedComponents rehydratedComponents;
@@ -118,8 +131,6 @@ public PCollectionConsumerRegistry(
118131
@Nullable DataSampler dataSampler) {
119132
this.stateTracker = stateTracker;
120133
this.shortIdMap = shortIdMap;
121-
this.pCollectionIdsToConsumers = new HashMap<>();
122-
this.pCollectionIdsToWrappedConsumer = new HashMap<>();
123134
this.bundleProgressReporterRegistrar = bundleProgressReporterRegistrar;
124135
this.processBundleDescriptor = processBundleDescriptor;
125136
this.rehydratedComponents =
@@ -162,31 +173,14 @@ public <T> void register(
162173
+ "calling getMultiplexingConsumer.");
163174
}
164175

165-
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
166-
builder.setUrn(MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS);
167-
builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE);
168-
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransformId);
169-
MonitoringInfo mi = builder.build();
170-
if (mi == null) {
171-
throw new IllegalStateException(
172-
String.format(
173-
"Unable to construct %s counter for PTransform {id=%s, name=%s}",
174-
MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS,
175-
pTransformId,
176-
pTransformUniqueName));
177-
}
178-
String shortId = shortIdMap.getOrCreateShortId(mi);
179-
ExecutionState executionState =
180-
stateTracker.create(
181-
shortId,
182-
pTransformId,
183-
pTransformUniqueName,
184-
org.apache.beam.runners.core.metrics.ExecutionStateTracker.PROCESS_STATE_NAME);
185-
186176
List<ConsumerAndMetadata> consumerAndMetadatas =
187177
pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, (unused) -> new ArrayList<>());
188178
consumerAndMetadatas.add(
189-
ConsumerAndMetadata.forConsumer(consumer, pTransformId, executionState, stateTracker));
179+
ConsumerAndMetadata.forConsumer(
180+
consumer,
181+
pTransformId,
182+
getProcessingExecutionState(pTransformId, pTransformUniqueName),
183+
stateTracker));
190184
}
191185

192186
/**
@@ -246,6 +240,39 @@ public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollecti
246240
});
247241
}
248242

243+
/**
244+
* Returns a shared ExecutionState for tracking the process of the given transform.
245+
*
246+
* @return A {@link ExecutionState} which should be only activated/deactivated on the processing
247+
* thread for the bundle.
248+
*/
249+
public ExecutionState getProcessingExecutionState(
250+
String pTransformId, String pTransformUniqueName) {
251+
return executionStates.computeIfAbsent(
252+
ExecutionStateKey.of(pTransformId, pTransformUniqueName),
253+
(key) -> {
254+
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
255+
builder.setUrn(MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS);
256+
builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE);
257+
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, key.getPTransformId());
258+
MonitoringInfo mi = builder.build();
259+
if (mi == null) {
260+
throw new IllegalStateException(
261+
String.format(
262+
"Unable to construct %s counter for PTransform {id=%s, name=%s}",
263+
MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS,
264+
key.getPTransformId(),
265+
key.getPTransformUniqueId()));
266+
}
267+
String shortId = shortIdMap.getOrCreateShortId(mi);
268+
return stateTracker.create(
269+
shortId,
270+
key.getPTransformId(),
271+
key.getPTransformUniqueId(),
272+
org.apache.beam.runners.core.metrics.ExecutionStateTracker.PROCESS_STATE_NAME);
273+
});
274+
}
275+
249276
private static <T> void logAndRethrow(
250277
Exception e,
251278
ExecutionState executionState,

sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import static org.apache.beam.fn.harness.control.ProcessBundleHandler.REGISTERED_RUNNER_FACTORIES;
2222
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2323
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.allOf;
2425
import static org.hamcrest.Matchers.contains;
2526
import static org.hamcrest.Matchers.containsInAnyOrder;
2627
import static org.hamcrest.Matchers.emptyIterable;
2728
import static org.hamcrest.Matchers.equalTo;
29+
import static org.hamcrest.Matchers.hasEntry;
30+
import static org.hamcrest.Matchers.hasProperty;
2831
import static org.hamcrest.Matchers.is;
2932
import static org.hamcrest.collection.IsEmptyCollection.empty;
3033
import static org.junit.Assert.assertEquals;
@@ -94,6 +97,7 @@
9497
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
9598
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
9699
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
100+
import org.apache.beam.model.pipeline.v1.MetricsApi;
97101
import org.apache.beam.model.pipeline.v1.RunnerApi;
98102
import org.apache.beam.model.pipeline.v1.RunnerApi.AccumulationMode;
99103
import org.apache.beam.model.pipeline.v1.RunnerApi.ClosingBehavior;
@@ -120,6 +124,8 @@
120124
import org.apache.beam.sdk.fn.test.TestExecutors;
121125
import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
122126
import org.apache.beam.sdk.function.ThrowingRunnable;
127+
import org.apache.beam.sdk.metrics.Counter;
128+
import org.apache.beam.sdk.metrics.Metrics;
123129
import org.apache.beam.sdk.metrics.MetricsEnvironment;
124130
import org.apache.beam.sdk.options.PipelineOptionsFactory;
125131
import org.apache.beam.sdk.state.TimeDomain;
@@ -935,6 +941,19 @@ public void testPTransformStartExceptionsArePropagated() {
935941
private static final class SimpleDoFn extends DoFn<KV<String, String>, String> {
936942
private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
937943
private static final String TIMER_FAMILY_ID = "timer_family";
944+
private final Counter timersFired = Metrics.counter(SimpleDoFn.class, "timersFired");
945+
private final Counter bundlesStarted = Metrics.counter(SimpleDoFn.class, "bundlesStarted");
946+
private final Counter bundlesFinished = Metrics.counter(SimpleDoFn.class, "bundlesFinished");
947+
948+
@StartBundle
949+
public void startBundle() {
950+
bundlesStarted.inc();
951+
}
952+
953+
@FinishBundle
954+
public void finishBundle() {
955+
bundlesFinished.inc();
956+
}
938957

939958
@TimerFamily(TIMER_FAMILY_ID)
940959
private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
@@ -944,6 +963,7 @@ public void processElement(ProcessContext context, BoundedWindow window) {}
944963

945964
@OnTimerFamily(TIMER_FAMILY_ID)
946965
public void onTimer(@TimerFamily(TIMER_FAMILY_ID) TimerMap timerFamily) {
966+
timersFired.inc();
947967
timerFamily
948968
.get("output_timer")
949969
.withOutputTimestamp(Instant.ofEpochMilli(100L))
@@ -1926,6 +1946,135 @@ public void testTimerRegistrationsFailIfNoTimerApiServiceDescriptorSpecified() t
19261946
.build()));
19271947
}
19281948

1949+
@Test
1950+
public void testTimerMetrics() throws Exception {
1951+
List<String> dataOutput = new ArrayList<>();
1952+
List<Timers> timerOutput = new ArrayList<>();
1953+
ProcessBundleHandler handler =
1954+
setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
1955+
1956+
ByteStringOutputStream encodedData = new ByteStringOutputStream();
1957+
KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
1958+
ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
1959+
Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
1960+
.encode(
1961+
Timer.of(
1962+
"",
1963+
"timer_id",
1964+
Collections.singletonList(GlobalWindow.INSTANCE),
1965+
Instant.ofEpochMilli(1L),
1966+
Instant.ofEpochMilli(1L),
1967+
PaneInfo.ON_TIME_AND_ONLY_FIRING),
1968+
encodedTimer);
1969+
Elements elements =
1970+
Elements.newBuilder()
1971+
.addData(
1972+
Data.newBuilder().setInstructionId("998L").setTransformId("2L").setIsLast(true))
1973+
.addTimers(
1974+
Timers.newBuilder()
1975+
.setInstructionId("998L")
1976+
.setTransformId("3L")
1977+
.setTimerFamilyId(TimerFamilyDeclaration.PREFIX + SimpleDoFn.TIMER_FAMILY_ID)
1978+
.setTimers(encodedTimer.toByteString()))
1979+
.addTimers(
1980+
Timers.newBuilder()
1981+
.setInstructionId("998L")
1982+
.setTransformId("3L")
1983+
.setTimerFamilyId(TimerFamilyDeclaration.PREFIX + SimpleDoFn.TIMER_FAMILY_ID)
1984+
.setIsLast(true))
1985+
.build();
1986+
InstructionResponse.Builder response =
1987+
handler.processBundle(
1988+
InstructionRequest.newBuilder()
1989+
.setInstructionId("998L")
1990+
.setProcessBundle(
1991+
ProcessBundleRequest.newBuilder()
1992+
.setProcessBundleDescriptorId("1L")
1993+
.setElements(elements))
1994+
.build());
1995+
handler.shutdown();
1996+
1997+
int timerCounterFound = 0;
1998+
for (MetricsApi.MonitoringInfo info : response.getProcessBundle().getMonitoringInfosList()) {
1999+
if (info.getLabelsOrDefault("NAME", "").equals("timersFired")) {
2000+
++timerCounterFound;
2001+
assertThat(
2002+
info,
2003+
allOf(
2004+
hasProperty("urn", equalTo("beam:metric:user:sum_int64:v1")),
2005+
hasProperty("type", equalTo("beam:metrics:sum_int64:v1")),
2006+
hasProperty("payload", equalTo(ByteString.copyFromUtf8("\001"))),
2007+
hasProperty(
2008+
"labels",
2009+
hasEntry(
2010+
equalTo("NAMESPACE"),
2011+
equalTo(
2012+
"org.apache.beam.fn.harness.control.ProcessBundleHandlerTest$SimpleDoFn"))),
2013+
hasProperty("labels", hasEntry(equalTo("PTRANSFORM"), equalTo("3L")))));
2014+
}
2015+
}
2016+
assertEquals(1, timerCounterFound);
2017+
}
2018+
2019+
@Test
2020+
public void testStartFinishBundleMetrics() throws Exception {
2021+
List<String> dataOutput = new ArrayList<>();
2022+
List<Timers> timerOutput = new ArrayList<>();
2023+
ProcessBundleHandler handler =
2024+
setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
2025+
2026+
ByteStringOutputStream encodedData = new ByteStringOutputStream();
2027+
KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
2028+
Elements elements =
2029+
Elements.newBuilder()
2030+
.addData(
2031+
Data.newBuilder().setInstructionId("998L").setTransformId("2L").setIsLast(true))
2032+
.addTimers(
2033+
Timers.newBuilder()
2034+
.setInstructionId("998L")
2035+
.setTransformId("3L")
2036+
.setTimerFamilyId(TimerFamilyDeclaration.PREFIX + SimpleDoFn.TIMER_FAMILY_ID)
2037+
.setIsLast(true))
2038+
.build();
2039+
InstructionResponse.Builder response =
2040+
handler.processBundle(
2041+
InstructionRequest.newBuilder()
2042+
.setInstructionId("998L")
2043+
.setProcessBundle(
2044+
ProcessBundleRequest.newBuilder()
2045+
.setProcessBundleDescriptorId("1L")
2046+
.setElements(elements))
2047+
.build());
2048+
handler.shutdown();
2049+
2050+
int startCounterFound = 0;
2051+
int finishCounterFound = 0;
2052+
for (MetricsApi.MonitoringInfo info : response.getProcessBundle().getMonitoringInfosList()) {
2053+
if (info.getLabelsOrDefault("NAME", "").equals("bundlesStarted")) {
2054+
++startCounterFound;
2055+
} else if (info.getLabelsOrDefault("NAME", "").equals("bundlesFinished")) {
2056+
++finishCounterFound;
2057+
} else {
2058+
continue;
2059+
}
2060+
assertThat(
2061+
info,
2062+
allOf(
2063+
hasProperty("urn", equalTo("beam:metric:user:sum_int64:v1")),
2064+
hasProperty("type", equalTo("beam:metrics:sum_int64:v1")),
2065+
hasProperty("payload", equalTo(ByteString.copyFromUtf8("\001"))),
2066+
hasProperty(
2067+
"labels",
2068+
hasEntry(
2069+
equalTo("NAMESPACE"),
2070+
equalTo(
2071+
"org.apache.beam.fn.harness.control.ProcessBundleHandlerTest$SimpleDoFn"))),
2072+
hasProperty("labels", hasEntry(equalTo("PTRANSFORM"), equalTo("3L")))));
2073+
}
2074+
assertEquals(1, startCounterFound);
2075+
assertEquals(1, finishCounterFound);
2076+
}
2077+
19292078
private static void throwException() {
19302079
throw new IllegalStateException("TestException");
19312080
}

0 commit comments

Comments
 (0)