Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */
@Internal
Expand All @@ -113,6 +115,8 @@
})
public class DoFnSignatures {

private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class);

private DoFnSignatures() {}

/**
Expand Down Expand Up @@ -2327,12 +2331,59 @@ private static Map<String, DoFnSignature.StateDeclaration> analyzeStateDeclarati
(TypeDescriptor<? extends State>)
TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType);

// Warn if ValueState contains a collection type that could benefit from specialized state
warnIfValueStateContainsCollection(fnClazz, id, stateType);

declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType));
}

return ImmutableMap.copyOf(declarations);
}

/**
* Warns if a ValueState is declared with a collection type (Map, List, Set) that could benefit
* from using specialized state types (MapState, BagState, SetState) for better performance.
*/
private static void warnIfValueStateContainsCollection(
Class<?> fnClazz, String stateId, TypeDescriptor<? extends State> stateType) {
if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) {
return;
}

// Use TypeDescriptor.resolveType() to extract ValueState's type parameter
// This preserves generic type information better than raw Type manipulation
TypeDescriptor<?> valueTypeDescriptor =
stateType.resolveType(ValueState.class.getTypeParameters()[0]);

// Skip if the type has unresolved parameters (e.g., TypeVariable, WildcardType)
if (valueTypeDescriptor.hasUnresolvedParameters()) {
return;
}

// Use TypeDescriptor.isSubtypeOf() for type checking - stays in TypeDescriptor API
String recommendation = null;
if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
recommendation = "MapState";
} else if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(List.class))) {
recommendation = "BagState or OrderedListState";
} else if (valueTypeDescriptor.isSubtypeOf(TypeDescriptor.of(java.util.Set.class))) {
recommendation = "SetState";
}

if (recommendation != null) {
LOG.info(
"DoFn {} declares ValueState '{}' with collection type {}. "
+ "ValueState reads/writes the entire collection on each access. "
+ "This is appropriate for small collections or atomic replacement. "
+ "For large collections or frequent appends, consider using {} instead "
+ "(if supported by your runner).",
fnClazz.getSimpleName(),
stateId,
valueTypeDescriptor.getRawType().getSimpleName(),
recommendation);
}
}

private static @Nullable Method findAnnotatedMethod(
ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
Collection<Method> matches =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,4 +1700,65 @@ public void onMyTimer() {}
@Override
public void processWithTimer(ProcessContext context, Timer timer) {}
}

// Test DoFns for ValueState collection warning tests
private static class DoFnWithMapValueState extends DoFn<String, String> {
@StateId("mapState")
private final StateSpec<ValueState<java.util.Map<String, String>>> mapState =
StateSpecs.value();

@ProcessElement
public void process() {}
}

private static class DoFnWithListValueState extends DoFn<String, String> {
@StateId("listState")
private final StateSpec<ValueState<java.util.List<String>>> listState = StateSpecs.value();

@ProcessElement
public void process() {}
}

private static class DoFnWithSetValueState extends DoFn<String, String> {
@StateId("setState")
private final StateSpec<ValueState<java.util.Set<String>>> setState = StateSpecs.value();

@ProcessElement
public void process() {}
}

private static class DoFnWithSimpleValueState extends DoFn<String, String> {
@StateId("simpleState")
private final StateSpec<ValueState<String>> simpleState = StateSpecs.value();

@ProcessElement
public void process() {}
}

@Test
public void testValueStateWithMapLogsWarning() {
// This test verifies that the signature can be parsed for DoFns with collection ValueState.
// The warning is logged but doesn't prevent the signature from being created.
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithMapValueState.class);
assertThat(signature.stateDeclarations().get("mapState"), notNullValue());
}

@Test
public void testValueStateWithListLogsWarning() {
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithListValueState.class);
assertThat(signature.stateDeclarations().get("listState"), notNullValue());
}

@Test
public void testValueStateWithSetLogsWarning() {
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSetValueState.class);
assertThat(signature.stateDeclarations().get("setState"), notNullValue());
}

@Test
public void testValueStateWithSimpleTypeNoWarning() {
// Simple types should not trigger any warning
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class);
assertThat(signature.stateDeclarations().get("simpleState"), notNullValue());
}
}
Loading