1818import io .fabric8 .kubernetes .api .model .GenericKubernetesResource ;
1919import io .fabric8 .kubernetes .client .Watcher .Action ;
2020import io .fabric8 .kubernetes .client .dsl .base .CustomResourceDefinitionContext ;
21+ import io .fabric8 .kubernetes .client .server .mock .crud .KubernetesCrudDispatcherException ;
2122import io .fabric8 .kubernetes .client .server .mock .crud .KubernetesCrudDispatcherHandler ;
2223import io .fabric8 .kubernetes .client .server .mock .crud .KubernetesCrudPersistence ;
2324import io .fabric8 .kubernetes .client .server .mock .crud .PatchHandler ;
4748import java .util .Set ;
4849import java .util .concurrent .CopyOnWriteArraySet ;
4950import java .util .concurrent .atomic .AtomicLong ;
50-
51- import static io . fabric8 . kubernetes . client . server . mock . crud . KubernetesCrudDispatcherHandler . process ;
51+ import java . util . concurrent . locks . ReadWriteLock ;
52+ import java . util . concurrent . locks . ReentrantReadWriteLock ;
5253
5354public class KubernetesCrudDispatcher extends CrudDispatcher implements KubernetesCrudPersistence , CustomResourceAware {
5455
@@ -61,6 +62,7 @@ public class KubernetesCrudDispatcher extends CrudDispatcher implements Kubernet
6162 private final KubernetesCrudDispatcherHandler postHandler ;
6263 private final KubernetesCrudDispatcherHandler putHandler ;
6364 private final KubernetesCrudDispatcherHandler patchHandler ;
65+ private final ReadWriteLock lock = new ReentrantReadWriteLock ();
6466
6567 public KubernetesCrudDispatcher () {
6668 this (Collections .emptyList ());
@@ -81,6 +83,17 @@ public KubernetesCrudDispatcher(List<CustomResourceDefinitionContext> crdContext
8183 crdContexts .stream ().forEach (this ::expectCustomResource );
8284 }
8385
86+ MockResponse process (RecordedRequest request , KubernetesCrudDispatcherHandler handler ) {
87+ lock .writeLock ().lock ();
88+ try {
89+ return handler .handle (request );
90+ } catch (KubernetesCrudDispatcherException e ) {
91+ return new MockResponse ().setResponseCode (e .getCode ()).setBody (e .toStatusBody ());
92+ } finally {
93+ lock .writeLock ().unlock ();
94+ }
95+ }
96+
8497 /**
8598 * Adds the specified object to the in-memory db.
8699 *
@@ -111,10 +124,15 @@ public MockResponse handleUpdate(RecordedRequest request) {
111124 */
112125 @ Override
113126 public MockResponse handleGet (String path ) {
114- if (detectWatchMode (path )) {
115- return handleWatch (path );
127+ lock .readLock ().lock ();
128+ try {
129+ if (detectWatchMode (path )) {
130+ return handleWatch (path );
131+ }
132+ return handle (path , null );
133+ } finally {
134+ lock .readLock ().unlock ();
116135 }
117- return handle (path , null );
118136 }
119137
120138 private interface EventProcessor {
@@ -126,17 +144,15 @@ private MockResponse handle(String path, EventProcessor eventProcessor) {
126144 List <String > items = new ArrayList <>();
127145 AttributeSet query = attributeExtractor .fromPath (path );
128146
129- synchronized (map ) {
130- new ArrayList <>(map .entrySet ()).stream ()
131- .filter (entry -> entry .getKey ().matches (query ))
132- .forEach (entry -> {
133- LOGGER .debug ("Entry found for query {} : {}" , query , entry );
134- items .add (entry .getValue ());
135- if (eventProcessor != null ) {
136- eventProcessor .processEvent (path , query , entry .getKey ());
137- }
138- });
139- }
147+ new ArrayList <>(map .entrySet ()).stream ()
148+ .filter (entry -> entry .getKey ().matches (query ))
149+ .forEach (entry -> {
150+ LOGGER .debug ("Entry found for query {} : {}" , query , entry );
151+ items .add (entry .getValue ());
152+ if (eventProcessor != null ) {
153+ eventProcessor .processEvent (path , query , entry .getKey ());
154+ }
155+ });
140156
141157 if (query .containsKey (KubernetesAttributesExtractor .NAME )) {
142158 if (!items .isEmpty ()) {
@@ -179,26 +195,30 @@ public MockResponse handlePatch(RecordedRequest request) {
179195 */
180196 @ Override
181197 public MockResponse handleDelete (String path ) {
182- return handle (path , (p , pathAttributes , oldAttributes ) -> {
183- String jsonStringOfResource = map .get (oldAttributes );
184- /*
185- * Potential performance improvement: The resource is unmarshalled and marshalled in other places (e.g., when creating a
186- * WatchEvent later).
187- * This could be avoided by storing the unmarshalled object (instead of a String) in the map.
188- */
189- final GenericKubernetesResource resource = Serialization .unmarshal (jsonStringOfResource , GenericKubernetesResource .class );
190- if (resource .getFinalizers ().isEmpty ()) {
191- // No finalizers left, actually remove the resource.
192- processEvent (path , pathAttributes , oldAttributes , null );
193- return ;
194- } else if (!resource .isMarkedForDeletion ()) {
195- // Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal).
196- resource .getMetadata ().setDeletionTimestamp (LocalDateTime .now ().toString ());
197- String updatedResource = Serialization .asJson (resource );
198- processEvent (path , pathAttributes , oldAttributes , updatedResource );
199- }
200- // else: if the resource is already marked for deletion and still has finalizers, do nothing.
201- });
198+ lock .writeLock ().lock ();
199+ try {
200+ return handle (path , this ::processDelete );
201+ } finally {
202+ lock .writeLock ().unlock ();
203+ }
204+ }
205+
206+ private void processDelete (String path , AttributeSet pathAttributes , AttributeSet oldAttributes ) {
207+ String jsonStringOfResource = map .get (oldAttributes );
208+ final GenericKubernetesResource resource = Serialization .unmarshal (jsonStringOfResource , GenericKubernetesResource .class );
209+ if (resource .getFinalizers ().isEmpty ()) {
210+ // No finalizers left, actually remove the resource.
211+ processEvent (path , pathAttributes , oldAttributes , null , null );
212+ return ;
213+ }
214+ if (!resource .isMarkedForDeletion ()) {
215+ // Mark the resource as deleted, but don't remove it yet (wait for finalizer-removal).
216+ resource .getMetadata ().setDeletionTimestamp (LocalDateTime .now ().toString ());
217+ String updatedResource = Serialization .asJson (resource );
218+ processEvent (path , pathAttributes , oldAttributes , resource , updatedResource );
219+ return ;
220+ }
221+ // else: if the resource is already marked for deletion and still has finalizers, do nothing.
202222 }
203223
204224 @ Override
@@ -213,11 +233,9 @@ public AttributeSet getKey(String path) {
213233
214234 @ Override
215235 public Map .Entry <AttributeSet , String > findResource (AttributeSet attributes ) {
216- synchronized (map ) {
217- return map .entrySet ().stream ()
218- .filter (entry -> entry .getKey ().matches (attributes ))
219- .findFirst ().orElse (null );
220- }
236+ return map .entrySet ().stream ()
237+ .filter (entry -> entry .getKey ().matches (attributes ))
238+ .findFirst ().orElse (null );
221239 }
222240
223241 @ Override
@@ -226,11 +244,16 @@ public boolean isStatusSubresourceEnabledForResource(String path) {
226244 }
227245
228246 @ Override
229- public void processEvent (String path , AttributeSet pathAttributes , AttributeSet oldAttributes , String newState ) {
247+ public void processEvent (String path , AttributeSet pathAttributes , AttributeSet oldAttributes ,
248+ GenericKubernetesResource resource , String newState ) {
230249 String existing = map .remove (oldAttributes );
231250 AttributeSet newAttributes = null ;
232251 if (newState != null ) {
233- newAttributes = kubernetesAttributesExtractor .fromResource (newState );
252+ if (resource != null ) {
253+ newAttributes = kubernetesAttributesExtractor .extract (resource );
254+ } else {
255+ newAttributes = kubernetesAttributesExtractor .fromResource (newState );
256+ }
234257 // corner case - we need to get the plural from the path
235258 if (!newAttributes .containsKey (KubernetesAttributesExtractor .PLURAL )) {
236259 newAttributes = AttributeSet .merge (pathAttributes , newAttributes );
@@ -269,13 +292,9 @@ public MockResponse handleWatch(String path) {
269292 query = query .add (new Attribute ("name" , resourceName ));
270293 }
271294 WatchEventsListener watchEventListener = new WatchEventsListener (context , query , watchEventListeners , LOGGER ,
272- watch -> {
273- synchronized (map ) {
274- map .entrySet ().stream ()
275- .filter (entry -> watch .attributeMatches (entry .getKey ()))
276- .forEach (entry -> watch .sendWebSocketResponse (entry .getValue (), Action .ADDED ));
277- }
278- });
295+ watch -> map .entrySet ().stream ()
296+ .filter (entry -> watch .attributeMatches (entry .getKey ()))
297+ .forEach (entry -> watch .sendWebSocketResponse (entry .getValue (), Action .ADDED )));
279298 watchEventListeners .add (watchEventListener );
280299 mockResponse .setSocketPolicy (SocketPolicy .KEEP_OPEN );
281300 return mockResponse .withWebSocketUpgrade (watchEventListener );
0 commit comments