Skip to content

Commit 386d783

Browse files
authored
chore: fix a ConcurrentModificationException during BlobReadSession#close() (googleapis#3035)
A ConcurrentModificationException could happen while cleaning up an individual read at the same time the session had multiple child streams.
1 parent 4417b8a commit 386d783

File tree

2 files changed

+242
-42
lines changed

2 files changed

+242
-42
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java

Lines changed: 75 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,13 @@
3232
import java.util.ArrayList;
3333
import java.util.IdentityHashMap;
3434
import java.util.Iterator;
35+
import java.util.List;
3536
import java.util.Locale;
3637
import java.util.Map.Entry;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.concurrent.ScheduledExecutorService;
3940
import java.util.concurrent.locks.ReentrantLock;
40-
import org.checkerframework.checker.lock.qual.GuardedBy;
41+
import java.util.function.BiFunction;
4142

4243
final class ObjectReadSessionImpl implements ObjectReadSession {
4344

@@ -49,12 +50,8 @@ final class ObjectReadSessionImpl implements ObjectReadSession {
4950
private final Object resource;
5051
private final RetryContextProvider retryContextProvider;
5152

52-
@GuardedBy("this.lock")
53-
private final IdentityHashMap<ObjectReadSessionStream, ObjectReadSessionState> children;
53+
private final ConcurrentIdentityMap<ObjectReadSessionStream, ObjectReadSessionState> children;
5454

55-
private final ReentrantLock lock;
56-
57-
@GuardedBy("this.lock")
5855
private volatile boolean open;
5956

6057
ObjectReadSessionImpl(
@@ -69,8 +66,7 @@ final class ObjectReadSessionImpl implements ObjectReadSession {
6966
this.state = state;
7067
this.resource = state.getMetadata();
7168
this.retryContextProvider = retryContextProvider;
72-
this.children = new IdentityHashMap<>();
73-
this.lock = new ReentrantLock();
69+
this.children = new ConcurrentIdentityMap<>();
7470
this.open = true;
7571
}
7672

@@ -81,54 +77,42 @@ public Object getResource() {
8177

8278
@Override
8379
public <Projection> Projection readAs(ReadProjectionConfig<Projection> config) {
84-
lock.lock();
85-
try {
86-
checkState(open, "Session already closed");
87-
switch (config.getType()) {
88-
case STREAM_READ:
89-
long readId = state.newReadId();
90-
ObjectReadSessionStreamRead<Projection> read =
91-
config.cast().newRead(readId, retryContextProvider.create());
92-
registerReadInState(readId, read);
93-
return read.project();
94-
case SESSION_USER:
95-
return config.project(this, IOAutoCloseable.noOp());
96-
default:
97-
throw new IllegalStateException(
98-
String.format(
99-
Locale.US,
100-
"Broken java enum %s value=%s",
101-
ProjectionType.class.getName(),
102-
config.getType().name()));
103-
}
104-
} finally {
105-
lock.unlock();
80+
checkState(open, "Session already closed");
81+
switch (config.getType()) {
82+
case STREAM_READ:
83+
long readId = state.newReadId();
84+
ObjectReadSessionStreamRead<Projection> read =
85+
config.cast().newRead(readId, retryContextProvider.create());
86+
registerReadInState(readId, read);
87+
return read.project();
88+
case SESSION_USER:
89+
return config.project(this, IOAutoCloseable.noOp());
90+
default:
91+
throw new IllegalStateException(
92+
String.format(
93+
Locale.US,
94+
"Broken java enum %s value=%s",
95+
ProjectionType.class.getName(),
96+
config.getType().name()));
10697
}
10798
}
10899

109100
@Override
110101
public void close() throws IOException {
111-
open = false;
112-
lock.lock();
113102
try {
114-
Iterator<Entry<ObjectReadSessionStream, ObjectReadSessionState>> it =
115-
children.entrySet().iterator();
116-
ArrayList<ApiFuture<Void>> closing = new ArrayList<>(children.size());
117-
while (it.hasNext()) {
118-
Entry<ObjectReadSessionStream, ObjectReadSessionState> next = it.next();
119-
ObjectReadSessionStream subStream = next.getKey();
120-
it.remove();
121-
closing.add(subStream.closeAsync());
103+
if (!open) {
104+
return;
122105
}
106+
open = false;
107+
List<ApiFuture<Void>> closing =
108+
children.drainEntries((subStream, subStreamState) -> subStream.closeAsync());
123109
stream.close();
124110
ApiFutures.allAsList(closing).get();
125111
} catch (ExecutionException e) {
126112
throw new IOException(e.getCause());
127113
} catch (InterruptedException e) {
128114
Thread.currentThread().interrupt();
129115
throw new InterruptedIOException();
130-
} finally {
131-
lock.unlock();
132116
}
133117
}
134118

@@ -152,4 +136,53 @@ private void registerReadInState(long readId, ObjectReadSessionStreamRead<?> rea
152136
newStream.send(request);
153137
}
154138
}
139+
140+
@VisibleForTesting
141+
static final class ConcurrentIdentityMap<K, V> {
142+
private final ReentrantLock lock;
143+
private final IdentityHashMap<K, V> children;
144+
145+
@VisibleForTesting
146+
ConcurrentIdentityMap() {
147+
lock = new ReentrantLock();
148+
children = new IdentityHashMap<>();
149+
}
150+
151+
public void put(K key, V value) {
152+
lock.lock();
153+
try {
154+
children.put(key, value);
155+
} finally {
156+
lock.unlock();
157+
}
158+
}
159+
160+
public void remove(K key) {
161+
lock.lock();
162+
try {
163+
children.remove(key);
164+
} finally {
165+
lock.unlock();
166+
}
167+
}
168+
169+
public <R> ArrayList<R> drainEntries(BiFunction<K, V, R> f) {
170+
lock.lock();
171+
try {
172+
Iterator<Entry<K, V>> it = children.entrySet().iterator();
173+
ArrayList<R> results = new ArrayList<>(children.size());
174+
while (it.hasNext()) {
175+
Entry<K, V> entry = it.next();
176+
K key = entry.getKey();
177+
V value = entry.getValue();
178+
it.remove();
179+
R r = f.apply(key, value);
180+
results.add(r);
181+
}
182+
return results;
183+
} finally {
184+
lock.unlock();
185+
}
186+
}
187+
}
155188
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.TestUtils.assertAll;
20+
import static com.google.common.truth.Truth.assertThat;
21+
22+
import com.google.cloud.storage.ObjectReadSessionImpl.ConcurrentIdentityMap;
23+
import com.google.common.base.MoreObjects;
24+
import com.google.common.util.concurrent.ListenableFuture;
25+
import com.google.common.util.concurrent.ListeningExecutorService;
26+
import com.google.common.util.concurrent.MoreExecutors;
27+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
28+
import java.util.List;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.function.BiFunction;
34+
import org.junit.AfterClass;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
38+
public final class ObjectReadSessionTest {
39+
private static final AtomicInteger vCounter = new AtomicInteger(1);
40+
41+
private static ListeningExecutorService exec;
42+
43+
@BeforeClass
44+
public static void beforeClass() {
45+
exec =
46+
MoreExecutors.listeningDecorator(
47+
Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build()));
48+
}
49+
50+
@AfterClass
51+
public static void afterClass() {
52+
exec.shutdownNow();
53+
}
54+
55+
@Test
56+
public void concurrentIdentityMap_basic() throws Exception {
57+
ConcurrentIdentityMap<Key, Value> map = new ConcurrentIdentityMap<>();
58+
59+
map.put(new Key("k1"), new Value());
60+
map.put(new Key("k2"), new Value());
61+
map.put(new Key("k3"), new Value());
62+
map.put(new Key("k4"), new Value());
63+
64+
List<String> strings = map.drainEntries((k, v) -> String.format("%s -> %s", k, v));
65+
assertThat(strings).hasSize(4);
66+
67+
String joined = String.join("\n", strings);
68+
assertAll(
69+
() -> assertThat(joined).contains("k1"),
70+
() -> assertThat(joined).contains("k2"),
71+
() -> assertThat(joined).contains("k3"),
72+
() -> assertThat(joined).contains("k4"));
73+
}
74+
75+
@Test
76+
public void concurrentIdentityMap_multipleThreadsAdding() throws Exception {
77+
ConcurrentIdentityMap<Key, Value> map = new ConcurrentIdentityMap<>();
78+
79+
CountDownLatch cdl = new CountDownLatch(1);
80+
map.put(new Key("t1k1"), new Value());
81+
map.put(new Key("t1k2"), new Value());
82+
83+
ListenableFuture<Boolean> submitted =
84+
exec.submit(
85+
() -> {
86+
try {
87+
boolean await = cdl.await(3, TimeUnit.SECONDS);
88+
assertThat(await).isTrue();
89+
map.put(new Key("t2k1"), new Value());
90+
return true;
91+
} catch (InterruptedException e) {
92+
throw new RuntimeException(e);
93+
}
94+
});
95+
96+
BiFunction<Key, Value, String> f =
97+
(k, v) -> {
98+
cdl.countDown();
99+
return String.format("%s -> %s", k, v);
100+
};
101+
List<String> strings = map.drainEntries(f);
102+
assertThat(strings).hasSize(2);
103+
String joined = String.join("\n", strings);
104+
assertAll(() -> assertThat(joined).contains("t1k1"), () -> assertThat(joined).contains("t1k2"));
105+
106+
submitted.get(1, TimeUnit.SECONDS);
107+
List<String> drain2 = map.drainEntries(f);
108+
assertThat(drain2).hasSize(1);
109+
}
110+
111+
@Test
112+
public void concurrentIdentityMap_removeAfterDrainClean() throws Exception {
113+
ConcurrentIdentityMap<Key, Value> map = new ConcurrentIdentityMap<>();
114+
115+
CountDownLatch cdl = new CountDownLatch(1);
116+
map.put(new Key("t1k1"), new Value());
117+
Key t1k2 = new Key("t1k2");
118+
map.put(t1k2, new Value());
119+
120+
ListenableFuture<Boolean> submit =
121+
exec.submit(
122+
() -> {
123+
try {
124+
boolean await = cdl.await(3, TimeUnit.SECONDS);
125+
assertThat(await).isTrue();
126+
map.remove(t1k2);
127+
return true;
128+
} catch (InterruptedException e) {
129+
throw new RuntimeException(e);
130+
}
131+
});
132+
133+
BiFunction<Key, Value, String> f =
134+
(k, v) -> {
135+
cdl.countDown();
136+
return String.format("%s -> %s", k, v);
137+
};
138+
List<String> strings = map.drainEntries(f);
139+
assertThat(strings).hasSize(2);
140+
String joined = String.join("\n", strings);
141+
assertAll(() -> assertThat(joined).contains("t1k1"), () -> assertThat(joined).contains("t1k2"));
142+
143+
assertThat(submit.get(1, TimeUnit.SECONDS)).isEqualTo(true);
144+
}
145+
146+
private static final class Key {
147+
private final String k;
148+
149+
private Key(String k) {
150+
this.k = k;
151+
}
152+
153+
@Override
154+
public String toString() {
155+
return MoreObjects.toStringHelper(this).add("k", k).toString();
156+
}
157+
}
158+
159+
private static final class Value {
160+
private final String v = String.format("v/%d", vCounter.getAndIncrement());
161+
162+
@Override
163+
public String toString() {
164+
return MoreObjects.toStringHelper(this).add("v", v).toString();
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)