diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index 65084120f922..3ac7c8431797 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.checkerframework.dataflow.qual.Pure; import org.joda.time.Duration; import org.joda.time.Instant; @@ -41,6 +42,7 @@ private LateDataUtils() {} * Return when {@code window} should be garbage collected. If the window's expiration time is on * or after the end of the global window, it will be truncated to the end of the global window. */ + @Pure public static Instant garbageCollectionTime( BoundedWindow window, WindowingStrategy windowingStrategy) { return garbageCollectionTime(window, windowingStrategy.getAllowedLateness()); @@ -50,6 +52,7 @@ public static Instant garbageCollectionTime( * Return when {@code window} should be garbage collected. If the window's expiration time is on * or after the end of the global window, it will be truncated to the end of the global window. */ + @Pure public static Instant garbageCollectionTime(BoundedWindow window, Duration allowedLateness) { // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 767673959663..6f9f15b13589 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -387,12 +387,12 @@ public PaneInfo pane() { } @Override - public String currentRecordId() { + public @Nullable String currentRecordId() { return element.getRecordId(); } @Override - public Long currentRecordOffset() { + public @Nullable Long currentRecordOffset() { return element.getRecordOffset(); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java index a1f2db263a47..01d06dca25db 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java @@ -19,25 +19,22 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.dataflow.qual.Pure; /** * The interface to objects that provide side inputs. Particular implementations may read a side * input directly or use appropriate sorts of caching, etc. */ public interface SideInputReader { - /** - * Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}. - * - *

It is valid for a side input to be {@code null}. It is not valid for this to return - * {@code null} for any other reason. - */ - @Nullable + /** Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}. */ + @Pure T get(PCollectionView view, BoundedWindow window); /** Returns true if the given {@link PCollectionView} is valid for this reader. */ + @Pure boolean contains(PCollectionView view); /** Returns true if there are no side inputs in this reader. */ + @Pure boolean isEmpty(); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 9cce1f71f2a1..0fd63556b9c7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -66,6 +67,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -81,11 +85,6 @@ * @param the type of the {@link DoFn} (main) input elements * @param the type of the {@link DoFn} (main) output elements */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness", - "keyfor" -}) // TODO(https://github.com/apache/beam/issues/20497) public class SimpleDoFnRunner implements DoFnRunner { private final PipelineOptions options; @@ -471,12 +470,12 @@ public Instant timestamp() { } @Override - public String currentRecordId() { + public @Nullable String currentRecordId() { return elem.getRecordId(); } @Override - public Long currentRecordOffset() { + public @Nullable Long currentRecordOffset() { return elem.getRecordOffset(); } @@ -527,13 +526,21 @@ public Object key() { } @Override - public Object sideInput(String tagId) { - return sideInput(sideInputMapping.get(tagId)); + public @Nullable Object sideInput(String tagId) { + PCollectionView view = + checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s not found", tagId); + return sideInput(view); } @Override public Object schemaElement(int index) { - SerializableFunction converter = doFnSchemaInformation.getElementConverters().get(index); + checkStateNotNull( + doFnSchemaInformation, + "attempt to access element via schema when no schema information provided"); + + SerializableFunction converter = + (SerializableFunction) + doFnSchemaInformation.getElementConverters().get(index); return converter.apply(element()); } @@ -561,6 +568,7 @@ public OutputReceiver outputReceiver(DoFn doFn) { @Override public OutputReceiver outputRowReceiver(DoFn doFn) { + checkStateNotNull(mainOutputSchemaCoder, "cannot provide row receiver without schema coder"); return DoFnOutputReceivers.rowReceiver( this, builderSupplier, mainOutputTag, mainOutputSchemaCoder); } @@ -601,14 +609,25 @@ public WatermarkEstimator watermarkEstimator() { @Override public State state(String stateId, boolean alwaysFetched) { try { + DoFnSignature.StateDeclaration stateDeclaration = + checkStateNotNull( + signature.stateDeclarations().get(stateId), "state not found: %s", stateId); + StateSpec spec = - (StateSpec) signature.stateDeclarations().get(stateId).field().get(fn); + checkStateNotNull( + (StateSpec) stateDeclaration.field().get(fn), + "Field %s corresponding to state id %s contained null", + stateDeclaration.field(), + stateId); + + @NonNull + @Initialized // unclear why checkerframework needs this help State state = stepContext .stateInternals() - .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); + .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); if (alwaysFetched) { - return (State) ((ReadableState) state).readLater(); + return (State) ((ReadableState) state).readLater(); } else { return state; } @@ -620,7 +639,16 @@ public State state(String stateId, boolean alwaysFetched) { @Override public Timer timer(String timerId) { try { - TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); + DoFnSignature.TimerDeclaration timerDeclaration = + checkStateNotNull( + signature.timerDeclarations().get(timerId), "timer not found: %s", timerId); + TimerSpec spec = + (TimerSpec) + checkStateNotNull( + timerDeclaration.field().get(fn), + "Field %s corresponding to timer id %s contained null", + timerDeclaration.field(), + timerId); return new TimerInternalsTimer( window(), getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals()); } catch (IllegalAccessException e) { @@ -631,8 +659,19 @@ public Timer timer(String timerId) { @Override public TimerMap timerFamily(String timerFamilyId) { try { + DoFnSignature.TimerFamilyDeclaration timerFamilyDeclaration = + checkStateNotNull( + signature.timerFamilyDeclarations().get(timerFamilyId), + "timer family not found: %s", + timerFamilyId); + TimerSpec spec = - (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); + (TimerSpec) + checkStateNotNull( + timerFamilyDeclaration.field().get(fn), + "Field %s corresponding to timer family id %s contained null", + timerFamilyDeclaration.field(), + timerFamilyId); return new TimerInternalsTimerMap( timerFamilyId, window(), @@ -794,6 +833,7 @@ public OutputReceiver outputReceiver(DoFn doFn) { @Override public OutputReceiver outputRowReceiver(DoFn doFn) { + checkStateNotNull(mainOutputSchemaCoder, "cannot provide row receiver without schema coder"); return DoFnOutputReceivers.rowReceiver( this, builderSupplier, mainOutputTag, mainOutputSchemaCoder); } @@ -833,8 +873,18 @@ public WatermarkEstimator watermarkEstimator() { @Override public State state(String stateId, boolean alwaysFetched) { try { + DoFnSignature.StateDeclaration stateDeclaration = + checkStateNotNull( + signature.stateDeclarations().get(stateId), "state not found: %s", stateId); + StateSpec spec = - (StateSpec) signature.stateDeclarations().get(stateId).field().get(fn); + checkStateNotNull( + (StateSpec) stateDeclaration.field().get(fn), + "Field %s corresponding to state id %s contained null", + stateDeclaration.field(), + stateId); + + @NonNull State state = stepContext .stateInternals() @@ -852,7 +902,16 @@ public State state(String stateId, boolean alwaysFetched) { @Override public Timer timer(String timerId) { try { - TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); + DoFnSignature.TimerDeclaration timerDeclaration = + checkStateNotNull( + signature.timerDeclarations().get(timerId), "timer not found: %s", timerId); + TimerSpec spec = + (TimerSpec) + checkStateNotNull( + timerDeclaration.field().get(fn), + "Field %s corresponding to timer id %s contained null", + timerDeclaration.field(), + timerId); return new TimerInternalsTimer( window, getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals()); } catch (IllegalAccessException e) { @@ -863,8 +922,18 @@ public Timer timer(String timerId) { @Override public TimerMap timerFamily(String timerFamilyId) { try { + DoFnSignature.TimerFamilyDeclaration timerFamilyDeclaration = + checkStateNotNull( + signature.timerFamilyDeclarations().get(timerFamilyId), + "timer family not found: %s", + timerFamilyId); TimerSpec spec = - (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); + (TimerSpec) + checkStateNotNull( + timerFamilyDeclaration.field().get(fn), + "Field %s corresponding to timer family id %s contained null", + timerFamilyDeclaration.field(), + timerFamilyId); return new TimerInternalsTimerMap( timerFamilyId, window(), @@ -1058,6 +1127,7 @@ public OutputReceiver outputReceiver(DoFn doFn) { @Override public OutputReceiver outputRowReceiver(DoFn doFn) { + checkStateNotNull(mainOutputSchemaCoder, "cannot provide row receiver without schema coder"); return DoFnOutputReceivers.rowReceiver( this, builderSupplier, mainOutputTag, mainOutputSchemaCoder); } @@ -1096,14 +1166,23 @@ public WatermarkEstimator watermarkEstimator() { @Override public State state(String stateId, boolean alwaysFetched) { try { + DoFnSignature.StateDeclaration stateDeclaration = + checkStateNotNull( + signature.stateDeclarations().get(stateId), "state not found: %s", stateId); StateSpec spec = - (StateSpec) signature.stateDeclarations().get(stateId).field().get(fn); + checkStateNotNull( + (StateSpec) stateDeclaration.field().get(fn), + "Field %s corresponding to state id %s contained null", + stateDeclaration.field(), + stateId); + @NonNull + @Initialized // unclear why checkerframework needs this help State state = stepContext .stateInternals() - .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); + .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec)); if (alwaysFetched) { - return (State) ((ReadableState) state).readLater(); + return (State) ((ReadableState) state).readLater(); } else { return state; } @@ -1195,7 +1274,7 @@ private class TimerInternalsTimer implements Timer { private final String timerId; private final String timerFamilyId; private final TimerSpec spec; - private Instant target; + private @MonotonicNonNull Instant target; private @Nullable Instant outputTimestamp; private boolean noOutputTimestamp; private final Instant elementInputTimestamp; @@ -1313,15 +1392,18 @@ public Timer withNoOutputTimestamp() { *

  • The current element timestamp for other time domains. */ private void setAndVerifyOutputTimestamp() { + checkStateNotNull(target, "attempt to set outputTimestamp before setting target firing time"); if (outputTimestamp != null) { + // setting to local var so checkerframework knows that method calls will not mutate it + Instant timestampToValidate = outputTimestamp; Instant lowerBound; try { lowerBound = elementInputTimestamp.minus(fn.getAllowedTimestampSkew()); } catch (ArithmeticException e) { lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE; } - if (outputTimestamp.isBefore(lowerBound) - || outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + if (timestampToValidate.isBefore(lowerBound) + || timestampToValidate.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { throw new IllegalArgumentException( String.format( "Cannot output timer with output timestamp %s. Output timestamps must be no " @@ -1329,7 +1411,7 @@ private void setAndVerifyOutputTimestamp() { + "allowed skew (%s) and no later than %s. See the " + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the " + "allowed skew.", - outputTimestamp, + timestampToValidate, elementInputTimestamp, fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE ? fn.getAllowedTimestampSkew() @@ -1346,6 +1428,9 @@ private void setAndVerifyOutputTimestamp() { // the element (or timer) setting this timer. outputTimestamp = elementInputTimestamp; } + + // Now it has been set for all cases other than this.noOutputTimestamp == true, and there are + // further validations if (outputTimestamp != null) { Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { @@ -1380,6 +1465,12 @@ private void setAndVerifyOutputTimestamp() { * user has no way to compute a good choice of time. */ private void setUnderlyingTimer() { + checkStateNotNull( + outputTimestamp, + "internal error: null outputTimestamp: must be populated by setAndVerifyOutputTimestamp()"); + checkStateNotNull( + target, + "internal error: attempt to set internal timer when target timestamp not yet set"); timerInternals.setTimer( namespace, timerId, timerFamilyId, target, outputTimestamp, spec.getTimeDomain()); } @@ -1396,7 +1487,9 @@ private Instant getCurrentTime(TimeDomain timeDomain) { case PROCESSING_TIME: return timerInternals.currentProcessingTime(); case SYNCHRONIZED_PROCESSING_TIME: - return timerInternals.currentSynchronizedProcessingTime(); + return checkStateNotNull( + timerInternals.currentSynchronizedProcessingTime(), + "internal error: requested SYNCHRONIZED_PROCESSING_TIME but it was null"); default: throw new IllegalStateException( String.format("Timer created for unknown time domain %s", spec.getTimeDomain())); @@ -1446,19 +1539,17 @@ public void set(String timerId, Instant absoluteTime) { @Override public Timer get(String timerId) { - if (timers.get(timerId) == null) { - Timer timer = - new TimerInternalsTimer( - window, - namespace, - timerId, - timerFamilyId, - spec, - elementInputTimestamp, - timerInternals); - timers.put(timerId, timer); - } - return timers.get(timerId); + return timers.computeIfAbsent( + timerId, + id -> + new TimerInternalsTimer( + window, + namespace, + id, + timerFamilyId, + spec, + elementInputTimestamp, + timerInternals)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java index d6364874e326..138afb057cd6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpec.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.state; import java.io.Serializable; +import org.checkerframework.dataflow.qual.Pure; /** A specification for a {@link Timer}. This includes its {@link TimeDomain}. */ public interface TimerSpec extends Serializable { + @Pure TimeDomain getTimeDomain(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index a82e84090cb7..125408108c07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -326,10 +326,10 @@ public abstract class ProcessContext extends WindowedContext { public abstract PaneInfo pane(); @Pure - public abstract String currentRecordId(); + public abstract @Nullable String currentRecordId(); @Pure - public abstract Long currentRecordOffset(); + public abstract @Nullable Long currentRecordOffset(); } /** Information accessible when running a {@link DoFn.OnTimer} method. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java index 8dc302dd1d54..cbb9e87f2afa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.dataflow.qual.Pure; /** Represents information about how a DoFn extracts schemas. */ @AutoValue @@ -46,12 +47,15 @@ public abstract class DoFnSchemaInformation implements Serializable { * The schema of the @Element parameter. If the Java type does not match the input PCollection but * the schemas are compatible, Beam will automatically convert between the Java types. */ + @Pure public abstract List> getElementConverters(); /** Effective FieldAccessDescriptor applied by DoFn. */ + @Pure public abstract FieldAccessDescriptor getFieldAccessDescriptor(); /** Create an instance. */ + @Pure public static DoFnSchemaInformation create() { return new AutoValue_DoFnSchemaInformation.Builder() .setElementConverters(Collections.emptyList()) @@ -66,9 +70,11 @@ public abstract static class Builder { abstract Builder setFieldAccessDescriptor(FieldAccessDescriptor descriptor); + @Pure abstract DoFnSchemaInformation build(); } + @Pure public abstract Builder toBuilder(); /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 5c007223c23e..0079435700cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -188,16 +188,30 @@ interface ArgumentProvider { /** * Provide a reference to the input element key in {@link org.apache.beam.sdk.values.KV} pair. + * + *

    {@code null} is allowed because user keys may be null. This method may not return + * null for any other reason. */ + @Nullable Object key(); - /** Provide a reference to the input sideInput with the specified tag. */ + /** + * Provide a reference to the input sideInput with the specified tag. + * + *

    {@code null} is allowed because side input values may be null. This method may not + * return null for any other reason. + */ + @Nullable Object sideInput(String tagId); /** * Provide a reference to the selected schema field corresponding to the input argument * specified by index. + * + *

    {@code null} is allowed because element fields may be null. This method may not + * return null for any other reason. */ + @Nullable Object schemaElement(int index); /** Provide a reference to the input element timestamp. */ @@ -282,13 +296,13 @@ public InputT element(DoFn doFn) { } @Override - public Object key() { + public @Nullable Object key() { throw new UnsupportedOperationException( "Cannot access key as parameter outside of @OnTimer method."); } @Override - public Object sideInput(String tagId) { + public @Nullable Object sideInput(String tagId) { throw new UnsupportedOperationException( String.format("SideInput unsupported in %s", getErrorContext())); } @@ -300,7 +314,7 @@ public TimerMap timerFamily(String tagId) { } @Override - public Object schemaElement(int index) { + public @Nullable Object schemaElement(int index) { throw new UnsupportedOperationException( String.format("Schema element unsupported in %s", getErrorContext())); } @@ -481,17 +495,17 @@ public InputT element(DoFn doFn) { } @Override - public Object key() { + public @Nullable Object key() { return delegate.key(); } @Override - public Object sideInput(String tagId) { + public @Nullable Object sideInput(String tagId) { return delegate.sideInput(tagId); } @Override - public Object schemaElement(int index) { + public @Nullable Object schemaElement(int index) { return delegate.schemaElement(index); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index d44a62121f84..35f71d690102 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -736,6 +736,7 @@ public abstract static class BundleFinalizerParameter extends Parameter { public abstract static class ElementParameter extends Parameter { ElementParameter() {} + @Pure public abstract TypeDescriptor elementT(); } @@ -747,10 +748,13 @@ public abstract static class ElementParameter extends Parameter { public abstract static class SchemaElementParameter extends Parameter { SchemaElementParameter() {} + @Pure public abstract TypeDescriptor elementT(); + @Pure public abstract @Nullable String fieldAccessString(); + @Pure public abstract int index(); /** Builder class. */ @@ -762,9 +766,11 @@ public abstract static class Builder { public abstract Builder setIndex(int index); + @Pure public abstract SchemaElementParameter build(); } + @Pure public abstract Builder toBuilder(); } @@ -787,6 +793,7 @@ public abstract static class TimerIdParameter extends Parameter { public abstract static class KeyParameter extends Parameter { KeyParameter() {} + @Pure public abstract TypeDescriptor keyT(); } @@ -805,8 +812,10 @@ public abstract static class TimeDomainParameter extends Parameter { public abstract static class SideInputParameter extends Parameter { SideInputParameter() {} + @Pure public abstract TypeDescriptor elementT(); + @Pure public abstract String sideInputId(); /** Builder class. */ @@ -816,9 +825,11 @@ public abstract static class Builder { public abstract SideInputParameter.Builder setSideInputId(String sideInput); + @Pure public abstract SideInputParameter build(); } + @Pure public abstract SideInputParameter.Builder toBuilder(); } @@ -831,6 +842,7 @@ public abstract static class Builder { public abstract static class OutputReceiverParameter extends Parameter { OutputReceiverParameter() {} + @Pure public abstract boolean isRowReceiver(); } @@ -873,6 +885,7 @@ public abstract static class OnWindowExpirationContextParameter extends Paramete public abstract static class WindowParameter extends Parameter { WindowParameter() {} + @Pure public abstract TypeDescriptor windowT(); } @@ -897,6 +910,7 @@ public abstract static class RestrictionParameter extends Parameter { // Package visible for AutoValue RestrictionParameter() {} + @Pure public abstract TypeDescriptor restrictionT(); } @@ -910,6 +924,7 @@ public abstract static class WatermarkEstimatorStateParameter extends Parameter // Package visible for AutoValue WatermarkEstimatorStateParameter() {} + @Pure public abstract TypeDescriptor estimatorStateT(); } @@ -923,6 +938,7 @@ public abstract static class WatermarkEstimatorParameter extends Parameter { // Package visible for AutoValue WatermarkEstimatorParameter() {} + @Pure public abstract TypeDescriptor estimatorT(); } @@ -936,6 +952,7 @@ public abstract static class RestrictionTrackerParameter extends Parameter { // Package visible for AutoValue RestrictionTrackerParameter() {} + @Pure public abstract TypeDescriptor trackerT(); } @@ -950,8 +967,10 @@ public abstract static class StateParameter extends Parameter { // Package visible for AutoValue StateParameter() {} + @Pure public abstract StateDeclaration referent(); + @Pure public abstract boolean alwaysFetched(); } @@ -964,6 +983,7 @@ public abstract static class TimerParameter extends Parameter { // Package visible for AutoValue TimerParameter() {} + @Pure public abstract TimerDeclaration referent(); } @@ -973,6 +993,7 @@ public abstract static class TimerFamilyParameter extends Parameter { // Package visible for AutoValue TimerFamilyParameter() {} + @Pure public abstract TimerFamilyDeclaration referent(); } } @@ -982,37 +1003,46 @@ public abstract static class TimerFamilyParameter extends Parameter { public abstract static class ProcessElementMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); /** * Whether this method requires stable input, expressed via {@link * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. */ + @Pure public abstract boolean requiresStableInput(); /** * Whether this method requires time sorted input, expressed via {@link * org.apache.beam.sdk.transforms.DoFn.RequiresTimeSortedInput}. */ + @Pure public abstract boolean requiresTimeSortedInput(); /** Concrete type of the {@link RestrictionTracker} parameter, if present. */ + @Pure public abstract @Nullable TypeDescriptor trackerT(); /** Concrete type of the {@link WatermarkEstimator} parameter, if present. */ + @Pure public abstract @Nullable TypeDescriptor watermarkEstimatorT(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */ + @Pure public abstract boolean hasReturnValue(); + @Pure static ProcessElementMethod create( Method targetMethod, List extraParameters, @@ -1033,6 +1063,7 @@ static ProcessElementMethod create( hasReturnValue); } + @Pure public @Nullable List getSchemaElementParameters() { return extraParameters().stream() .filter(Predicates.instanceOf(SchemaElementParameter.class)::apply) @@ -1040,6 +1071,7 @@ static ProcessElementMethod create( .collect(Collectors.toList()); } + @Pure public @Nullable List getSideInputParameters() { return extraParameters().stream() .filter(Predicates.instanceOf(SideInputParameter.class)::apply) @@ -1048,6 +1080,7 @@ static ProcessElementMethod create( } /** The {@link OutputReceiverParameter} for a main output, or null if there is none. */ + @Pure public @Nullable OutputReceiverParameter getMainOutputReceiver() { Optional parameter = extraParameters().stream() @@ -1059,6 +1092,7 @@ static ProcessElementMethod create( /** * Whether this {@link DoFn} is splittable. */ + @Pure public boolean isSplittable() { return extraParameters().stream() .anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply); @@ -1070,10 +1104,12 @@ public boolean isSplittable() { public abstract static class OnTimerMethod implements MethodWithExtraParameters { /** The id on the method's {@link DoFn.TimerId} annotation. */ + @Pure public abstract String id(); /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** @@ -1081,16 +1117,20 @@ public abstract static class OnTimerMethod implements MethodWithExtraParameters * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For timers, this means that any * state must be stably persisted prior to calling it. */ + @Pure public abstract boolean requiresStableInput(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static OnTimerMethod create( Method targetMethod, String id, @@ -1111,10 +1151,12 @@ static OnTimerMethod create( public abstract static class OnTimerFamilyMethod implements MethodWithExtraParameters { /** The id on the method's {@link DoFn.TimerId} annotation. */ + @Pure public abstract String id(); /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** @@ -1122,16 +1164,20 @@ public abstract static class OnTimerFamilyMethod implements MethodWithExtraParam * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For timers, this means that any * state must be stably persisted prior to calling it. */ + @Pure public abstract boolean requiresStableInput(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static OnTimerFamilyMethod create( Method targetMethod, String id, @@ -1153,6 +1199,7 @@ public abstract static class OnWindowExpirationMethod implements MethodWithExtra /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** @@ -1161,16 +1208,20 @@ public abstract static class OnWindowExpirationMethod implements MethodWithExtra * org.apache.beam.sdk.transforms.DoFn.OnWindowExpiration}, this means that any state must be * stably persisted prior to calling it. */ + @Pure public abstract boolean requiresStableInput(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static OnWindowExpirationMethod create( Method targetMethod, boolean requiresStableInput, @@ -1193,10 +1244,13 @@ public abstract static class TimerDeclaration { public static final String PREFIX = "ts-"; + @Pure public abstract String id(); + @Pure public abstract Field field(); + @Pure static TimerDeclaration create(String id, Field field) { return new AutoValue_DoFnSignature_TimerDeclaration(id, field); } @@ -1211,10 +1265,13 @@ public abstract static class TimerFamilyDeclaration { public static final String PREFIX = "tfs-"; + @Pure public abstract String id(); + @Pure public abstract Field field(); + @Pure static TimerFamilyDeclaration create(String id, Field field) { return new AutoValue_DoFnSignature_TimerFamilyDeclaration(id, field); } @@ -1225,16 +1282,20 @@ static TimerFamilyDeclaration create(String id, Field field) { public abstract static class BundleMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); /** The type of window expected by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); + @Pure static BundleMethod create(Method targetMethod, List extraParameters) { /* start bundle/finish bundle currently do not get invoked on a per window basis and can't accept a BoundedWindow parameter */ return new AutoValue_DoFnSignature_BundleMethod(targetMethod, extraParameters, null); @@ -1247,12 +1308,16 @@ static BundleMethod create(Method targetMethod, List extraParameters) */ @AutoValue public abstract static class StateDeclaration { + @Pure public abstract String id(); + @Pure public abstract Field field(); + @Pure public abstract TypeDescriptor stateType(); + @Pure static StateDeclaration create( String id, Field field, TypeDescriptor stateType) { field.setAccessible(true); @@ -1267,10 +1332,13 @@ static StateDeclaration create( */ @AutoValue public abstract static class FieldAccessDeclaration { + @Pure public abstract String id(); + @Pure public abstract Field field(); + @Pure static FieldAccessDeclaration create(String id, Field field) { field.setAccessible(true); return new AutoValue_DoFnSignature_FieldAccessDeclaration(id, field); @@ -1282,12 +1350,15 @@ static FieldAccessDeclaration create(String id, Field field) { public abstract static class LifecycleMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static LifecycleMethod create(Method targetMethod, List extraParameters) { return new AutoValue_DoFnSignature_LifecycleMethod(null, targetMethod, extraParameters); } @@ -1298,19 +1369,24 @@ static LifecycleMethod create(Method targetMethod, List extraParamete public abstract static class GetInitialRestrictionMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Type of the returned restriction. */ + @Pure public abstract TypeDescriptor restrictionT(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static GetInitialRestrictionMethod create( Method targetMethod, TypeDescriptor restrictionT, @@ -1326,16 +1402,20 @@ static GetInitialRestrictionMethod create( public abstract static class SplitRestrictionMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static SplitRestrictionMethod create( Method targetMethod, TypeDescriptor windowT, @@ -1350,16 +1430,20 @@ static SplitRestrictionMethod create( public abstract static class TruncateRestrictionMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static TruncateRestrictionMethod create( Method targetMethod, TypeDescriptor windowT, @@ -1374,17 +1458,21 @@ static TruncateRestrictionMethod create( public abstract static class NewTrackerMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Type of the returned {@link RestrictionTracker}. */ + @Pure public abstract TypeDescriptor trackerT(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); static NewTrackerMethod create( @@ -1402,16 +1490,20 @@ static NewTrackerMethod create( public abstract static class GetSizeMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static GetSizeMethod create( Method targetMethod, TypeDescriptor windowT, @@ -1425,11 +1517,14 @@ static GetSizeMethod create( public abstract static class GetRestrictionCoderMethod implements DoFnMethod { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Type of the returned {@link Coder}. */ + @Pure public abstract TypeDescriptor coderT(); + @Pure static GetRestrictionCoderMethod create(Method targetMethod, TypeDescriptor coderT) { return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT); } @@ -1441,19 +1536,24 @@ public abstract static class GetInitialWatermarkEstimatorStateMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Type of the returned watermark estimator state. */ + @Pure public abstract TypeDescriptor watermarkEstimatorStateT(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static GetInitialWatermarkEstimatorStateMethod create( Method targetMethod, TypeDescriptor watermarkEstimatorStateT, @@ -1469,19 +1569,24 @@ static GetInitialWatermarkEstimatorStateMethod create( public abstract static class NewWatermarkEstimatorMethod implements MethodWithExtraParameters { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Type of the returned {@link WatermarkEstimator}. */ + @Pure public abstract TypeDescriptor watermarkEstimatorT(); /** The window type used by this method, if any. */ @Override + @Pure public abstract @Nullable TypeDescriptor windowT(); /** Types of optional parameters of the annotated method, in the order they appear. */ @Override + @Pure public abstract List extraParameters(); + @Pure static NewWatermarkEstimatorMethod create( Method targetMethod, TypeDescriptor watermarkEstimatorT, @@ -1497,11 +1602,14 @@ static NewWatermarkEstimatorMethod create( public abstract static class GetWatermarkEstimatorStateCoderMethod implements DoFnMethod { /** The annotated method itself. */ @Override + @Pure public abstract Method targetMethod(); /** Type of the returned {@link Coder}. */ + @Pure public abstract TypeDescriptor coderT(); + @Pure static GetWatermarkEstimatorStateCoderMethod create( Method targetMethod, TypeDescriptor coderT) { return new AutoValue_DoFnSignature_GetWatermarkEstimatorStateCoderMethod( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Preconditions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Preconditions.java index 7bb08039c81d..6ffb43a5648b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Preconditions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Preconditions.java @@ -24,6 +24,7 @@ import org.checkerframework.checker.nullness.qual.EnsuresNonNull; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.dataflow.qual.Pure; /** * Beam-specific variants of {@link @@ -44,6 +45,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull(@Nullable T reference) { if (reference == null) { throw new IllegalArgumentException(); @@ -62,6 +64,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T reference, @Nullable Object errorMessage) { if (reference == null) { @@ -86,6 +89,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("reference") + @Pure public static T checkArgumentNotNull( @Nullable T reference, @Nullable String errorMessageTemplate, @@ -103,6 +107,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, char p1) { if (obj == null) { @@ -118,6 +123,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, int p1) { if (obj == null) { @@ -133,6 +139,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, long p1) { if (obj == null) { @@ -148,6 +155,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @Nullable Object p1) { if (obj == null) { @@ -163,6 +171,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, char p1, char p2) { if (obj == null) { @@ -178,6 +187,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, char p1, int p2) { if (obj == null) { @@ -193,6 +203,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, char p1, long p2) { if (obj == null) { @@ -208,6 +219,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, char p1, @Nullable Object p2) { if (obj == null) { @@ -223,6 +235,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, int p1, char p2) { if (obj == null) { @@ -238,6 +251,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, int p1, int p2) { if (obj == null) { @@ -253,6 +267,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, int p1, long p2) { if (obj == null) { @@ -268,6 +283,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, int p1, @Nullable Object p2) { if (obj == null) { @@ -283,6 +299,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, long p1, char p2) { if (obj == null) { @@ -298,6 +315,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, long p1, int p2) { if (obj == null) { @@ -313,6 +331,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, long p1, long p2) { if (obj == null) { @@ -328,6 +347,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, long p1, @Nullable Object p2) { if (obj == null) { @@ -343,6 +363,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @Nullable Object p1, char p2) { if (obj == null) { @@ -358,6 +379,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @Nullable Object p1, int p2) { if (obj == null) { @@ -373,6 +395,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @Nullable Object p1, long p2) { if (obj == null) { @@ -388,6 +411,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @@ -406,6 +430,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @@ -425,6 +450,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkArgumentNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @@ -447,6 +473,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkStateNotNull(@Nullable T obj) { if (obj == null) { throw new IllegalStateException(); @@ -465,6 +492,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkStateNotNull( @Nullable T reference, @Nullable Object errorMessage) { if (reference == null) { @@ -489,6 +517,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkStateNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @@ -506,6 +535,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkStateNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, @Nullable Object p1) { if (obj == null) { @@ -521,6 +551,7 @@ public class Preconditions { */ @CanIgnoreReturnValue @EnsuresNonNull("#1") + @Pure public static T checkStateNotNull( @Nullable T obj, @Nullable String errorMessageTemplate, diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java index f5ada3033561..1df50b5e9acd 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java @@ -159,7 +159,7 @@ public void testKafkaOffsetHolderEquality() { null)); tester.testEquals(); } -}; +} class CounterSourceConnector extends SourceConnector { public static class CounterSourceConnectorConfig extends AbstractConfig {