Skip to content

Commit df96494

Browse files
committed
Merge branch 'cassandra-5.0' into trunk
* cassandra-5.0: Support null column value tombstones in FQL batch statements
2 parents cc63d3c + aee3c75 commit df96494

File tree

6 files changed

+129
-8
lines changed

6 files changed

+129
-8
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ Merged from 4.1:
279279
* Reduce info logging from automatic paxos repair (CASSANDRA-19445)
280280
* Support legacy plain_text_auth section in credentials file removed unintentionally (CASSANDRA-19498)
281281
Merged from 4.0:
282+
* Support null column value tombstones in FQL batch statements (CASSANDRA-20397)
282283
* Fix millisecond and microsecond precision for commit log replay (CASSANDRA-19448)
283284
* Improve accuracy of memtable heap usage tracking (CASSANDRA-17298)
284285
* Fix rendering UNSET collection types in query tracing (CASSANDRA-19880)

src/java/org/apache/cassandra/fql/FullQueryLogger.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -454,9 +454,7 @@ public void writeMarshallablePayload(WireOut wire)
454454
{
455455
valueOut.int32(subValues.size());
456456
for (ByteBuffer value : subValues)
457-
{
458-
valueOut.bytes(BytesStore.wrap(value));
459-
}
457+
valueOut.bytes(value == null ? null : BytesStore.wrap(value));
460458
}
461459
}
462460

test/distributed/org/apache/cassandra/distributed/test/FqlReplayDDLExclusionTest.java renamed to test/distributed/org/apache/cassandra/distributed/test/fql/FqlReplayDDLExclusionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.cassandra.distributed.test;
19+
package org.apache.cassandra.distributed.test.fql;
2020

