Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */
@Internal
Expand All @@ -113,6 +115,8 @@
})
public class DoFnSignatures {

private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class);

private DoFnSignatures() {}

/**
Expand Down Expand Up @@ -2327,12 +2331,77 @@ private static Map<String, DoFnSignature.StateDeclaration> analyzeStateDeclarati
(TypeDescriptor<? extends State>)
TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType);

// Warn if ValueState contains a collection type that could benefit from specialized state
warnIfValueStateContainsCollection(fnClazz, id, stateType);

declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType));
}

return ImmutableMap.copyOf(declarations);
}

/**
* Warns if a ValueState is declared with a collection type (Map, List, Set) that could benefit
* from using specialized state types (MapState, BagState, SetState) for better performance.
*/
private static void warnIfValueStateContainsCollection(
Class<?> fnClazz, String stateId, TypeDescriptor<? extends State> stateType) {
if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) {
return;
}

try {
// Get the type directly and extract ValueState's type parameter
Type type = stateType.getType();
if (!(type instanceof ParameterizedType)) {
return;
}

// Find ValueState in the type hierarchy and get its type argument
Type valueType = null;
ParameterizedType pType = (ParameterizedType) type;
if (pType.getRawType() == ValueState.class) {
valueType = pType.getActualTypeArguments()[0];
} else {
// For subtypes of ValueState, we need to resolve the type parameter
return;
}

if (valueType == null
|| valueType instanceof java.lang.reflect.TypeVariable
|| valueType instanceof java.lang.reflect.WildcardType) {
// Cannot determine actual type, skip warning
return;
}

TypeDescriptor<?> valueTypeDescriptor = TypeDescriptor.of(valueType);
Class<?> rawType = valueTypeDescriptor.getRawType();

String recommendation = null;
if (Map.class.isAssignableFrom(rawType)) {
recommendation = "MapState";
} else if (List.class.isAssignableFrom(rawType)) {
recommendation = "BagState or OrderedListState";
} else if (java.util.Set.class.isAssignableFrom(rawType)) {
recommendation = "SetState";
}

if (recommendation != null) {
LOG.warn(
"DoFn {} declares ValueState '{}' with type {}. "
+ "Storing collections in ValueState requires reading and writing the entire "
+ "collection on each access, which can cause performance issues. "
+ "Consider using {} instead for better performance with large collections.",
fnClazz.getSimpleName(),
stateId,
rawType.getSimpleName(),
recommendation);
}
} catch (Exception e) {
// If we can't determine the type, don't warn - it's just an optimization hint
}
}

private static @Nullable Method findAnnotatedMethod(
ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
Collection<Method> matches =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,4 +1700,65 @@ public void onMyTimer() {}
@Override
public void processWithTimer(ProcessContext context, Timer timer) {}
}

// Test DoFns for ValueState collection warning tests
private static class DoFnWithMapValueState extends DoFn<String, String> {
@StateId("mapState")
private final StateSpec<ValueState<java.util.Map<String, String>>> mapState =
StateSpecs.value();

@ProcessElement
public void process() {}
}

private static class DoFnWithListValueState extends DoFn<String, String> {
@StateId("listState")
private final StateSpec<ValueState<java.util.List<String>>> listState = StateSpecs.value();

@ProcessElement
public void process() {}
}

private static class DoFnWithSetValueState extends DoFn<String, String> {
@StateId("setState")
private final StateSpec<ValueState<java.util.Set<String>>> setState = StateSpecs.value();

@ProcessElement
public void process() {}
}

private static class DoFnWithSimpleValueState extends DoFn<String, String> {
@StateId("simpleState")
private final StateSpec<ValueState<String>> simpleState = StateSpecs.value();

@ProcessElement
public void process() {}
}

@Test
public void testValueStateWithMapLogsWarning() {
// This test verifies that the signature can be parsed for DoFns with collection ValueState.
// The warning is logged but doesn't prevent the signature from being created.
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithMapValueState.class);
assertThat(signature.stateDeclarations().get("mapState"), notNullValue());
}

@Test
public void testValueStateWithListLogsWarning() {
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithListValueState.class);
assertThat(signature.stateDeclarations().get("listState"), notNullValue());
}

@Test
public void testValueStateWithSetLogsWarning() {
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSetValueState.class);
assertThat(signature.stateDeclarations().get("setState"), notNullValue());
}

@Test
public void testValueStateWithSimpleTypeNoWarning() {
// Simple types should not trigger any warning
DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class);
assertThat(signature.stateDeclarations().get("simpleState"), notNullValue());
}
}
Loading