Skip to content

Commit bf40ad6

Browse files
committed
improv(parameters): Make parameters top-level provider thread-safe for cache and transformer management.
1 parent 7c6d6ea commit bf40ad6

File tree

6 files changed

+274
-34
lines changed

6 files changed

+274
-34
lines changed

powertools-parameters/powertools-parameters-tests/src/test/java/software/amazon/lambda/powertools/parameters/cache/CacheManagerTest.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.time.Clock;
2323
import java.util.Optional;
24+
2425
import org.junit.jupiter.api.BeforeEach;
2526
import org.junit.jupiter.api.Test;
2627

@@ -69,7 +70,6 @@ public void getIfNotExpired_withCustomDefaultExpirationTime_notExpired_shouldRet
6970
manager.setDefaultExpirationTime(of(42, SECONDS));
7071
manager.putInCache("key", "value");
7172

72-
7373
Optional<String> value = manager.getIfNotExpired("key", offset(clock, of(40, SECONDS)).instant());
7474

7575
assertThat(value).isPresent().contains("value");
@@ -101,4 +101,96 @@ public void getIfNotExpired_resetExpirationTime_shouldUseDefaultExpirationTime()
101101
assertThat(value2).isPresent().contains("value2");
102102
}
103103

104+
@Test
105+
public void putInCache_sharedCache_shouldBeAccessibleAcrossThreads() throws InterruptedException {
106+
// GIVEN
107+
Thread thread1 = new Thread(() -> {
108+
manager.setExpirationTime(of(60, SECONDS));
109+
manager.putInCache("sharedKey", "valueFromThread1");
110+
manager.resetExpirationTime();
111+
});
112+
113+
Thread thread2 = new Thread(() -> {
114+
manager.setExpirationTime(of(10, SECONDS));
115+
// Thread 2 should be able to read the value cached by Thread 1
116+
Optional<String> value = manager.getIfNotExpired("sharedKey", clock.instant());
117+
assertThat(value).isPresent().contains("valueFromThread1");
118+
manager.resetExpirationTime();
119+
});
120+
121+
// WHEN
122+
thread1.start();
123+
thread1.join();
124+
thread2.start();
125+
thread2.join();
126+
127+
// THEN - Both threads should be able to access the same cached value
128+
Optional<String> value = manager.getIfNotExpired("sharedKey", clock.instant());
129+
assertThat(value).isPresent().contains("valueFromThread1");
130+
}
131+
132+
@Test
133+
public void putInCache_concurrentCalls_shouldBeThreadSafe() throws InterruptedException {
134+
// GIVEN
135+
int threadCount = 10;
136+
Thread[] threads = new Thread[threadCount];
137+
boolean[] success = new boolean[threadCount];
138+
Clock testClock = Clock.systemDefaultZone();
139+
140+
// WHEN - Multiple threads set different expiration times and cache values concurrently
141+
for (int i = 0; i < threadCount; i++) {
142+
final int threadIndex = i;
143+
final int expirationSeconds = (i % 2 == 0) ? 60 : 10; // Alternate between 60s and 10s
144+
145+
threads[i] = new Thread(() -> {
146+
try {
147+
manager.setExpirationTime(of(expirationSeconds, SECONDS));
148+
manager.putInCache("key" + threadIndex, "value" + threadIndex);
149+
manager.resetExpirationTime();
150+
success[threadIndex] = true;
151+
} catch (Exception e) {
152+
success[threadIndex] = false;
153+
}
154+
});
155+
}
156+
157+
// Start all threads
158+
for (Thread thread : threads) {
159+
thread.start();
160+
}
161+
162+
// Wait for all threads to complete
163+
for (Thread thread : threads) {
164+
thread.join();
165+
}
166+
167+
// THEN - All threads should complete successfully
168+
for (boolean result : success) {
169+
assertThat(result).isTrue();
170+
}
171+
172+
// THEN - Each cached value should have the correct expiration time
173+
// Values with 60s TTL should still be present after 9s, values with 10s should expire after 11s
174+
for (int i = 0; i < threadCount; i++) {
175+
final int expirationSeconds = (i % 2 == 0) ? 60 : 10;
176+
177+
// Check that value is still present just before expiration
178+
Optional<String> valueBeforeExpiry = manager.getIfNotExpired("key" + i,
179+
offset(testClock, of(expirationSeconds - 1, SECONDS)).instant());
180+
assertThat(valueBeforeExpiry)
181+
.as("Thread %d with %ds expiration should still have value after %ds", i, expirationSeconds,
182+
expirationSeconds - 1)
183+
.isPresent()
184+
.contains("value" + i);
185+
186+
// Check that value expires after the TTL
187+
Optional<String> valueAfterExpiry = manager.getIfNotExpired("key" + i,
188+
offset(testClock, of(expirationSeconds + 1, SECONDS)).instant());
189+
assertThat(valueAfterExpiry)
190+
.as("Thread %d with %ds expiration should not have value after %ds", i, expirationSeconds,
191+
expirationSeconds + 1)
192+
.isNotPresent();
193+
}
194+
}
195+
104196
}