2121
import org.junit.Ignore;
2222
import org.junit.Rule;
@@ -27,6 +27,7 @@
2727
import org.apache.cassandra.distributed.Cluster;
2828
import org.apache.cassandra.distributed.api.IInvokableInstance;
2929
import org.apache.cassandra.distributed.api.QueryResults;
30+
import org.apache.cassandra.distributed.test.TestBaseImpl;
3031
import org.apache.cassandra.tools.ToolRunner;
3132
import org.apache.cassandra.tools.ToolRunner.ToolResult;
3233

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test.fql;
20+
21+
import com.google.common.collect.Sets;
22+
import org.junit.AfterClass;
23+
import org.junit.BeforeClass;
24+
import org.junit.Rule;
25+
import org.junit.Test;
26+
import org.junit.rules.TemporaryFolder;
27+
28+
import com.datastax.driver.core.BatchStatement;
29+
import com.datastax.driver.core.PreparedStatement;
30+
import com.datastax.driver.core.Session;
31+
import org.apache.cassandra.distributed.Cluster;
32+
import org.apache.cassandra.distributed.test.TestBaseImpl;
33+
import org.apache.cassandra.tools.ToolRunner;
34+
35+
import static org.junit.Assert.assertEquals;
36+
import static org.junit.Assert.assertTrue;
37+
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
38+
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
39+
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
40+
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
41+
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
42+
43+
public class FqlTombstoneHandlingTest extends TestBaseImpl
44+
{
45+
private static Cluster CLUSTER;
46+
47+
@Rule
48+
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
49+
50+
@BeforeClass
51+
public static void beforeClass() throws Throwable
52+
{
53+
CLUSTER = init(Cluster.build(1).withConfig(updater -> updater.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)).start());
54+
}
55+
56+
@Test
57+
public void testNullCellBindingInBatch()
58+
{
59+
String tableName = "null_as_tombstone_in_batch";
60+
CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int, c int, s set<int>, primary key (k, c))"));
61+
CLUSTER.get(1).nodetool("enablefullquerylog", "--path", temporaryFolder.getRoot().getAbsolutePath());
62+
String insertTemplate = withKeyspace("INSERT INTO %s." + tableName + " (k, c, s) VALUES ( ?, ?, ?) USING TIMESTAMP 2");
63+
String select = withKeyspace("SELECT * FROM %s." + tableName + " WHERE k = 0 AND c = 0");
64+
65+
com.datastax.driver.core.Cluster.Builder builder1 =com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
66+
67+
// Use the driver to write this initial row, since otherwise we won't hit the dispatcher
68+
try (com.datastax.driver.core.Cluster cluster1 = builder1.build(); Session session1 = cluster1.connect())
69+
{
70+
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
71+
PreparedStatement preparedWrite = session1.prepare(insertTemplate);
72+
batch.add(preparedWrite.bind(0, 0, null));
73+
session1.execute(batch);
74+
}
75+
76+
CLUSTER.get(1).nodetool("disablefullquerylog");
77+
78+
// The dump should contain a null entry for our tombstone
79+
ToolRunner.ToolResult runner = ToolRunner.invokeClass("org.apache.cassandra.fqltool.FullQueryLogTool",
80+
"dump",
81+
"--",
82+
temporaryFolder.getRoot().getAbsolutePath());
83+
assertTrue(runner.getStdout().contains(insertTemplate));
84+
assertEquals(0, runner.getExitCode());
85+
86+
Object[][] preReplayResult = CLUSTER.get(1).executeInternal(select);
87+
assertRows(preReplayResult, row(0, 0, null));
88+
89+
// Make sure the row no longer exists after truncate...
90+
CLUSTER.get(1).executeInternal(withKeyspace("TRUNCATE %s." + tableName));
91+
assertRows(CLUSTER.get(1).executeInternal(select));
92+
93+
// ...insert a new row with an actual value for the set at an earlier timestamp...
94+
CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + tableName + " (k, c, s) VALUES ( ?, ?, ?) USING TIMESTAMP 1"), 0, 0, Sets.newHashSet(1));
95+
assertRows(CLUSTER.get(1).executeInternal(select), row(0, 0, Sets.newHashSet(1)));
96+
97+
runner = ToolRunner.invokeClass("org.apache.cassandra.fqltool.FullQueryLogTool",
98+
"replay",
99+
"--keyspace", KEYSPACE,
100+
"--target", "127.0.0.1",
101+
"--", temporaryFolder.getRoot().getAbsolutePath());
102+
assertEquals(0, runner.getExitCode());
103+
104+
// ...then ensure the replayed row deletes the one we wrote before replay.
105+
Object[][] postReplayResult = CLUSTER.get(1).executeInternal(select);
106+
assertRows(postReplayResult, preReplayResult);
107+
}
108+
109+
@AfterClass
110+
public static void afterClass()
111+
{
112+
if (CLUSTER != null)
113+
CLUSTER.close();
114+
}
115+
}

tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ public void readMarshallable(WireIn wireIn) throws IORuntimeException
9494
values.add(subValues);
9595
int numSubValues = in.int32();
9696
for (int zz = 0; zz < numSubValues; zz++)
97-
subValues.add(ByteBuffer.wrap(in.bytes()));
97+
{
98+
byte[] valueBytes = in.bytes();
99+
subValues.add(valueBytes == null ? null : ByteBuffer.wrap(valueBytes));
100+
}
98101
}
99102
query = new FQLQuery.Batch(keyspace,
100103
protocolVersion,

tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public static void dump(List<String> arguments, String rollCycle, boolean follow
126126
break;
127127

128128
case (FullQueryLogger.BATCH):
129-
dumpBatch(options, wireIn, sb);
129+
dumpBatch(wireIn, sb);
130130
break;
131131

132132
default:
@@ -183,7 +183,7 @@ static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
183183
sb.append(System.lineSeparator());
184184
}
185185

186-
private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
186+
private static void dumpBatch(WireIn wireIn, StringBuilder sb)
187187
{
188188
sb.append("Batch type: ")
189189
.append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
@@ -203,7 +203,10 @@ private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder
203203
int numSubValues = in.int32();
204204
List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
205205
for (int j = 0; j < numSubValues; j++)
206-
subValues.add(ByteBuffer.wrap(in.bytes()));
206+
{
207+
byte[] valueBytes = in.bytes();
208+
subValues.add(valueBytes == null ? null : ByteBuffer.wrap(valueBytes));
209+
}
207210

208211
sb.append("Query: ")
209212
.append(queries.get(i))

0 commit comments

Comments
 (0)