diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExpiringMemoizingSerializableSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExpiringMemoizingSerializableSupplier.java new file mode 100644 index 000000000000..b64ca35aaed1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExpiringMemoizingSerializableSupplier.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A thread-safe {@link SerializableSupplier} that wraps a {@link SerializableSupplier} and retains + * the supplier's result for the provided period. Lightweight locking and synchronization is used to + * guarantee mutual exclusivity and visibility of updates at the expense of single nanosecond + * precision. + * + *

The initial value and subsequently retained values are considered transient and will not be + * serialized. + */ +public final class ExpiringMemoizingSerializableSupplier + implements SerializableSupplier { + // TODO(sjvanrossum): Replace with VarHandle after JDK 8 support is dropped. + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater + DEADLINE_NANOS = + AtomicLongFieldUpdater.newUpdater( + ExpiringMemoizingSerializableSupplier.class, "deadlineNanos"); + + private final SerializableSupplier supplier; + private final long periodNanos; + private transient T value; + private transient volatile long deadlineNanos; + + public ExpiringMemoizingSerializableSupplier( + SerializableSupplier supplier, Duration period, T initialValue, Duration initialDelay) { + this.supplier = supplier; // final store + this.periodNanos = period.toNanos(); // final store + this.value = initialValue; // normal store + + // Ordered stores may be reordered with subsequent loads. + // The default value of deadlineNanos permits an indefinite initial expiration depending on the + // clock's state. + this.deadlineNanos = + System.nanoTime() + initialDelay.toNanos() + & ~1L; // volatile store (sequentially consistent release) + } + + @Override + public T get() { + final long deadlineNanos = this.deadlineNanos; // volatile load (acquire) + final long nowNanos; + final T result; + + /* + * Sacrificing 1ns precision to pack the lock state into the low bit of deadlineNanos is deemed acceptable. + * Subsequent loads and stores are prevented from reordering before a volatile load. + * Preceding loads and stores are prevented from reordering after an ordered store. + * A store to value can't be reordered after a store to deadlineNanos. + * A store to deadlineNanos can be reordered after a load of deadlineNanos. + * The returned value will be as old as or younger than deadlineNanos. 
+ */ + if ((deadlineNanos & 1L) == 0 + && deadlineNanos - (nowNanos = System.nanoTime()) <= 0L + && DEADLINE_NANOS + .compareAndSet( // volatile load/store (sequentially consistent acquire/release) + this, deadlineNanos, deadlineNanos | 1L)) { + try { + this.value = result = supplier.get(); // normal store + } finally { + DEADLINE_NANOS.lazySet(this, (nowNanos + periodNanos) & ~1L); // ordered store (release); runs even if supplier.get() throws + } + } else { + result = this.value; // normal load (not expired, lock bit set, or CAS lost) + } + + return result; + } + + private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException { + is.defaultReadObject(); + + // Immediate initial expiration prevents a load of value before it is initialized. NOTE(review): confirm for get() calls racing the first refresh. + this.deadlineNanos = + System.nanoTime() & ~1L; // volatile store (sequentially consistent release) + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExpiringMemoizingSerializableSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExpiringMemoizingSerializableSupplierTest.java new file mode 100644 index 000000000000..f45f41747755 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExpiringMemoizingSerializableSupplierTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Iterator; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ExpiringMemoizingSerializableSupplierTest { + + @Test + public void testSupplierIsSerializable() { + final ExpiringMemoizingSerializableSupplier instance = + new ExpiringMemoizingSerializableSupplier<>( + Object::new, Duration.ZERO, null, Duration.ZERO); + + // Instances must be serializable. + SerializableUtils.ensureSerializable(instance); + } + + @Test + public void testSameValueAfterConstruction() { + final Object initialValue = new Object(); + final ExpiringMemoizingSerializableSupplier instance = + new ExpiringMemoizingSerializableSupplier<>( + Object::new, Duration.ofHours(1), initialValue, Duration.ofHours(1)); + + // Construction initializes deadlineNanos for delayed expiration. + // The supplied value must not be observed as uninitialized. + // The supplied value is referentially equal to initialValue. + final Object instanceValue = instance.get(); + assertNotNull(instanceValue); + assertSame(initialValue, instanceValue); + } + + @SuppressWarnings("unchecked") + @Test + public void testDistinctValuesAfterDeserialization() throws Exception { + final Object initialValue = new Object(); + final ExpiringMemoizingSerializableSupplier instance = + new ExpiringMemoizingSerializableSupplier<>( + Object::new, Duration.ofHours(1), initialValue, Duration.ofHours(1)); + + // Deserialized instances must be referentially distinct for the purpose of this test. 
+ final byte[] serialized = SerializableUtils.serializeToByteArray(instance); + final ExpiringMemoizingSerializableSupplier deserialized1 = + (ExpiringMemoizingSerializableSupplier) + SerializableUtils.deserializeFromByteArray(serialized, "instance"); + final ExpiringMemoizingSerializableSupplier deserialized2 = + (ExpiringMemoizingSerializableSupplier) + SerializableUtils.deserializeFromByteArray(serialized, "instance"); + assertNotSame(instance, deserialized1); + assertNotSame(instance, deserialized2); + assertNotSame(deserialized1, deserialized2); + + // Deserialization initializes deadlineNanos for immediate expiration. + // Supplied values must not be observed as uninitialized. + // The initial and supplied values are all referentially distinct. + final Object deserialized1Value = deserialized1.get(); + final Object deserialized2Value = deserialized2.get(); + assertNotNull(deserialized1Value); + assertNotNull(deserialized2Value); + assertNotSame(initialValue, deserialized1Value); + assertNotSame(initialValue, deserialized2Value); + assertNotSame(deserialized1Value, deserialized2Value); + } + + @Test + public void testProgressAfterException() throws Exception { + final Object initialValue = new Object(); + final Object terminalValue = new Object(); + final Iterator suppliedValues = + Arrays.asList(new Object(), new RuntimeException(), new Object()).iterator(); + final ExpiringMemoizingSerializableSupplier instance = + new ExpiringMemoizingSerializableSupplier<>( + () -> { + if (!suppliedValues.hasNext()) { + return terminalValue; + } + final Object value = suppliedValues.next(); + if (value instanceof RuntimeException) { + throw (RuntimeException) value; + } + return value; + }, + Duration.ZERO, + initialValue, + Duration.ZERO); + + // The initial value expires immediately and must not be observed. + final Object instanceValue = instance.get(); + assertNotSame(initialValue, instanceValue); + + // An exception must be thrown for the purpose of this test. 
+ assertThrows(RuntimeException.class, instance::get); + + // Exceptions must not lock the instance state. + // The supplied value is referentially distinct from instanceValue for the purpose of this test. + // Note that concurrently observed supplied values may be referentially equal to instanceValue. + final Object intermediateValue = instance.get(); + assertNotSame(instanceValue, intermediateValue); + + // The supplied value is referentially equal to terminalValue for the purpose of this test. + assertSame(terminalValue, instance.get()); + } + + @Test + public void testInitialValueVisibilityOnDifferentThread() throws Exception { + final Object initialValue = new Object(); + final Object[] valueHolder = new Object[] {new Object()}; + final ExpiringMemoizingSerializableSupplier instance = + new ExpiringMemoizingSerializableSupplier<>( + Object::new, Duration.ZERO, initialValue, Duration.ofHours(1)); + + // Initialization of value and deadlineNanos must be visible on other threads. + // The initial value must be supplied for delayed expiration. + final Thread t = new Thread(() -> valueHolder[0] = instance.get()); + t.start(); + t.join(); + final Object observedValue = valueHolder[0]; + assertNotNull(observedValue); + assertSame(initialValue, observedValue); + } + + @Test + public void testIntermediateValueVisibilityOnDifferentThread() throws Exception { + final Object intermediateValue = new Object(); + final Object[] valueHolder = new Object[] {new Object()}; + final ExpiringMemoizingSerializableSupplier instance = + new ExpiringMemoizingSerializableSupplier<>( + () -> intermediateValue, Duration.ofHours(1), new Object(), Duration.ZERO); + + // Initialization of value and deadlineNanos must be visible on other threads. + // The intermediate value must be supplied for immediate expiration. 
+ final Thread t = new Thread(() -> valueHolder[0] = instance.get()); + t.start(); + t.join(); + final Object observedValue = valueHolder[0]; + assertNotNull(observedValue); + assertSame(intermediateValue, observedValue); + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index eab5ae083187..60fc9d57a626 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -27,11 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -52,6 +48,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ExpiringMemoizingSerializableSupplier; import org.apache.beam.sdk.util.MemoizingPerInstantiationSerializableSupplier; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.SerializableSupplier; @@ -348,101 +345,38 @@ public Consumer load( */ private static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable { - private static final AtomicReferenceFieldUpdater - CURRENT_REFRESH_TASK = - (AtomicReferenceFieldUpdater) - AtomicReferenceFieldUpdater.newUpdater( 
- KafkaLatestOffsetEstimator.class, Runnable.class, "currentRefreshTask"); - private final Executor executor; private final Consumer offsetConsumer; - private final TopicPartition topicPartition; - // TODO(sjvanrossum): Use VarHandle.setOpaque/getOpaque when Java 8 support is dropped - private long lastRefreshEndOffset; - // TODO(sjvanrossum): Use VarHandle.setOpaque/getOpaque when Java 8 support is dropped - private long nextRefreshNanos; - private volatile @Nullable Runnable currentRefreshTask; - - /* - Periodic refreshes of lastRefreshEndOffset and nextRefreshNanos are guarded by the volatile - field currentRefreshTask. This guard's correctness depends on specific ordering of reads and - writes (loads and stores). - - To validate the behavior of this guard please read the Java Memory Model (JMM) specification. - For the current context consider the following oversimplifications of the JMM: - - Writes to a non-volatile long or double field are non-atomic. - - Writes to a non-volatile field may never become visible to another core. - - Writes to a volatile field are atomic and will become visible to another core. - - Lazy writes to a volatile field are atomic and will become visible to another core for - reads of that volatile field. - - Writes preceeding writes or lazy writes to a volatile field are visible to another core. - - In short, the contents of this class' guarded fields are visible if the guard field is (lazily) - written last and read first. The contents of the volatile guard may be stale in comparison to - the contents of the guarded fields. For this method it is important that no more than one - thread will schedule a refresh task. Using currentRefreshTask as the guard field ensures that - lastRefreshEndOffset and nextRefreshNanos are at least as stale as currentRefreshTask. - It's fine if lastRefreshEndOffset and nextRefreshNanos are less stale than currentRefreshTask. 
- - Removing currentRefreshTask by guarding on nextRefreshNanos is possible, but executing - currentRefreshTask == null is practically free (measured in cycles) compared to executing - nextRefreshNanos < System.nanoTime() (measured in nanoseconds). - - Note that the JMM specifies that writes to a long or double are not guaranteed to be atomic. - In practice, every 64-bit JVM will treat them as atomic (and the JMM encourages this). - There's no way to force atomicity without visibility in Java 8 so atomicity guards have been - omitted. Java 9 introduces VarHandle with "opaque" getters/setters which do provide this. - */ + private final Supplier offsetSupplier; KafkaLatestOffsetEstimator( final Consumer offsetConsumer, final TopicPartition topicPartition) { - this.executor = Executors.newSingleThreadExecutor(); this.offsetConsumer = offsetConsumer; - this.topicPartition = topicPartition; - this.lastRefreshEndOffset = -1L; - this.nextRefreshNanos = Long.MIN_VALUE; - this.currentRefreshTask = null; + this.offsetSupplier = + new ExpiringMemoizingSerializableSupplier<>( + () -> { // runs on the thread calling estimate() once the retained value has expired + try { + return offsetConsumer + .endOffsets(Collections.singleton(topicPartition)) + .getOrDefault(topicPartition, Long.MIN_VALUE); + } catch (Throwable t) { + LOG.error("Failed to get end offset for {}", topicPartition, t); + return Long.MIN_VALUE; // NOTE(review): this sentinel is retained for the 1s period; confirm callers handle it + } + }, + Duration.ofSeconds(1), + Long.MIN_VALUE, + Duration.ZERO); } @Override public long estimate() { - final @Nullable Runnable task = currentRefreshTask; // volatile load (acquire) - - final long currentNanos; - if (task == null - && nextRefreshNanos < (currentNanos = System.nanoTime()) // normal load - && CURRENT_REFRESH_TASK.compareAndSet(this, null, this::refresh)) { // volatile load/store - try { - executor.execute(this::refresh); - } catch (RejectedExecutionException ex) { - LOG.error("Execution of end offset refresh rejected for {}", topicPartition, ex); - nextRefreshNanos = currentNanos + TimeUnit.SECONDS.toNanos(1); // normal store - 
- CURRENT_REFRESH_TASK.lazySet(this, null); // ordered store (release) - } - } - - return lastRefreshEndOffset; // normal load + return offsetSupplier.get(); } @Override public void close() { offsetConsumer.close(); } - - private void refresh() { - try { - @Nullable - Long endOffset = - offsetConsumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition); - if (endOffset == null) { - LOG.warn("No end offset found for partition {}.", topicPartition); - } else { - lastRefreshEndOffset = endOffset; // normal store - } - nextRefreshNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(1); // normal store - } finally { - CURRENT_REFRESH_TASK.lazySet(this, null); // ordered store (release) - } - } } @GetInitialRestriction