4848import java .util .Set ;
4949import java .util .concurrent .CopyOnWriteArraySet ;
5050import java .util .concurrent .atomic .AtomicLong ;
51+ import java .util .concurrent .locks .ReadWriteLock ;
52+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
5153
5254public class KubernetesCrudDispatcher extends CrudDispatcher implements KubernetesCrudPersistence , CustomResourceAware {
5355
@@ -60,6 +62,7 @@ public class KubernetesCrudDispatcher extends CrudDispatcher implements Kubernet
6062 private final KubernetesCrudDispatcherHandler postHandler ;
6163 private final KubernetesCrudDispatcherHandler putHandler ;
6264 private final KubernetesCrudDispatcherHandler patchHandler ;
65+ private final ReadWriteLock lock = new ReentrantReadWriteLock ();
6366
6467 public KubernetesCrudDispatcher () {
6568 this (Collections .emptyList ());
@@ -81,12 +84,13 @@ public KubernetesCrudDispatcher(List<CustomResourceDefinitionContext> crdContext
8184 }
8285
8386 MockResponse process (RecordedRequest request , KubernetesCrudDispatcherHandler handler ) {
84- synchronized (map ) {
85- try {
86- return handler .handle (request );
87- } catch (KubernetesCrudDispatcherException e ) {
88- return new MockResponse ().setResponseCode (e .getCode ()).setBody (e .toStatusBody ());
89- }
87+ lock .writeLock ().lock ();
88+ try {
89+ return handler .handle (request );
90+ } catch (KubernetesCrudDispatcherException e ) {
91+ return new MockResponse ().setResponseCode (e .getCode ()).setBody (e .toStatusBody ());
92+ } finally {
93+ lock .writeLock ().unlock ();
9094 }
9195 }
9296
@@ -120,11 +124,14 @@ public MockResponse handleUpdate(RecordedRequest request) {
120124 */
121125 @ Override
122126 public MockResponse handleGet (String path ) {
123- synchronized (map ) {
127+ lock .readLock ().lock ();
128+ try {
124129 if (detectWatchMode (path )) {
125130 return handleWatch (path );
126131 }
127132 return handle (path , null );
133+ } finally {
134+ lock .readLock ().unlock ();
128135 }
129136 }
130137
@@ -188,28 +195,26 @@ public MockResponse handlePatch(RecordedRequest request) {
188195 */
189196 @ Override
190197 public MockResponse handleDelete (String path ) {
191- synchronized (map ) {
198+ lock .writeLock ().lock ();
199+ try {
192200 return handle (path , this ::processDelete );
201+ } finally {
202+ lock .writeLock ().unlock ();
193203 }
194204 }
195205
196206 private void processDelete (String path , AttributeSet pathAttributes , AttributeSet oldAttributes ) {
197207 String jsonStringOfResource = map .get (oldAttributes );
198- /*
199- * Potential performance improvement: The resource is unmarshalled and marshalled in other places (e.g., when creating a
200- * WatchEvent later).
201- * This could be avoided by storing the unmarshalled object (instead of a String) in the map.
202- */
203208 final GenericKubernetesResource resource = Serialization .unmarshal (jsonStringOfResource , GenericKubernetesResource .class );
204209 if (resource .getFinalizers ().isEmpty ()) {
205210 // No finalizers left, actually remove the resource.
206- processEvent (path , pathAttributes , oldAttributes , null );
211+ processEvent (path , pathAttributes , oldAttributes , null , null );
207212 return ;
208213 } else if (!resource .isMarkedForDeletion ()) {
209214 // Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal).
210215 resource .getMetadata ().setDeletionTimestamp (LocalDateTime .now ().toString ());
211216 String updatedResource = Serialization .asJson (resource );
212- processEvent (path , pathAttributes , oldAttributes , updatedResource );
217+ processEvent (path , pathAttributes , oldAttributes , resource , updatedResource );
213218 }
214219 // else: if the resource is already marked for deletion and still has finalizers, do nothing.
215220 }
@@ -226,11 +231,9 @@ public AttributeSet getKey(String path) {
226231
227232 @ Override
228233 public Map .Entry <AttributeSet , String > findResource (AttributeSet attributes ) {
229- synchronized (map ) {
230- return map .entrySet ().stream ()
231- .filter (entry -> entry .getKey ().matches (attributes ))
232- .findFirst ().orElse (null );
233- }
234+ return map .entrySet ().stream ()
235+ .filter (entry -> entry .getKey ().matches (attributes ))
236+ .findFirst ().orElse (null );
234237 }
235238
236239 @ Override
@@ -239,11 +242,16 @@ public boolean isStatusSubresourceEnabledForResource(String path) {
239242 }
240243
241244 @ Override
242- public void processEvent (String path , AttributeSet pathAttributes , AttributeSet oldAttributes , String newState ) {
245+ public void processEvent (String path , AttributeSet pathAttributes , AttributeSet oldAttributes ,
246+ GenericKubernetesResource resource , String newState ) {
243247 String existing = map .remove (oldAttributes );
244248 AttributeSet newAttributes = null ;
245249 if (newState != null ) {
246- newAttributes = kubernetesAttributesExtractor .fromResource (newState );
250+ if (resource != null ) {
251+ newAttributes = kubernetesAttributesExtractor .extract (resource );
252+ } else {
253+ newAttributes = kubernetesAttributesExtractor .fromResource (newState );
254+ }
247255 // corner case - we need to get the plural from the path
248256 if (!newAttributes .containsKey (KubernetesAttributesExtractor .PLURAL )) {
249257 newAttributes = AttributeSet .merge (pathAttributes , newAttributes );
@@ -283,11 +291,9 @@ public MockResponse handleWatch(String path) {
283291 }
284292 WatchEventsListener watchEventListener = new WatchEventsListener (context , query , watchEventListeners , LOGGER ,
285293 watch -> {
286- synchronized (map ) {
287- map .entrySet ().stream ()
288- .filter (entry -> watch .attributeMatches (entry .getKey ()))
289- .forEach (entry -> watch .sendWebSocketResponse (entry .getValue (), Action .ADDED ));
290- }
294+ map .entrySet ().stream ()
295+ .filter (entry -> watch .attributeMatches (entry .getKey ()))
296+ .forEach (entry -> watch .sendWebSocketResponse (entry .getValue (), Action .ADDED ));
291297 });
292298 watchEventListeners .add (watchEventListener );
293299 mockResponse .setSocketPolicy (SocketPolicy .KEEP_OPEN );
0 commit comments