diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java index a122d692a4..0214bf8aa8 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java @@ -20,6 +20,7 @@ import org.apache.curator.framework.listen.Listenable; import org.apache.curator.x.discovery.details.InstanceProvider; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import java.io.Closeable; import java.util.List; @@ -33,12 +34,19 @@ public interface ServiceCache extends Closeable, Listenable> getInstances(); + List> getInstances(); /** * The cache must be started before use * * @throws Exception errors */ - public void start() throws Exception; + void start() throws Exception; + + /** + * Returns the listenable container over the newer {@link org.apache.curator.x.discovery.details.ServiceCacheEventListener} + * + * @return listenable + */ + Listenable> getCacheEventListenable(); } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java new file mode 100644 index 0000000000..7a3b570b61 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheEventListener.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.x.discovery.ServiceInstance; + +/** + * Listener for events (addition/update/deletion) that happen to a service cache + */ +public interface ServiceCacheEventListener +{ + /** + * Called when a new instance is added. + * + * @param added instance added + */ + void cacheAdded(ServiceInstance added); + + /** + * Called when an instance is deleted. + * + * @param deleted instance deleted + */ + void cacheDeleted(ServiceInstance deleted); + + /** + * Called when an instance is updated. + * + * @param old old instance + * @param updated updated instance + */ + void cacheUpdated(ServiceInstance old, ServiceInstance updated); +} diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index d1a31ad1cc..715182ba93 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.curator.framework.listen.Listenable; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -45,9 +46,10 @@ public class ServiceCacheImpl implements ServiceCache, PathChildrenCacheListener { - private final ListenerContainer listenerContainer = new ListenerContainer(); + private final ListenerContainer> eventListenerContainer = new ListenerContainer<>(); + private final ListenerContainer listenerContainer = new ListenerContainer<>(); private final ServiceDiscoveryImpl discovery; - private final AtomicReference state = new AtomicReference(State.LATENT); + private final AtomicReference state = new AtomicReference<>(State.LATENT); private final PathChildrenCache cache; private final ConcurrentMap> instances = Maps.newConcurrentMap(); @@ -112,7 +114,7 @@ public void start() throws Exception { if ( childData.getData() != null ) // else already processed by the cache listener { - addInstance(childData, true); + addInstanceOnlyIfAbsent(childData); } } discovery.cacheOpened(this); @@ -124,24 +126,31 @@ public void close() throws IOException Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); listenerContainer.forEach - ( - new Function() + ( + new Function() + { + @Override + public Void apply(ServiceCacheListener listener) { - @Override - public Void apply(ServiceCacheListener listener) - { - discovery.getClient().getConnectionStateListenable().removeListener(listener); - return null; - } + discovery.getClient().getConnectionStateListenable().removeListener(listener); + return null; } - ); + } + ); listenerContainer.clear(); + eventListenerContainer.clear(); CloseableUtils.closeQuietly(cache); discovery.cacheClosed(this); } + @Override + public Listenable> getCacheEventListenable() + { + return eventListenerContainer; + } + @Override public void addListener(ServiceCacheListener listener) { @@ -166,21 +175,22 @@ public void removeListener(ServiceCacheListener listener) @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; - switch ( event.getType() ) + boolean notifyListeners = false; + switch ( event.getType() ) { case CHILD_ADDED: case CHILD_UPDATED: { - addInstance(event.getData(), false); notifyListeners = true; + applyTuple(addOrUpdateInstance(event.getData())); break; } case CHILD_REMOVED: { - instances.remove(instanceIdFromData(event.getData())); notifyListeners = true; + final ServiceInstance serviceInstance = instances.remove(instanceIdFromData(event.getData())); + applyTuple(new Tuple(serviceInstance, null)); break; } } @@ -202,23 +212,66 @@ public Void apply(ServiceCacheListener listener) } } + private void applyTuple(final Tuple tuple) + { + eventListenerContainer.forEach + ( + new Function, Void>() + { + @Override + public Void apply(ServiceCacheEventListener listener) + { + if ( tuple.oldInstance != null ) + { + if ( tuple.newInstance != null ) + { + listener.cacheUpdated(tuple.oldInstance, tuple.newInstance); + } + else + { + listener.cacheDeleted(tuple.oldInstance); + } + } + else if ( tuple.newInstance != null ) + { + listener.cacheAdded(tuple.newInstance); + } + return null; + } + } + ); + } + private String instanceIdFromData(ChildData childData) { return ZKPaths.getNodeFromPath(childData.getPath()); } - private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception + private void addInstanceOnlyIfAbsent(ChildData childData) throws Exception { String instanceId = instanceIdFromData(childData); ServiceInstance serviceInstance = discovery.getSerializer().deserialize(childData.getData()); - if ( onlyIfAbsent ) - { - instances.putIfAbsent(instanceId, serviceInstance); - } - else - { - instances.put(instanceId, serviceInstance); - } + + instances.putIfAbsent(instanceId, serviceInstance); cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); } + + private Tuple addOrUpdateInstance(ChildData childData) throws Exception + { + String instanceId = instanceIdFromData(childData); + ServiceInstance serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + final Tuple result = new Tuple<>(instances.put(instanceId, serviceInstance), serviceInstance); + cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); + return result; + } + + private static class Tuple { + private final ServiceInstance oldInstance; + private final ServiceInstance newInstance; + + private Tuple(final ServiceInstance oldInstance, final ServiceInstance newInstance) { + this.oldInstance = oldInstance; + this.newInstance = newInstance; + } + } } diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index fda5c26965..509df7b539 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -28,6 +28,7 @@ import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.details.ServiceCacheEventListener; import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.testng.Assert; import org.testng.annotations.Test; @@ -39,9 +40,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class TestServiceCache extends BaseClassForTests { + private final Timing timing = new Timing(); + @Test public void testInitialLoad() throws Exception { @@ -104,8 +108,6 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) @Test public void testViaProvider() throws Exception { - Timing timing = new Timing(); - List closeables = Lists.newArrayList(); try { @@ -310,4 +312,74 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) } } } + + @Test + public void testServiceCacheEventListener() throws Exception + { + List closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + closeables.add(discovery); + discovery.start(); + + ServiceCache cache = discovery.serviceCacheBuilder().name("test").build(); + closeables.add(cache); + + final Semaphore latch = new Semaphore(0); + final AtomicBoolean validation = new AtomicBoolean(true); + ServiceCacheEventListener listener = new ServiceCacheEventListener() + { + @Override + public void cacheAdded(final ServiceInstance added) + { + latch.release(); + + validation.compareAndSet(true,added != null); + } + + @Override + public void cacheDeleted(final ServiceInstance deleted) + { + latch.release(); + + validation.compareAndSet(true,deleted != null); + } + + @Override + public void cacheUpdated(final ServiceInstance before, final ServiceInstance after) + { + latch.release(); + + validation.compareAndSet(true, "before".equals(before.getPayload())); + validation.compareAndSet(true, "after".equals(after.getPayload())); + } + }; + cache.getCacheEventListenable().addListener(listener); + cache.start(); + + ServiceInstance instance = ServiceInstance.builder().payload("before").name("test").port(10064).build(); + discovery.registerService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); + instance = ServiceInstance.builder().id(instance.getId()).payload("after").name("test").port(10064).build(); + discovery.updateService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); + discovery.unregisterService(instance); + Assert.assertTrue(timing.acquireSemaphore(latch)); + + Assert.assertTrue(validation.get()); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }