Skip to content

Commit f08db2e

Browse files
authored
JAVA-2951: Accept multiple node state listeners, schema change listeners and request trackers (#1565)
1 parent 5642d4d commit f08db2e

File tree

42 files changed

+2053
-355
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2053
-355
lines changed

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### 4.13.0 (in progress)
66

7+
- [improvement] JAVA-2951: Accept multiple node state listeners, schema change listeners and request trackers
8+
79
Merged from 4.12.x:
810

911
- [bug] JAVA-2949: Provide mapper support for CompletionStage<Stream<T>>

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,10 @@ public enum DefaultDriverOption implements DriverOption {
296296
* The class of a session-wide component that tracks the outcome of requests.
297297
*
298298
* <p>Value-type: {@link String}
299+
*
300+
* @deprecated Use {@link #REQUEST_TRACKER_CLASSES} instead.
299301
*/
302+
@Deprecated
300303
REQUEST_TRACKER_CLASS("advanced.request-tracker.class"),
301304
/**
302305
* Whether to log successful requests.
@@ -388,14 +391,20 @@ public enum DefaultDriverOption implements DriverOption {
388391
* The class of a session-wide component that listens for node state changes.
389392
*
390393
* <p>Value-type: {@link String}
394+
*
395+
* @deprecated Use {@link #METADATA_NODE_STATE_LISTENER_CLASSES} instead.
391396
*/
397+
@Deprecated
392398
METADATA_NODE_STATE_LISTENER_CLASS("advanced.node-state-listener.class"),
393399

394400
/**
395401
* The class of a session-wide component that listens for schema changes.
396402
*
397403
* <p>Value-type: {@link String}
404+
*
405+
* @deprecated Use {@link #METADATA_SCHEMA_CHANGE_LISTENER_CLASSES} instead.
398406
*/
407+
@Deprecated
399408
METADATA_SCHEMA_CHANGE_LISTENER_CLASS("advanced.schema-change-listener.class"),
400409

401410
/**
@@ -909,6 +918,27 @@ public enum DefaultDriverOption implements DriverOption {
909918
* <p>Value-type: boolean
910919
*/
911920
PREPARED_CACHE_WEAK_VALUES("advanced.prepared-statements.prepared-cache.weak-values"),
921+
922+
/**
923+
* The classes of session-wide components that track the outcome of requests.
924+
*
925+
* <p>Value-type: List of {@link String}
926+
*/
927+
REQUEST_TRACKER_CLASSES("advanced.request-tracker.classes"),
928+
929+
/**
930+
* The classes of session-wide components that listen for node state changes.
931+
*
932+
* <p>Value-type: List of {@link String}
933+
*/
934+
METADATA_NODE_STATE_LISTENER_CLASSES("advanced.node-state-listener.classes"),
935+
936+
/**
937+
* The classes of session-wide components that listen for schema changes.
938+
*
939+
* <p>Value-type: List of {@link String}
940+
*/
941+
METADATA_SCHEMA_CHANGE_LISTENER_CLASSES("advanced.schema-change-listener.classes"),
912942
;
913943

914944
private final String path;

core/src/main/java/com/datastax/oss/driver/api/core/config/OngoingConfigOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.stream.Collectors;
2223

2324
/** An object where config options can be set programmatically. */
2425
public interface OngoingConfigOptions<SelfT extends OngoingConfigOptions<SelfT>> {
@@ -59,6 +60,15 @@ default SelfT withClass(@NonNull DriverOption option, @NonNull Class<?> value) {
5960
return withString(option, value.getName());
6061
}
6162

63+
/**
64+
* Note that this is just a shortcut to call {@link #withStringList(DriverOption, List)} with
65+
* class names obtained from {@link Class#getName()}.
66+
*/
67+
@NonNull
68+
default SelfT withClassList(@NonNull DriverOption option, @NonNull List<Class<?>> values) {
69+
return withStringList(option, values.stream().map(Class::getName).collect(Collectors.toList()));
70+
}
71+
6272
@NonNull
6373
SelfT withStringList(@NonNull DriverOption option, @NonNull List<String> value);
6474

core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
280280
map.put(TypedDriverOption.TIMESTAMP_GENERATOR_DRIFT_WARNING_THRESHOLD, Duration.ofSeconds(1));
281281
map.put(TypedDriverOption.TIMESTAMP_GENERATOR_DRIFT_WARNING_INTERVAL, Duration.ofSeconds(10));
282282
map.put(TypedDriverOption.TIMESTAMP_GENERATOR_FORCE_JAVA_CLOCK, false);
283-
map.put(TypedDriverOption.REQUEST_TRACKER_CLASS, "NoopRequestTracker");
284283
map.put(TypedDriverOption.REQUEST_THROTTLER_CLASS, "PassThroughRequestThrottler");
285-
map.put(TypedDriverOption.METADATA_NODE_STATE_LISTENER_CLASS, "NoopNodeStateListener");
286-
map.put(TypedDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS, "NoopSchemaChangeListener");
287284
map.put(TypedDriverOption.ADDRESS_TRANSLATOR_CLASS, "PassThroughAddressTranslator");
288285
map.put(TypedDriverOption.RESOLVE_CONTACT_POINTS, true);
289286
map.put(TypedDriverOption.PROTOCOL_MAX_FRAME_LENGTH, 256L * 1024 * 1024);

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,21 @@ public String toString() {
254254
public static final TypedDriverOption<Duration> TIMESTAMP_GENERATOR_DRIFT_WARNING_INTERVAL =
255255
new TypedDriverOption<>(
256256
DefaultDriverOption.TIMESTAMP_GENERATOR_DRIFT_WARNING_INTERVAL, GenericType.DURATION);
257-
/** The class of a session-wide component that tracks the outcome of requests. */
257+
258+
/**
259+
* The class of a session-wide component that tracks the outcome of requests.
260+
*
261+
* @deprecated Use {@link #REQUEST_TRACKER_CLASSES} instead.
262+
*/
263+
@Deprecated
258264
public static final TypedDriverOption<String> REQUEST_TRACKER_CLASS =
259265
new TypedDriverOption<>(DefaultDriverOption.REQUEST_TRACKER_CLASS, GenericType.STRING);
266+
267+
/** The classes of session-wide components that track the outcome of requests. */
268+
public static final TypedDriverOption<List<String>> REQUEST_TRACKER_CLASSES =
269+
new TypedDriverOption<>(
270+
DefaultDriverOption.REQUEST_TRACKER_CLASSES, GenericType.listOf(String.class));
271+
260272
/** Whether to log successful requests. */
261273
public static final TypedDriverOption<Boolean> REQUEST_LOGGER_SUCCESS_ENABLED =
262274
new TypedDriverOption<>(
@@ -312,14 +324,39 @@ public String toString() {
312324
public static final TypedDriverOption<Duration> REQUEST_THROTTLER_DRAIN_INTERVAL =
313325
new TypedDriverOption<>(
314326
DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL, GenericType.DURATION);
315-
/** The class of a session-wide component that listens for node state changes. */
327+
328+
/**
329+
* The class of a session-wide component that listens for node state changes.
330+
*
331+
* @deprecated Use {@link #METADATA_NODE_STATE_LISTENER_CLASSES} instead.
332+
*/
333+
@Deprecated
316334
public static final TypedDriverOption<String> METADATA_NODE_STATE_LISTENER_CLASS =
317335
new TypedDriverOption<>(
318336
DefaultDriverOption.METADATA_NODE_STATE_LISTENER_CLASS, GenericType.STRING);
319-
/** The class of a session-wide component that listens for schema changes. */
337+
338+
/**
339+
* The class of a session-wide component that listens for schema changes.
340+
*
341+
* @deprecated Use {@link #METADATA_SCHEMA_CHANGE_LISTENER_CLASSES} instead.
342+
*/
343+
@Deprecated
320344
public static final TypedDriverOption<String> METADATA_SCHEMA_CHANGE_LISTENER_CLASS =
321345
new TypedDriverOption<>(
322346
DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS, GenericType.STRING);
347+
348+
/** The classes of session-wide components that listen for node state changes. */
349+
public static final TypedDriverOption<List<String>> METADATA_NODE_STATE_LISTENER_CLASSES =
350+
new TypedDriverOption<>(
351+
DefaultDriverOption.METADATA_NODE_STATE_LISTENER_CLASSES,
352+
GenericType.listOf(String.class));
353+
354+
/** The classes of session-wide components that listen for schema changes. */
355+
public static final TypedDriverOption<List<String>> METADATA_SCHEMA_CHANGE_LISTENER_CLASSES =
356+
new TypedDriverOption<>(
357+
DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASSES,
358+
GenericType.listOf(String.class));
359+
323360
/**
324361
* The class of the address translator to use to convert the addresses sent by Cassandra nodes
325362
* into ones that the driver uses to connect.

core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LoadBalancingPolicy.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,40 @@
1919
import com.datastax.oss.driver.api.core.metadata.NodeState;
2020
import com.datastax.oss.driver.api.core.session.Request;
2121
import com.datastax.oss.driver.api.core.session.Session;
22+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
2223
import edu.umd.cs.findbugs.annotations.NonNull;
2324
import edu.umd.cs.findbugs.annotations.Nullable;
2425
import java.util.Map;
26+
import java.util.Optional;
2527
import java.util.Queue;
2628
import java.util.UUID;
2729

2830
/** Decides which Cassandra nodes to contact for each query. */
2931
public interface LoadBalancingPolicy extends AutoCloseable {
3032

33+
/**
34+
* Returns an optional {@link RequestTracker} to be registered with the session. Registering a
35+
* request tracker allows load-balancing policies to track node latencies in order to pick the
36+
* fastest ones.
37+
*
38+
* <p>This method is invoked only once during session configuration, and before any other methods
39+
* in this interface. Note that at this point, the driver hasn't connected to any node yet.
40+
*
41+
* @since 4.13.0
42+
*/
43+
@NonNull
44+
default Optional<RequestTracker> getRequestTracker() {
45+
return Optional.empty();
46+
}
47+
3148
/**
3249
* Initializes this policy with the nodes discovered during driver initialization.
3350
*
3451
* <p>This method is guaranteed to be called exactly once per instance, and before any other
35-
* method in this class. At this point, the driver has successfully connected to one of the
36-
* contact points, and performed a first refresh of topology information (by default, the contents
37-
* of {@code system.peers}), to discover other nodes in the cluster.
52+
* method in this interface except {@link #getRequestTracker()}. At this point, the driver has
53+
* successfully connected to one of the contact points, and performed a first refresh of topology
54+
* information (by default, the contents of {@code system.peers}), to discover other nodes in the
55+
* cluster.
3856
*
3957
* <p>This method must call {@link DistanceReporter#setDistance(Node, NodeDistance)
4058
* distanceReporter.setDistance} for each provided node (otherwise that node will stay at distance
@@ -50,7 +68,7 @@ public interface LoadBalancingPolicy extends AutoCloseable {
5068
* @param nodes all the nodes that are known to exist in the cluster (regardless of their state)
5169
* at the time of invocation.
5270
* @param distanceReporter an object that will be used by the policy to signal distance changes.
53-
* Implementations will typically store a this in a field, since new nodes may get {@link
71+
* Implementations will typically store this in a field, since new nodes may get {@link
5472
* #onAdd(Node) added} later and will need to have their distance set (or the policy might
5573
* change distances dynamically over time).
5674
*/

core/src/main/java/com/datastax/oss/driver/api/core/metadata/NodeStateListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
/**
2424
* A listener that gets notified when nodes states change.
2525
*
26-
* <p>An implementation of this interface can be registered in the configuration, or with {@link
27-
* SessionBuilder#withNodeStateListener(NodeStateListener)}.
26+
* <p>Implementations of this interface can be registered either via the configuration (see {@code
27+
* reference.conf} in the manual or core driver JAR), or programmatically via {@link
28+
* SessionBuilder#addNodeStateListener(NodeStateListener)}.
2829
*
2930
* <p>Note that the methods defined by this interface will be executed by internal driver threads,
3031
* and are therefore expected to have short execution times. If you need to perform long
@@ -33,6 +34,9 @@
3334
*
3435
* <p>If you implement this interface but don't need to implement all the methods, extend {@link
3536
* NodeStateListenerBase}.
37+
*
38+
* <p>If your implementation of this interface requires access to a fully-initialized session,
39+
* consider wrapping it in a {@link SafeInitNodeStateListener}.
3640
*/
3741
public interface NodeStateListener extends AutoCloseable {
3842

core/src/main/java/com/datastax/oss/driver/api/core/metadata/schema/SchemaChangeListener.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
/**
2424
* Tracks schema changes.
2525
*
26-
* <p>An implementation of this interface can be registered in the configuration, or with {@link
27-
* SessionBuilder#withSchemaChangeListener(SchemaChangeListener)}.
26+
* <p>Implementations of this interface can be registered either via the configuration (see {@code
27+
* reference.conf} in the manual or core driver JAR), or programmatically via {@link
28+
* SessionBuilder#addSchemaChangeListener(SchemaChangeListener)}.
2829
*
2930
* <p>Note that the methods defined by this interface will be executed by internal driver threads,
3031
* and are therefore expected to have short execution times. If you need to perform long

core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
2626
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
2727
import com.datastax.oss.driver.internal.core.loadbalancing.helper.NodeFilterToDistanceEvaluatorAdapter;
28+
import com.datastax.oss.driver.internal.core.metadata.MultiplexingNodeStateListener;
29+
import com.datastax.oss.driver.internal.core.metadata.schema.MultiplexingSchemaChangeListener;
30+
import com.datastax.oss.driver.internal.core.tracker.MultiplexingRequestTracker;
2831
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
2932
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3033
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -33,6 +36,7 @@
3336
import java.util.List;
3437
import java.util.Map;
3538
import java.util.Map.Entry;
39+
import java.util.Objects;
3640
import java.util.UUID;
3741
import java.util.function.Predicate;
3842

@@ -217,18 +221,77 @@ public Builder withNodeStateListener(@Nullable NodeStateListener nodeStateListen
217221
return this;
218222
}
219223

224+
@NonNull
225+
public Builder addNodeStateListener(@NonNull NodeStateListener nodeStateListener) {
226+
Objects.requireNonNull(nodeStateListener, "nodeStateListener cannot be null");
227+
if (this.nodeStateListener == null) {
228+
this.nodeStateListener = nodeStateListener;
229+
} else {
230+
NodeStateListener previousListener = this.nodeStateListener;
231+
if (previousListener instanceof MultiplexingNodeStateListener) {
232+
((MultiplexingNodeStateListener) previousListener).register(nodeStateListener);
233+
} else {
234+
MultiplexingNodeStateListener multiplexingNodeStateListener =
235+
new MultiplexingNodeStateListener();
236+
multiplexingNodeStateListener.register(previousListener);
237+
multiplexingNodeStateListener.register(nodeStateListener);
238+
this.nodeStateListener = multiplexingNodeStateListener;
239+
}
240+
}
241+
return this;
242+
}
243+
220244
@NonNull
221245
public Builder withSchemaChangeListener(@Nullable SchemaChangeListener schemaChangeListener) {
222246
this.schemaChangeListener = schemaChangeListener;
223247
return this;
224248
}
225249

250+
@NonNull
251+
public Builder addSchemaChangeListener(@NonNull SchemaChangeListener schemaChangeListener) {
252+
Objects.requireNonNull(schemaChangeListener, "schemaChangeListener cannot be null");
253+
if (this.schemaChangeListener == null) {
254+
this.schemaChangeListener = schemaChangeListener;
255+
} else {
256+
SchemaChangeListener previousListener = this.schemaChangeListener;
257+
if (previousListener instanceof MultiplexingSchemaChangeListener) {
258+
((MultiplexingSchemaChangeListener) previousListener).register(schemaChangeListener);
259+
} else {
260+
MultiplexingSchemaChangeListener multiplexingSchemaChangeListener =
261+
new MultiplexingSchemaChangeListener();
262+
multiplexingSchemaChangeListener.register(previousListener);
263+
multiplexingSchemaChangeListener.register(schemaChangeListener);
264+
this.schemaChangeListener = multiplexingSchemaChangeListener;
265+
}
266+
}
267+
return this;
268+
}
269+
226270
@NonNull
227271
public Builder withRequestTracker(@Nullable RequestTracker requestTracker) {
228272
this.requestTracker = requestTracker;
229273
return this;
230274
}
231275

276+
@NonNull
277+
public Builder addRequestTracker(@NonNull RequestTracker requestTracker) {
278+
Objects.requireNonNull(requestTracker, "requestTracker cannot be null");
279+
if (this.requestTracker == null) {
280+
this.requestTracker = requestTracker;
281+
} else {
282+
RequestTracker previousTracker = this.requestTracker;
283+
if (previousTracker instanceof MultiplexingRequestTracker) {
284+
((MultiplexingRequestTracker) previousTracker).register(requestTracker);
285+
} else {
286+
MultiplexingRequestTracker multiplexingRequestTracker = new MultiplexingRequestTracker();
287+
multiplexingRequestTracker.register(previousTracker);
288+
multiplexingRequestTracker.register(requestTracker);
289+
this.requestTracker = multiplexingRequestTracker;
290+
}
291+
}
292+
return this;
293+
}
294+
232295
@NonNull
233296
public Builder withLocalDatacenter(
234297
@NonNull String profileName, @NonNull String localDatacenter) {

0 commit comments

Comments
 (0)