
Commit 7157f53

Merge branch '4.11.x' into 4.x
2 parents: 7233016 + 4d7de91

35 files changed: +507 additions, -198 deletions

Jenkinsfile

Lines changed: 10 additions & 16 deletions
@@ -220,8 +220,7 @@ pipeline {
  'dse-5.1', // Legacy DataStax Enterprise
  'dse-6.0', // Previous DataStax Enterprise
  'dse-6.7', // Previous DataStax Enterprise
- 'dse-6.8.0', // Current DataStax Enterprise
- 'dse-6.8', // Development DataStax Enterprise
+ 'dse-6.8', // Current DataStax Enterprise
  'ALL'],
  description: '''Apache Cassandra&reg; and DataStax Enterprise server version to use for adhoc <b>BUILD-AND-EXECUTE-TESTS</b> builds
  <table style="width:100%">
@@ -271,13 +270,9 @@ pipeline {
  <td><strong>dse-6.7</strong></td>
  <td>DataStax Enterprise v6.7.x</td>
  </tr>
- <tr>
- <td><strong>dse-6.8.0</strong></td>
- <td>DataStax Enterprise v6.8.0</td>
- </tr>
  <tr>
  <td><strong>dse-6.8</strong></td>
- <td>DataStax Enterprise v6.8.x (<b>CURRENTLY UNDER DEVELOPMENT</b>)</td>
+ <td>DataStax Enterprise v6.8.x</td>
  </tr>
  </table>''')
  choice(
@@ -361,13 +356,13 @@ pipeline {
  // schedules only run against release branches (i.e. 3.x, 4.x, 4.5.x, etc.)
  parameterizedCron(branchPatternCron.matcher(env.BRANCH_NAME).matches() ? """
  # Every weeknight (Monday - Friday) around 2:00 AM
- ### JDK8 tests against 2.1, 3.0, DSE 4.8, DSE 5.0, DSE 5.1, DSE-6.0 and DSE 6.7
- H 2 * * 1-5 %CI_SCHEDULE=WEEKNIGHTS;CI_SCHEDULE_SERVER_VERSIONS=2.1 3.0 dse-4.8 dse-5.0 dse-5.1 dse-6.0 dse-6.7;CI_SCHEDULE_JABBA_VERSION=1.8
- ### JDK11 tests against 3.11, 4.0, DSE 6.7 and DSE 6.8.0
- H 2 * * 1-5 %CI_SCHEDULE=WEEKNIGHTS;CI_SCHEDULE_SERVER_VERSIONS=3.11 4.0 dse-6.7 dse-6.8.0;[email protected]
+ ### JDK8 tests against 2.1, 3.0, DSE 4.8, DSE 5.0, DSE 5.1, DSE-6.0, DSE 6.7 and DSE 6.8
+ H 2 * * 1-5 %CI_SCHEDULE=WEEKNIGHTS;CI_SCHEDULE_SERVER_VERSIONS=2.1 3.0 dse-4.8 dse-5.0 dse-5.1 dse-6.0 dse-6.7 dse-6.8;CI_SCHEDULE_JABBA_VERSION=1.8
+ ### JDK11 tests against 3.11, 4.0 and DSE 6.8
+ H 2 * * 1-5 %CI_SCHEDULE=WEEKNIGHTS;CI_SCHEDULE_SERVER_VERSIONS=3.11 4.0 dse-6.8;[email protected]
  # Every weekend (Sunday) around 12:00 PM noon
- ### JDK14 tests against 3.11, 4.0, DSE 6.7, DSE 6.8.0 and DSE 6.8.X
- H 12 * * 0 %CI_SCHEDULE=WEEKENDS;CI_SCHEDULE_SERVER_VERSIONS=3.11 4.0 dse-6.7 dse-6.8.0 dse-6.8;[email protected]
+ ### JDK14 tests against 3.11, 4.0 and DSE 6.8
+ H 12 * * 0 %CI_SCHEDULE=WEEKENDS;CI_SCHEDULE_SERVER_VERSIONS=3.11 4.0 dse-6.8;[email protected]
  """ : "")
  }

@@ -403,7 +398,7 @@ pipeline {
  name 'SERVER_VERSION'
  values '3.11', // Latest stable Apache CassandraⓇ
  '4.0', // Development Apache CassandraⓇ
- 'dse-6.8.0' // Current DataStax Enterprise
+ 'dse-6.8' // Current DataStax Enterprise
  }
  }

@@ -521,8 +516,7 @@ pipeline {
  'dse-5.1', // Legacy DataStax Enterprise
  'dse-6.0', // Previous DataStax Enterprise
  'dse-6.7', // Previous DataStax Enterprise
- 'dse-6.8.0', // Current DataStax Enterprise
- 'dse-6.8' // Development DataStax Enterprise
+ 'dse-6.8' // Current DataStax Enterprise
  }
  }
  when {

changelog/README.md

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
  ### 4.11.2 (in progress)

  - [bug] JAVA-2945: Reinstate InternalDriverContext.getNodeFilter method
+ - [bug] JAVA-2947: Release buffer after decoding multi-slice frame
  - [bug] JAVA-2946: Make MapperResultProducerService instances be located with user-provided class loader
  - [bug] JAVA-2942: GraphStatement.setConsistencyLevel() is not effective
  - [bug] JAVA-2941: Cannot add a single static column with the alter table API

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java

Lines changed: 1 addition & 0 deletions
@@ -314,6 +314,7 @@ private class SingleThreaded {
  private SingleThreaded(InternalDriverContext context, DriverExecutionProfile config) {
    this.schemaRefreshDebouncer =
        new Debouncer<>(
+           logPrefix + "|metadata debouncer",
            adminExecutor,
            this::coalesceSchemaRequests,
            this::startSchemaRequest,

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java

Lines changed: 1 addition & 0 deletions
@@ -103,6 +103,7 @@ private SingleThreaded(InternalDriverContext context) {
  DriverExecutionProfile config = context.getConfig().getDefaultProfile();
  this.topologyEventDebouncer =
      new Debouncer<>(
+         logPrefix + "|topology debouncer",
          adminExecutor,
          this::coalesceTopologyEvents,
          this::flushTopologyEvents,

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java

Lines changed: 27 additions & 16 deletions
@@ -69,14 +69,19 @@ protected void decode(
  private void decodeSelfContained(Segment<ByteBuf> segment, List<Object> out) {
    ByteBuf payload = segment.payload;
    int frameCount = 0;
-   do {
-     Frame frame = frameCodec.decode(payload);
-     LOG.trace(
-         "[{}] Decoded response frame {} from self-contained segment", logPrefix, frame.streamId);
-     out.add(frame);
-     frameCount += 1;
-   } while (payload.isReadable());
-   payload.release();
+   try {
+     do {
+       Frame frame = frameCodec.decode(payload);
+       LOG.trace(
+           "[{}] Decoded response frame {} from self-contained segment",
+           logPrefix,
+           frame.streamId);
+       out.add(frame);
+       frameCount += 1;
+     } while (payload.isReadable());
+   } finally {
+     payload.release();
+   }
    LOG.trace("[{}] Done processing self-contained segment ({} frames)", logPrefix, frameCount);
  }

@@ -89,28 +94,34 @@ private void decodeSlice(Segment<ByteBuf> segment, ByteBufAllocator allocator, L
    }
    accumulatedSlices.add(slice);
    accumulatedLength += slice.readableBytes();
+   int accumulatedSlicesSize = accumulatedSlices.size();
    LOG.trace(
        "[{}] Decoded slice {}, {}/{} bytes",
        logPrefix,
-       accumulatedSlices.size(),
+       accumulatedSlicesSize,
        accumulatedLength,
        targetLength);
    assert accumulatedLength <= targetLength;
    if (accumulatedLength == targetLength) {
      // We've received enough data to reassemble the whole message
-     CompositeByteBuf encodedFrame = allocator.compositeBuffer(accumulatedSlices.size());
+     CompositeByteBuf encodedFrame = allocator.compositeBuffer(accumulatedSlicesSize);
      encodedFrame.addComponents(true, accumulatedSlices);
-     Frame frame = frameCodec.decode(encodedFrame);
+     Frame frame;
+     try {
+       frame = frameCodec.decode(encodedFrame);
+     } finally {
+       encodedFrame.release();
+       // Reset our state
+       targetLength = UNKNOWN_LENGTH;
+       accumulatedSlices.clear();
+       accumulatedLength = 0;
+     }
      LOG.trace(
          "[{}] Decoded response frame {} from {} slices",
          logPrefix,
          frame.streamId,
-         accumulatedSlices.size());
+         accumulatedSlicesSize);
      out.add(frame);
-     // Reset our state
-     targetLength = UNKNOWN_LENGTH;
-     accumulatedSlices.clear();
-     accumulatedLength = 0;
    }
  }
}

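The change above makes sure the Netty buffer is released even when frame decoding throws. Below is a minimal standalone sketch of that release-in-finally pattern; it is plain Netty, not driver code, and decodePayload is a hypothetical stand-in for frameCodec.decode(...).

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class ReleaseInFinallyExample {

  static String decodePayload(ByteBuf payload) {
    // Hypothetical decoder: may throw if the payload is malformed.
    return payload.toString(CharsetUtil.UTF_8);
  }

  public static void main(String[] args) {
    ByteBuf payload = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
    try {
      System.out.println(decodePayload(payload));
    } finally {
      // Without this, a decoding error would leak the reference-counted buffer,
      // which is the kind of leak JAVA-2947 addresses for multi-slice frames.
      payload.release();
    }
    System.out.println("refCnt after release = " + payload.refCnt()); // 0
  }
}
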
core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Debouncer.java

Lines changed: 29 additions & 5 deletions
@@ -44,6 +44,7 @@
  public class Debouncer<IncomingT, CoalescedT> {
    private static final Logger LOG = LoggerFactory.getLogger(Debouncer.class);

+   private final String logPrefix;
    private final EventExecutor adminExecutor;
    private final Consumer<CoalescedT> onFlush;
    private final Duration window;
@@ -69,6 +70,27 @@ public Debouncer(
      Consumer<CoalescedT> onFlush,
      Duration window,
      long maxEvents) {
+   this("debouncer", adminExecutor, coalescer, onFlush, window, maxEvents);
+ }
+
+ /**
+  * Creates a new instance.
+  *
+  * @param logPrefix the log prefix to use in log messages.
+  * @param adminExecutor the executor that will be used to schedule all tasks.
+  * @param coalescer how to transform a batch of events into a result.
+  * @param onFlush what to do with a result.
+  * @param window the time window.
+  * @param maxEvents the maximum number of accumulated events before a flush is forced.
+  */
+ public Debouncer(
+     String logPrefix,
+     EventExecutor adminExecutor,
+     Function<List<IncomingT>, CoalescedT> coalescer,
+     Consumer<CoalescedT> onFlush,
+     Duration window,
+     long maxEvents) {
+   this.logPrefix = logPrefix;
    this.coalescer = coalescer;
    Preconditions.checkArgument(maxEvents >= 1, "maxEvents should be at least 1");
    this.adminExecutor = adminExecutor;
@@ -85,7 +107,8 @@ public void receive(IncomingT element) {
    }
    if (window.isZero() || maxEvents == 1) {
      LOG.debug(
-         "Received {}, flushing immediately (window = {}, maxEvents = {})",
+         "[{}] Received {}, flushing immediately (window = {}, maxEvents = {})",
+         logPrefix,
          element,
          window,
          maxEvents);
@@ -94,20 +117,21 @@ public void receive(IncomingT element) {
    currentBatch.add(element);
    if (currentBatch.size() == maxEvents) {
      LOG.debug(
-         "Received {}, flushing immediately (because {} accumulated events)",
+         "[{}] Received {}, flushing immediately (because {} accumulated events)",
+         logPrefix,
          element,
          maxEvents);
      flushNow();
    } else {
-     LOG.debug("Received {}, scheduling next flush in {}", element, window);
+     LOG.debug("[{}] Received {}, scheduling next flush in {}", logPrefix, element, window);
      scheduleFlush();
    }
  }
}

  public void flushNow() {
    assert adminExecutor.inEventLoop();
-   LOG.debug("Flushing now");
+   LOG.debug("[{}] Flushing now", logPrefix);
    cancelNextFlush();
    if (!currentBatch.isEmpty()) {
      onFlush.accept(coalescer.apply(currentBatch));
@@ -127,7 +151,7 @@ private void cancelNextFlush() {
    if (nextFlush != null && !nextFlush.isDone()) {
      boolean cancelled = nextFlush.cancel(true);
      if (cancelled) {
-       LOG.debug("Cancelled existing scheduled flush");
+       LOG.debug("[{}] Cancelled existing scheduled flush", logPrefix);
      }
    }
  }

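For context, here is a hedged sketch of how a caller might use the new log-prefix constructor, mirroring what MetadataManager and NodeStateManager do above. It assumes the driver's internal Debouncer class and Netty's DefaultEventExecutor are on the classpath; the prefix string, batch types and timings are made up for illustration.

import com.datastax.oss.driver.internal.core.util.concurrent.Debouncer;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import java.time.Duration;
import java.util.List;

public class DebouncerPrefixExample {
  public static void main(String[] args) throws InterruptedException {
    EventExecutor adminExecutor = new DefaultEventExecutor();
    Debouncer<String, List<String>> debouncer =
        new Debouncer<>(
            "s0|example debouncer", // log prefix, now shown as "[s0|example debouncer] ..." in debug logs
            adminExecutor,
            batch -> batch, // coalescer: keep the accumulated batch as-is
            batch -> System.out.println("flushed " + batch), // onFlush
            Duration.ofMillis(100), // debounce window
            10); // force a flush after 10 accumulated events
    // The debouncer is single-threaded: receive() must run on the admin executor.
    adminExecutor.execute(() -> debouncer.receive("event-1"));
    adminExecutor.execute(() -> debouncer.receive("event-2"));
    Thread.sleep(500); // let the window elapse so the flush runs
    adminExecutor.shutdownGracefully();
  }
}
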
core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java

Lines changed: 2 additions & 1 deletion
@@ -73,7 +73,8 @@ public void should_decode_sequence_of_slices() {
        encodeFrame(new AuthResponse(Bytes.fromHexString("0x" + Strings.repeat("aa", 1011))));
    int sliceLength = 100;
    do {
-     ByteBuf payload = encodedFrame.readSlice(Math.min(sliceLength, encodedFrame.readableBytes()));
+     ByteBuf payload =
+         encodedFrame.readRetainedSlice(Math.min(sliceLength, encodedFrame.readableBytes()));
      channel.writeInbound(new Segment<>(payload, false));
    } while (encodedFrame.isReadable());

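The switch to readRetainedSlice matters because the decoder now releases the buffers it is handed. A small standalone Netty illustration (not the driver test) of the reference-counting difference:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RetainedSliceExample {
  public static void main(String[] args) {
    ByteBuf parent = Unpooled.wrappedBuffer(new byte[] {1, 2, 3, 4});

    ByteBuf retained = parent.readRetainedSlice(2);
    System.out.println("parent refCnt = " + parent.refCnt()); // 2: the slice holds its own reference
    retained.release(); // releases only the slice's reference
    System.out.println("parent refCnt = " + parent.refCnt()); // back to 1, parent still usable

    ByteBuf plain = parent.readSlice(2);
    plain.release(); // shared count: this also releases the parent
    System.out.println("parent refCnt = " + parent.refCnt()); // 0, parent can no longer be read
  }
}
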
integration-tests/pom.xml

Lines changed: 13 additions & 0 deletions
@@ -254,6 +254,7 @@
  <reuseForks>false</reuseForks>
  <summaryFile>${project.build.directory}/failsafe-reports/failsafe-summary-isolated.xml</summaryFile>
  <skipITs>${skipIsolatedITs}</skipITs>
+ <argLine>${blockhound.argline}</argLine>
  </configuration>
  </execution>
  <execution>
@@ -316,4 +317,16 @@
  </plugin>
  </plugins>
  </build>
+ <profiles>
+   <profile>
+     <id>jdk 13+</id>
+     <activation>
+       <jdk>[13,)</jdk>
+     </activation>
+     <properties>
+       <!-- for DriverBlockHoundIntegrationIT when using JDK 13+, see https://github.com/reactor/BlockHound/issues/33 -->
+       <blockhound.argline>-XX:+AllowRedefinitionToAddDeleteMethods</blockhound.argline>
+     </properties>
+   </profile>
+ </profiles>
  </project>

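A hedged sketch of what the new ${blockhound.argline} property is for. This is not the actual DriverBlockHoundIntegrationIT; it only shows the BlockHound install step whose class retransformation may fail on JDK 13+ unless the JVM runs with the -XX:+AllowRedefinitionToAddDeleteMethods flag that the profile above passes through the failsafe <argLine>.

import reactor.blockhound.BlockHound;

public class BlockHoundSetupExample {
  public static void main(String[] args) {
    // Instruments the running JVM; on JDK 13+ this may fail without the flag above.
    BlockHound.install();
    System.out.println("BlockHound installed; blocking calls on non-blocking threads now throw");
  }
}
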
integration-tests/src/test/java/com/datastax/dse/driver/api/core/cql/continuous/ContinuousPagingIT.java

Lines changed: 13 additions & 13 deletions
@@ -43,6 +43,7 @@
  import java.time.Duration;
  import java.util.Collections;
  import java.util.Iterator;
+ import java.util.Objects;
  import java.util.concurrent.CancellationException;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.CompletionStage;
@@ -183,7 +184,10 @@ public void simple_statement_paging_should_be_resilient_to_schema_change() {
      .getDefaultProfile()
      .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES, 1)
      .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 1)
-     .withInt(DefaultDriverOption.REQUEST_TIMEOUT, 120000000);
+     .withDuration(
+         DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofSeconds(30))
+     .withDuration(
+         DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofSeconds(30));
  ContinuousResultSet result = session.executeContinuously(simple.setExecutionProfile(profile));
  Iterator<Row> it = result.iterator();
  // First row should have a non-null values.
@@ -193,11 +197,7 @@ public void simple_statement_paging_should_be_resilient_to_schema_change() {
  // Make schema change to add b, its metadata should NOT be present in subsequent rows.
  CqlSession schemaChangeSession =
      SessionUtils.newSession(
-         ccmRule,
-         session.getKeyspace().orElseThrow(IllegalStateException::new),
-         SessionUtils.configLoaderBuilder()
-             .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
-             .build());
+         ccmRule, session.getKeyspace().orElseThrow(IllegalStateException::new));
  SimpleStatement statement =
      SimpleStatement.newInstance("ALTER TABLE test_prepare add b int")
          .setExecutionProfile(sessionRule.slowProfile());
@@ -251,7 +251,11 @@ public void prepared_statement_paging_should_be_resilient_to_schema_change() {
      .getConfig()
      .getDefaultProfile()
      .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES, 1)
-     .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 1);
+     .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 1)
+     .withDuration(
+         DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofSeconds(30))
+     .withDuration(
+         DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofSeconds(30));
  ContinuousResultSet result =
      session.executeContinuously(prepared.bind("foo").setExecutionProfile(profile));
  Iterator<Row> it = result.iterator();
@@ -262,11 +266,7 @@ public void prepared_statement_paging_should_be_resilient_to_schema_change() {
  // Make schema change to drop v, its metadata should be present, values will be null.
  CqlSession schemaChangeSession =
      SessionUtils.newSession(
-         ccmRule,
-         session.getKeyspace().orElseThrow(IllegalStateException::new),
-         SessionUtils.configLoaderBuilder()
-             .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
-             .build());
+         ccmRule, session.getKeyspace().orElseThrow(IllegalStateException::new));
  schemaChangeSession.execute("ALTER TABLE test_prep DROP v;");
  while (it.hasNext()) {
    // Each row should have a value for k, v should still be present, but null since column was
@@ -276,7 +276,7 @@ public void prepared_statement_paging_should_be_resilient_to_schema_change() {
  if (ccmRule
      .getDseVersion()
      .orElseThrow(IllegalStateException::new)
-     .compareTo(Version.parse("6.0.0"))
+     .compareTo(Objects.requireNonNull(Version.parse("6.0.0")))
      >= 0) {
    // DSE 6 only, v should be null here since dropped.
    // Not reliable for 5.1 since we may have gotten page queued before schema changed.

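A hedged sketch (hypothetical session, table and query) of the configuration pattern the test now relies on: raise only the continuous-paging timeouts on a derived execution profile instead of inflating the global REQUEST_TIMEOUT, then attach the profile per statement.

import com.datastax.dse.driver.api.core.config.DseDriverOption;
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousResultSet;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import java.time.Duration;

public class ContinuousPagingProfileExample {
  static ContinuousResultSet queryWithRelaxedPagingTimeouts(CqlSession session) {
    // Derive a profile from the default one; only the DSE continuous-paging timeouts change.
    DriverExecutionProfile profile =
        session
            .getContext()
            .getConfig()
            .getDefaultProfile()
            .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofSeconds(30))
            .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofSeconds(30));
    SimpleStatement statement =
        SimpleStatement.newInstance("SELECT v FROM test").setExecutionProfile(profile);
    return session.executeContinuously(statement); // DSE continuous paging
  }
}
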
integration-tests/src/test/java/com/datastax/dse/driver/api/core/cql/continuous/ContinuousPagingITBase.java

Lines changed: 11 additions & 7 deletions
@@ -105,17 +105,21 @@ public static Object[][] pagingOptions() {

  protected void validateMetrics(CqlSession session) {
    Node node = session.getMetadata().getNodes().values().iterator().next();
-   assertThat(session.getMetrics()).isPresent();
+   assertThat(session.getMetrics()).as("assert session.getMetrics() present").isPresent();
    Metrics metrics = session.getMetrics().get();
-   assertThat(metrics.getNodeMetric(node, DefaultNodeMetric.CQL_MESSAGES)).isPresent();
+   assertThat(metrics.getNodeMetric(node, DefaultNodeMetric.CQL_MESSAGES))
+       .as("assert metrics.getNodeMetric(node, DefaultNodeMetric.CQL_MESSAGES) present")
+       .isPresent();
    Timer messages = (Timer) metrics.getNodeMetric(node, DefaultNodeMetric.CQL_MESSAGES).get();
-   assertThat(messages.getCount()).isGreaterThan(0);
-   assertThat(messages.getMeanRate()).isGreaterThan(0);
-   assertThat(metrics.getSessionMetric(DseSessionMetric.CONTINUOUS_CQL_REQUESTS)).isPresent();
+   assertThat(messages.getCount()).as("assert messages.getCount() >= 0").isGreaterThan(0);
+   assertThat(messages.getMeanRate()).as("assert messages.getMeanRate() >= 0").isGreaterThan(0);
+   assertThat(metrics.getSessionMetric(DseSessionMetric.CONTINUOUS_CQL_REQUESTS))
+       .as("assert metrics.getSessionMetric(DseSessionMetric.CONTINUOUS_CQL_REQUESTS) present")
+       .isPresent();
    Timer requests =
        (Timer) metrics.getSessionMetric(DseSessionMetric.CONTINUOUS_CQL_REQUESTS).get();
-   assertThat(requests.getCount()).isGreaterThan(0);
-   assertThat(requests.getMeanRate()).isGreaterThan(0);
+   assertThat(requests.getCount()).as("assert requests.getCount() >= 0").isGreaterThan(0);
+   assertThat(requests.getMeanRate()).as("assert requests.getMeanRate() >= 0").isGreaterThan(0);
  }

  public static class Options {

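A small illustration (the metric value is hypothetical) of the AssertJ idiom added above: .as(...) sets the description printed when the assertion fails, and it must come before the assertion call (isPresent(), isGreaterThan(...)), otherwise it is silently ignored.

import static org.assertj.core.api.Assertions.assertThat;

public class DescribedAssertionExample {
  public static void main(String[] args) {
    long requestCount = 0;
    // On failure the report starts with "[assert requestCount > 0] ...", which makes it
    // obvious in CI logs which of several similar metric checks tripped.
    assertThat(requestCount).as("assert requestCount > 0").isGreaterThan(0);
  }
}
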