Skip to content

Commit d75a4f6

Browse files
committed
fix #4931: using coarse locking to prevent odd concurrent behavior
1 parent b6f2bc9 commit d75a4f6

File tree

3 files changed

+50
-44
lines changed

3 files changed

+50
-44
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* Fix #4891: address vertx not completely reading exec streams
2020
* Fix #4899: BuildConfigs.instantiateBinary().fromFile() does not time out
2121
* Fix #4908: using the response headers in the vertx response
22+
* Fix #4931: using course grain locking for all mock server operations
2223

2324
#### Improvements
2425
* Fix #4675: adding a fully client side timeout for informer watches

junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/KubernetesCrudDispatcher.java

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
1919
import io.fabric8.kubernetes.client.Watcher.Action;
2020
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
21+
import io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudDispatcherException;
2122
import io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudDispatcherHandler;
2223
import io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudPersistence;
2324
import io.fabric8.kubernetes.client.server.mock.crud.PatchHandler;
@@ -48,8 +49,6 @@
4849
import java.util.concurrent.CopyOnWriteArraySet;
4950
import java.util.concurrent.atomic.AtomicLong;
5051

51-
import static io.fabric8.kubernetes.client.server.mock.crud.KubernetesCrudDispatcherHandler.process;
52-
5352
public class KubernetesCrudDispatcher extends CrudDispatcher implements KubernetesCrudPersistence, CustomResourceAware {
5453

5554
private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesCrudDispatcher.class);
@@ -81,6 +80,16 @@ public KubernetesCrudDispatcher(List<CustomResourceDefinitionContext> crdContext
8180
crdContexts.stream().forEach(this::expectCustomResource);
8281
}
8382

83+
MockResponse process(RecordedRequest request, KubernetesCrudDispatcherHandler handler) {
84+
synchronized (map) {
85+
try {
86+
return handler.handle(request);
87+
} catch (KubernetesCrudDispatcherException e) {
88+
return new MockResponse().setResponseCode(e.getCode()).setBody(e.toStatusBody());
89+
}
90+
}
91+
}
92+
8493
/**
8594
* Adds the specified object to the in-memory db.
8695
*
@@ -111,10 +120,12 @@ public MockResponse handleUpdate(RecordedRequest request) {
111120
*/
112121
@Override
113122
public MockResponse handleGet(String path) {
114-
if (detectWatchMode(path)) {
115-
return handleWatch(path);
123+
synchronized (map) {
124+
if (detectWatchMode(path)) {
125+
return handleWatch(path);
126+
}
127+
return handle(path, null);
116128
}
117-
return handle(path, null);
118129
}
119130

120131
private interface EventProcessor {
@@ -126,17 +137,15 @@ private MockResponse handle(String path, EventProcessor eventProcessor) {
126137
List<String> items = new ArrayList<>();
127138
AttributeSet query = attributeExtractor.fromPath(path);
128139

129-
synchronized (map) {
130-
new ArrayList<>(map.entrySet()).stream()
131-
.filter(entry -> entry.getKey().matches(query))
132-
.forEach(entry -> {
133-
LOGGER.debug("Entry found for query {} : {}", query, entry);
134-
items.add(entry.getValue());
135-
if (eventProcessor != null) {
136-
eventProcessor.processEvent(path, query, entry.getKey());
137-
}
138-
});
139-
}
140+
new ArrayList<>(map.entrySet()).stream()
141+
.filter(entry -> entry.getKey().matches(query))
142+
.forEach(entry -> {
143+
LOGGER.debug("Entry found for query {} : {}", query, entry);
144+
items.add(entry.getValue());
145+
if (eventProcessor != null) {
146+
eventProcessor.processEvent(path, query, entry.getKey());
147+
}
148+
});
140149

141150
if (query.containsKey(KubernetesAttributesExtractor.NAME)) {
142151
if (!items.isEmpty()) {
@@ -179,26 +188,30 @@ public MockResponse handlePatch(RecordedRequest request) {
179188
*/
180189
@Override
181190
public MockResponse handleDelete(String path) {
182-
return handle(path, (p, pathAttributes, oldAttributes) -> {
183-
String jsonStringOfResource = map.get(oldAttributes);
184-
/*
185-
* Potential performance improvement: The resource is unmarshalled and marshalled in other places (e.g., when creating a
186-
* WatchEvent later).
187-
* This could be avoided by storing the unmarshalled object (instead of a String) in the map.
188-
*/
189-
final GenericKubernetesResource resource = Serialization.unmarshal(jsonStringOfResource, GenericKubernetesResource.class);
190-
if (resource.getFinalizers().isEmpty()) {
191-
// No finalizers left, actually remove the resource.
192-
processEvent(path, pathAttributes, oldAttributes, null);
193-
return;
194-
} else if (!resource.isMarkedForDeletion()) {
195-
// Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal).
196-
resource.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString());
197-
String updatedResource = Serialization.asJson(resource);
198-
processEvent(path, pathAttributes, oldAttributes, updatedResource);
199-
}
200-
// else: if the resource is already marked for deletion and still has finalizers, do nothing.
201-
});
191+
synchronized (map) {
192+
return handle(path, this::processDelete);
193+
}
194+
}
195+
196+
private void processDelete(String path, AttributeSet pathAttributes, AttributeSet oldAttributes) {
197+
String jsonStringOfResource = map.get(oldAttributes);
198+
/*
199+
* Potential performance improvement: The resource is unmarshalled and marshalled in other places (e.g., when creating a
200+
* WatchEvent later).
201+
* This could be avoided by storing the unmarshalled object (instead of a String) in the map.
202+
*/
203+
final GenericKubernetesResource resource = Serialization.unmarshal(jsonStringOfResource, GenericKubernetesResource.class);
204+
if (resource.getFinalizers().isEmpty()) {
205+
// No finalizers left, actually remove the resource.
206+
processEvent(path, pathAttributes, oldAttributes, null);
207+
return;
208+
} else if (!resource.isMarkedForDeletion()) {
209+
// Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal).
210+
resource.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString());
211+
String updatedResource = Serialization.asJson(resource);
212+
processEvent(path, pathAttributes, oldAttributes, updatedResource);
213+
}
214+
// else: if the resource is already marked for deletion and still has finalizers, do nothing.
202215
}
203216

204217
@Override

junit/kubernetes-server-mock/src/main/java/io/fabric8/kubernetes/client/server/mock/crud/KubernetesCrudDispatcherHandler.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,6 @@ default GenericKubernetesResource validateRequestBody(String requestBody) throws
113113
return resource;
114114
}
115115

116-
static MockResponse process(RecordedRequest request, KubernetesCrudDispatcherHandler handler) {
117-
try {
118-
return handler.handle(request);
119-
} catch (KubernetesCrudDispatcherException e) {
120-
return new MockResponse().setResponseCode(e.getCode()).setBody(e.toStatusBody());
121-
}
122-
}
123-
124116
static boolean isStatusPath(String path) {
125117
return path.endsWith("/" + STATUS);
126118
}

0 commit comments

Comments
 (0)