Skip to content

Commit 11f9b0c

Browse files
authored
Extract expiring memoizing supplier as a separate utility and run end offset refreshes in the current thread. (#36075)
1 parent 9ec41f8 commit 11f9b0c

File tree

3 files changed

+286
-85
lines changed

3 files changed

+286
-85
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.util;
19+
20+
import java.io.IOException;
21+
import java.io.ObjectInputStream;
22+
import java.time.Duration;
23+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24+
import org.checkerframework.checker.nullness.qual.Nullable;
25+
26+
/**
27+
* A thread-safe {@link SerializableSupplier} that wraps a {@link SerializableSupplier} and retains
28+
* the supplier's result for the provided period. Lightweight locking and synchronization is used to
29+
* guarantee mutual exclusivity and visibility of updates at the expense of single nanosecond
30+
* precision.
31+
*
32+
* <p>The initial value and subsequently retained values are considered transient and will not be
33+
* serialized.
34+
*/
35+
public final class ExpiringMemoizingSerializableSupplier<T extends @Nullable Object>
36+
implements SerializableSupplier<T> {
37+
// TODO(sjvanrossum): Replace with VarHandle after JDK 8 support is dropped.
38+
@SuppressWarnings("rawtypes")
39+
private static final AtomicLongFieldUpdater<ExpiringMemoizingSerializableSupplier>
40+
DEADLINE_NANOS =
41+
AtomicLongFieldUpdater.newUpdater(
42+
ExpiringMemoizingSerializableSupplier.class, "deadlineNanos");
43+
44+
private final SerializableSupplier<T> supplier;
45+
private final long periodNanos;
46+
private transient T value;
47+
private transient volatile long deadlineNanos;
48+
49+
public ExpiringMemoizingSerializableSupplier(
50+
SerializableSupplier<T> supplier, Duration period, T initialValue, Duration initialDelay) {
51+
this.supplier = supplier; // final store
52+
this.periodNanos = period.toNanos(); // final store
53+
this.value = initialValue; // normal store
54+
55+
// Ordered stores may be reordered with subsequent loads.
56+
// The default value of deadlineNanos permits an indefinite initial expiration depending on the
57+
// clock's state.
58+
this.deadlineNanos =
59+
System.nanoTime() + initialDelay.toNanos()
60+
& ~1L; // volatile store (sequentially consistent release)
61+
}
62+
63+
@Override
64+
public T get() {
65+
final long deadlineNanos = this.deadlineNanos; // volatile load (acquire)
66+
final long nowNanos;
67+
final T result;
68+
69+
/*
70+
* Sacrificing 1ns precision to pack the lock state into the low bit of deadlineNanos is deemed acceptable.
71+
* Subsequent loads and stores are prevented from reordering before a volatile load.
72+
* Preceeding loads and stores are prevented from reordering after an ordered store.
73+
* A store to value can't be reordered after a store to deadlineNanos
74+
* A store to deadlineNanos can be reordered after a load of deadlineNanos.
75+
* The returned value will be as old as or younger than deadlineNanos.
76+
*/
77+
if ((deadlineNanos & 1L) == 0
78+
&& deadlineNanos - (nowNanos = System.nanoTime()) <= 0L
79+
&& DEADLINE_NANOS
80+
.compareAndSet( // volatile load/store (sequentially consistent acquire/release)
81+
this, deadlineNanos, deadlineNanos | 1L)) {
82+
try {
83+
this.value = result = supplier.get(); // normal store
84+
} finally {
85+
DEADLINE_NANOS.lazySet(this, (nowNanos + periodNanos) & ~1L); // ordered store (release)
86+
}
87+
} else {
88+
result = this.value; // normal load
89+
}
90+
91+
return result;
92+
}
93+
94+
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
95+
is.defaultReadObject();
96+
97+
// Immediate initial expiration prevents a load of value before it is initialized.
98+
this.deadlineNanos =
99+
System.nanoTime() & ~1L; // volatile store (sequentially consistent release)
100+
}
101+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.util;
19+
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertNotSame;
22+
import static org.junit.Assert.assertSame;
23+
import static org.junit.Assert.assertThrows;
24+
25+
import java.time.Duration;
26+
import java.util.Arrays;
27+
import java.util.Iterator;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.JUnit4;
31+
32+
@RunWith(JUnit4.class)
33+
public class ExpiringMemoizingSerializableSupplierTest {
34+
35+
@Test
36+
public void testSupplierIsSerializable() {
37+
final ExpiringMemoizingSerializableSupplier<?> instance =
38+
new ExpiringMemoizingSerializableSupplier<>(
39+
Object::new, Duration.ZERO, null, Duration.ZERO);
40+
41+
// Instances must be serializable.
42+
SerializableUtils.ensureSerializable(instance);
43+
}
44+
45+
@Test
46+
public void testSameValueAfterConstruction() {
47+
final Object initialValue = new Object();
48+
final ExpiringMemoizingSerializableSupplier<Object> instance =
49+
new ExpiringMemoizingSerializableSupplier<>(
50+
Object::new, Duration.ofHours(1), initialValue, Duration.ofHours(1));
51+
52+
// Construction initializes deadlineNanos for delayed expiration.
53+
// The supplied value must not be observed as uninitialized
54+
// The supplied value is referentially equal to initialValue.
55+
final Object instanceValue = instance.get();
56+
assertNotNull(instanceValue);
57+
assertSame(initialValue, instanceValue);
58+
}
59+
60+
@SuppressWarnings("unchecked")
61+
@Test
62+
public void testDistinctValuesAfterDeserialization() throws Exception {
63+
final Object initialValue = new Object();
64+
final ExpiringMemoizingSerializableSupplier<Object> instance =
65+
new ExpiringMemoizingSerializableSupplier<>(
66+
Object::new, Duration.ofHours(1), initialValue, Duration.ofHours(1));
67+
68+
// Deserialized instances must be referentially distinct for the purpose of this test.
69+
final byte[] serialized = SerializableUtils.serializeToByteArray(instance);
70+
final ExpiringMemoizingSerializableSupplier<Object> deserialized1 =
71+
(ExpiringMemoizingSerializableSupplier<Object>)
72+
SerializableUtils.deserializeFromByteArray(serialized, "instance");
73+
final ExpiringMemoizingSerializableSupplier<Object> deserialized2 =
74+
(ExpiringMemoizingSerializableSupplier<Object>)
75+
SerializableUtils.deserializeFromByteArray(serialized, "instance");
76+
assertNotSame(instance, deserialized1);
77+
assertNotSame(instance, deserialized2);
78+
assertNotSame(deserialized1, deserialized2);
79+
80+
// Deserialization initializes deadlineNanos for immediate expiration.
81+
// Supplied values must not be observed as uninitialized.
82+
// The initial and supplied values are all referentially distinct.
83+
final Object deserialized1Value = deserialized1.get();
84+
final Object deserialized2Value = deserialized2.get();
85+
assertNotNull(deserialized1Value);
86+
assertNotNull(deserialized2Value);
87+
assertNotSame(initialValue, deserialized1Value);
88+
assertNotSame(initialValue, deserialized2Value);
89+
assertNotSame(deserialized1Value, deserialized2Value);
90+
}
91+
92+
@Test
93+
public void testProgressAfterException() throws Exception {
94+
final Object initialValue = new Object();
95+
final Object terminalValue = new Object();
96+
final Iterator<?> suppliedValues =
97+
Arrays.asList(new Object(), new RuntimeException(), new Object()).iterator();
98+
final ExpiringMemoizingSerializableSupplier<?> instance =
99+
new ExpiringMemoizingSerializableSupplier<>(
100+
() -> {
101+
if (!suppliedValues.hasNext()) {
102+
return terminalValue;
103+
}
104+
final Object value = suppliedValues.next();
105+
if (value instanceof RuntimeException) {
106+
throw (RuntimeException) value;
107+
}
108+
return value;
109+
},
110+
Duration.ZERO,
111+
initialValue,
112+
Duration.ZERO);
113+
114+
// The initial value expires immediately and must not be observed.
115+
final Object instanceValue = instance.get();
116+
assertNotSame(initialValue, instanceValue);
117+
118+
// An exception must be thrown for the purpose of this test.
119+
assertThrows(RuntimeException.class, instance::get);
120+
121+
// Exceptions must not lock the instance state.
122+
// The supplied value is referentially distinct from instanceValue for the purpose of this test.
123+
// Note that parallelly observed supplied values may be referentially equal to instanceValue.
124+
final Object intermediateValue = instance.get();
125+
assertNotSame(instanceValue, intermediateValue);
126+
127+
// The supplied value is referentially equal to terminalValue for the purpose of this test.
128+
assertSame(terminalValue, instance.get());
129+
}
130+
131+
@Test
132+
public void testInitialValueVisibilityOnDifferentThread() throws Exception {
133+
final Object initialValue = new Object();
134+
final Object[] valueHolder = new Object[] {new Object()};
135+
final ExpiringMemoizingSerializableSupplier<Object> instance =
136+
new ExpiringMemoizingSerializableSupplier<>(
137+
Object::new, Duration.ZERO, initialValue, Duration.ofHours(1));
138+
139+
// Initialization of value and deadlineNanos must be visible on other threads.
140+
// The initial value must be supplied for delayed expiration.
141+
final Thread t = new Thread(() -> valueHolder[0] = instance.get());
142+
t.start();
143+
t.join();
144+
final Object observedValue = valueHolder[0];
145+
assertNotNull(observedValue);
146+
assertSame(initialValue, observedValue);
147+
}
148+
149+
@Test
150+
public void testIntermediateValueVisibilityOnDifferentThread() throws Exception {
151+
final Object intermediateValue = new Object();
152+
final Object[] valueHolder = new Object[] {new Object()};
153+
final ExpiringMemoizingSerializableSupplier<Object> instance =
154+
new ExpiringMemoizingSerializableSupplier<>(
155+
() -> intermediateValue, Duration.ofHours(1), new Object(), Duration.ZERO);
156+
157+
// Initialization of value and deadlineNanos must be visible on other threads.
158+
// The intermediate value must be supplied for immediate expiration.
159+
final Thread t = new Thread(() -> valueHolder[0] = instance.get());
160+
t.start();
161+
t.join();
162+
final Object observedValue = valueHolder[0];
163+
assertNotNull(observedValue);
164+
assertSame(intermediateValue, observedValue);
165+
}
166+
}

0 commit comments

Comments
 (0)