powertools-parameters/powertools-parameters-tests/src/test/java/software/amazon/lambda/powertools/parameters/transform/TransformationManagerTest.java

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import static software.amazon.lambda.powertools.parameters.transform.Transformer.json;
2222

2323
import java.util.Base64;
24+
2425
import org.junit.jupiter.api.BeforeEach;
2526
import org.junit.jupiter.api.Test;
27+
2628
import software.amazon.lambda.powertools.parameters.exception.TransformationException;
2729

2830
public class TransformationManagerTest {
@@ -90,9 +92,9 @@ public void performComplexTransformation_noTransformer_shouldThrowException() {
9092
public void performComplexTransformation_shouldPerformTransformation() {
9193
manager.setTransformer(json);
9294

93-
ObjectToDeserialize object =
94-
manager.performComplexTransformation("{\"foo\":\"Foo\", \"bar\":42, \"baz\":123456789}",
95-
ObjectToDeserialize.class);
95+
ObjectToDeserialize object = manager.performComplexTransformation(
96+
"{\"foo\":\"Foo\", \"bar\":42, \"baz\":123456789}",
97+
ObjectToDeserialize.class);
9698

9799
assertThat(object).isNotNull();
98100
}
@@ -104,4 +106,107 @@ public void performComplexTransformation_throwsTransformationException() {
104106
assertThatExceptionOfType(TransformationException.class)
105107
.isThrownBy(() -> manager.performComplexTransformation("value", ObjectToDeserialize.class));
106108
}
109+
110+
@Test
111+
public void unsetTransformer_shouldCleanUpThreadLocal() {
112+
// GIVEN
113+
manager.setTransformer(json);
114+
assertThat(manager.shouldTransform()).isTrue();
115+
116+
// WHEN
117+
manager.unsetTransformer();
118+
119+
// THEN
120+
assertThat(manager.shouldTransform()).isFalse();
121+
}
122+
123+
@Test
124+
public void setTransformer_concurrentCalls_shouldBeThreadSafe() throws InterruptedException {
125+
// GIVEN
126+
boolean[] success = new boolean[2];
127+
128+
Thread thread1 = new Thread(() -> {
129+
try {
130+
manager.setTransformer(json);
131+
Thread.sleep(10); // Small delay to increase chance of thread interleaving
132+
// Thread 1 expects json transformer
133+
String result = manager.performComplexTransformation(
134+
"{\"foo\":\"Foo\", \"bar\":42, \"baz\":123456789}",
135+
ObjectToDeserialize.class).getFoo();
136+
success[0] = result.equals("Foo");
137+
} catch (Exception e) {
138+
e.printStackTrace();
139+
success[0] = false;
140+
}
141+
});
142+
143+
Thread thread2 = new Thread(() -> {
144+
try {
145+
Thread.sleep(5); // Start slightly after thread1
146+
manager.setTransformer(base64);
147+
// Thread 2 expects base64 transformer
148+
String result = manager.performBasicTransformation(
149+
Base64.getEncoder().encodeToString("bar".getBytes()));
150+
success[1] = result.equals("bar");
151+
} catch (Exception e) {
152+
e.printStackTrace();
153+
success[1] = false;
154+
}
155+
});
156+
157+
// WHEN - Start both threads concurrently
158+
thread1.start();
159+
thread2.start();
160+
161+
// THEN - Both threads should complete without errors
162+
thread1.join();
163+
thread2.join();
164+
165+
assertThat(success[0]).as("Thread 1 with JSON transformer should succeed").isTrue();
166+
assertThat(success[1]).as("Thread 2 with Base64 transformer should succeed").isTrue();
167+
}
168+
169+
@Test
170+
public void unsetTransformer_concurrentCalls_shouldNotAffectOtherThreads() throws InterruptedException {
171+
// GIVEN
172+
boolean[] success = new boolean[2];
173+
174+
Thread thread1 = new Thread(() -> {
175+
try {
176+
manager.setTransformer(json);
177+
Thread.sleep(10);
178+
// Thread 1 should still have json transformer even if thread 2 unsets
179+
assertThat(manager.shouldTransform()).isTrue();
180+
success[0] = true;
181+
} catch (Exception e) {
182+
e.printStackTrace();
183+
success[0] = false;
184+
}
185+
});
186+
187+
Thread thread2 = new Thread(() -> {
188+
try {
189+
manager.setTransformer(base64);
190+
Thread.sleep(5);
191+
manager.unsetTransformer();
192+
// Thread 2 should have no transformer after unset
193+
assertThat(manager.shouldTransform()).isFalse();
194+
success[1] = true;
195+
} catch (Exception e) {
196+
e.printStackTrace();
197+
success[1] = false;
198+
}
199+
});
200+
201+
// WHEN
202+
thread1.start();
203+
thread2.start();
204+
205+
// THEN
206+
thread1.join();
207+
thread2.join();
208+
209+
assertThat(success[0]).as("Thread 1 should still have transformer").isTrue();
210+
assertThat(success[1]).as("Thread 2 should have unset transformer").isTrue();
211+
}
107212
}

powertools-parameters/src/main/java/software/amazon/lambda/powertools/parameters/BaseProvider.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import java.time.Instant;
2020
import java.time.temporal.ChronoUnit;
2121
import java.util.Map;
22-
import software.amazon.awssdk.annotations.NotThreadSafe;
22+
23+
import software.amazon.awssdk.annotations.ThreadSafe;
2324
import software.amazon.lambda.powertools.parameters.cache.CacheManager;
2425
import software.amazon.lambda.powertools.parameters.exception.TransformationException;
2526
import software.amazon.lambda.powertools.parameters.transform.BasicTransformer;
@@ -28,8 +29,20 @@
2829

2930
/**
3031
* Base class for all parameter providers.
32+
* <p>
33+
* This class is thread-safe when used as a singleton in multi-threaded environments.
34+
* Configuration methods ({@link #withMaxAge(int, ChronoUnit)}, {@link #withTransformation(Class)})
35+
* use thread-local storage to support concurrent requests with different requirements.
36+
* <p>
37+
* The cache and transformation managers are thread-safe with zero synchronization overhead,
38+
* using lock-free data structures (ThreadLocal, AtomicReference, ConcurrentHashMap) for optimal performance.
39+
* The cache storage is shared across all threads, allowing cached values to be reused across requests.
40+
* <p>
41+
* <b>Implementation Requirements:</b> Subclasses must ensure that implementations of
42+
* {@link #getValue(String)} and {@link #getMultipleValues(String)} are thread-safe to
43+
* guarantee overall thread-safety of the provider.
3144
*/
32-
@NotThreadSafe
45+
@ThreadSafe
3346
public abstract class BaseProvider implements ParamProvider {
3447
public static final String PARAMETERS = "parameters";
3548

@@ -91,6 +104,7 @@ public BaseProvider withMaxAge(int maxAge, ChronoUnit unit) {
91104
* @param transformerClass Class of the transformer to apply. For convenience, you can use {@link Transformer#json} or {@link Transformer#base64} shortcuts.
92105
* @return the provider itself in order to chain calls (eg. <pre>provider.withTransformation(json).get("key", MyObject.class)</pre>).
93106
*/
107+
@SuppressWarnings("rawtypes") // Transformer type parameter determined at runtime
94108
public BaseProvider withTransformation(Class<? extends Transformer> transformerClass) {
95109
if (transformationManager == null) {
96110
throw new IllegalStateException(
@@ -110,12 +124,12 @@ public BaseProvider withTransformation(Class<? extends Transformer> transformerC
110124
* eg. getMultiple("/foo/bar") will retrieve [key="baz", value="valuebaz"] for parameter "/foo/bar/baz"
111125
*/
112126
@Override
127+
@SuppressWarnings("unchecked") // Cache stores Object, safe cast as we control what's stored
113128
public Map<String, String> getMultiple(String path) {
114129
// remove trailing whitespace
115130
String pathWithoutTrailingSlash = path.replaceAll("\\/+$", "");
116131
try {
117-
return (Map<String, String>) cacheManager.getIfNotExpired(pathWithoutTrailingSlash, now()).orElseGet(() ->
118-
{
132+
return (Map<String, String>) cacheManager.getIfNotExpired(pathWithoutTrailingSlash, now()).orElseGet(() -> {
119133
Map<String, String> params = getMultipleValues(pathWithoutTrailingSlash);
120134

121135
cacheManager.putInCache(pathWithoutTrailingSlash, params);
@@ -143,8 +157,7 @@ public Map<String, String> getMultiple(String path) {
143157
@Override
144158
public String get(final String key) {
145159
try {
146-
return (String) cacheManager.getIfNotExpired(key, now()).orElseGet(() ->
147-
{
160+
return (String) cacheManager.getIfNotExpired(key, now()).orElseGet(() -> {
148161
String value = getValue(key);
149162

150163
String transformedValue = value;
@@ -175,10 +188,10 @@ public String get(final String key) {
175188
* @throws TransformationException if the transformation could not be done, because of a wrong format or an error during transformation.
176189
*/
177190
@Override
191+
@SuppressWarnings("unchecked") // Cache stores Object, safe cast as we control what's stored
178192
public <T> T get(final String key, final Class<T> targetClass) {
179193
try {
180-
return (T) cacheManager.getIfNotExpired(key, now()).orElseGet(() ->
181-
{
194+
return (T) cacheManager.getIfNotExpired(key, now()).orElseGet(() -> {
182195
String value = getValue(key);
183196

184197
if (transformationManager == null) {
@@ -207,7 +220,7 @@ protected Instant now() {
207220
protected void resetToDefaults() {
208221
cacheManager.resetExpirationTime();
209222
if (transformationManager != null) {
210-
transformationManager.setTransformer(null);
223+
transformationManager.unsetTransformer();
211224
}
212225
}
213226

powertools-parameters/src/main/java/software/amazon/lambda/powertools/parameters/cache/CacheManager.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,27 @@
2020
import java.time.Duration;
2121
import java.time.Instant;
2222
import java.util.Optional;
23+
import java.util.concurrent.atomic.AtomicReference;
2324

25+
/**
26+
* Manages caching of parameter values with configurable expiration times.
27+
* <p>
28+
* This class is thread-safe. The cache storage is shared across all threads,
29+
* while expiration time configuration is thread-local to support concurrent
30+
* requests with different cache TTL requirements.
31+
*/
2432
public class CacheManager {
2533
static final Duration DEFAULT_MAX_AGE_SECS = Duration.of(5, SECONDS);
2634

2735
private final DataStore store;
28-
private Duration defaultMaxAge = DEFAULT_MAX_AGE_SECS;
29-
private Duration maxAge = defaultMaxAge;
36+
private final AtomicReference<Duration> defaultMaxAge = new AtomicReference<>(DEFAULT_MAX_AGE_SECS);
37+
private final ThreadLocal<Duration> maxAge = ThreadLocal.withInitial(() -> null);
3038

3139
public CacheManager() {
3240
store = new DataStore();
3341
}
3442

43+
@SuppressWarnings("unchecked") // DataStore stores Object, safe cast as we control what's stored
3544
public <T> Optional<T> getIfNotExpired(String key, Instant now) {
3645
if (store.hasExpired(key, now)) {
3746
return Optional.empty();
@@ -40,19 +49,19 @@ public <T> Optional<T> getIfNotExpired(String key, Instant now) {
4049
}
4150

4251
public void setExpirationTime(Duration duration) {
43-
this.maxAge = duration;
52+
this.maxAge.set(duration);
4453
}
4554

4655
public void setDefaultExpirationTime(Duration duration) {
47-
this.defaultMaxAge = duration;
48-
this.maxAge = duration;
56+
this.defaultMaxAge.set(duration);
4957
}
5058

5159
public <T> void putInCache(String key, T value) {
52-
store.put(key, value, Clock.systemDefaultZone().instant().plus(maxAge));
60+
Duration effectiveMaxAge = maxAge.get() != null ? maxAge.get() : defaultMaxAge.get();
61+
store.put(key, value, Clock.systemDefaultZone().instant().plus(effectiveMaxAge));
5362
}
5463

5564
public void resetExpirationTime() {
56-
maxAge = defaultMaxAge;
65+
maxAge.remove();
5766
}
5867
}

powertools-parameters/src/main/java/software/amazon/lambda/powertools/parameters/cache/DataStore.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ public Object get(String key) {
4242
}
4343

4444
public boolean hasExpired(String key, Instant now) {
45-
boolean hasExpired = !store.containsKey(key) || now.isAfter(store.get(key).time);
45+
ValueNode node = store.get(key);
46+
if (node == null) {
47+
return true;
48+
}
49+
boolean hasExpired = now.isAfter(node.time);
4650
// Auto-clean if the parameter has expired
4751
if (hasExpired) {
4852
remove(key);

0 commit comments

Comments
 (0)