Skip to content

Commit f727a0b

Browse files
demonstrate test failure for CASSANDRA-21050
1 parent 5374ff5 commit f727a0b

File tree

1 file changed

+363
-0
lines changed

1 file changed

+363
-0
lines changed
Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test;
20+
21+
import java.io.IOException;
22+
import java.nio.file.AccessDeniedException;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.nio.file.Paths;
26+
import java.nio.file.attribute.PosixFilePermission;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Optional;
34+
import java.util.Set;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
38+
import java.util.function.BiFunction;
39+
import java.util.function.IntFunction;
40+
import java.util.stream.Collectors;
41+
42+
import org.apache.commons.io.FileUtils;
43+
import org.assertj.core.api.Assertions;
44+
import org.junit.Ignore;
45+
import org.junit.Test;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
48+
49+
import com.datastax.driver.core.ResultSet;
50+
import com.datastax.driver.core.Session;
51+
import com.fasterxml.jackson.databind.node.ArrayNode;
52+
import org.apache.cassandra.db.Keyspace;
53+
import org.apache.cassandra.distributed.Cluster;
54+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
55+
import org.apache.cassandra.distributed.api.ICoordinator;
56+
import org.apache.cassandra.distributed.api.IInstance;
57+
import org.apache.cassandra.distributed.api.IInvokableInstance;
58+
import org.apache.cassandra.io.FSWriteError;
59+
import org.apache.cassandra.io.util.File;
60+
import org.apache.cassandra.io.util.PathUtils;
61+
import org.apache.cassandra.schema.Schema;
62+
import org.apache.cassandra.tools.SSTableExport;
63+
import org.apache.cassandra.tools.ToolRunner;
64+
import org.apache.cassandra.utils.Collectors3;
65+
import org.apache.cassandra.utils.JsonUtils;
66+
67+
import static java.nio.charset.StandardCharsets.UTF_8;
68+
import static java.nio.file.StandardOpenOption.CREATE;
69+
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
70+
import static java.util.Arrays.asList;
71+
72+
import static org.apache.cassandra.config.DatabaseDescriptor.getCommitLogLocation;
73+
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
74+
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
75+
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
76+
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
77+
import static org.assertj.core.api.Assertions.assertThat;
78+
79+
/**
 * Demonstrates the test failure for CASSANDRA-21050 (per the commit message): rows can go
 * missing when a complex (UDT/tuple) column is dropped, the schema is dumped via DESCRIBE,
 * and a fresh cluster is rebuilt from that dump plus the copied SSTables.
 */
public class MissingRowsOnDroppedColumnSchemaRecreateTest extends TestBaseImpl
{
    private final static Logger logger = LoggerFactory.getLogger(MissingRowsOnDroppedColumnSchemaRecreateTest.class);

    // Scratch area used to hand schema dumps, JSON data and copied SSTables
    // from the first cluster run to the second.
    private final static Path TEST_DATA_UDT_PATH = Paths.get("test/data/udt");
    private final static Path TMP_PRODUCT_PATH = TEST_DATA_UDT_PATH.resolve("tmp");
    // Keyspace under test.
    private final static String KS = "ks";
    // Files written into TMP_PRODUCT_PATH between the two cluster runs.
    private final static String SCHEMA_TXT = "schema.txt";   // schema captured AFTER the column drop
    private final static String SCHEMA0_TXT = "schema0.txt"; // schema captured BEFORE the column drop
    private final static String DATA_JSON = "data.json";     // table data read from the first cluster
89+
90+
private Cluster startCluster() throws IOException
91+
{
92+
Cluster cluster = Cluster.build(1).withConfig(config -> config.set("auto_snapshot", "false")
93+
.set("uuid_sstable_identifiers_enabled", "false")
94+
.with(NATIVE_PROTOCOL)).start();
95+
cluster.setUncaughtExceptionsFilter(t -> {
96+
String cause = Optional.ofNullable(t.getCause()).map(c -> c.getClass().getName()).orElse("");
97+
return t.getClass().getName().equals(FSWriteError.class.getName()) && cause.equals(AccessDeniedException.class.getName());
98+
});
99+
return cluster;
100+
}
101+
102+
private static List<Path> getDataDirectories(IInvokableInstance node)
103+
{
104+
return node.callOnInstance(() -> Keyspace.open(KS).getColumnFamilyStores().stream().map(cfs -> cfs.getDirectories().getDirectoryForNewSSTables().toPath()).collect(Collectors.toList()));
105+
}
106+
107+
@Test
public void testMissingRows() throws Throwable
{
    // Start from an empty scratch directory for this run's artifacts.
    PathUtils.deleteRecursive(TMP_PRODUCT_PATH);
    Files.createDirectories(TMP_PRODUCT_PATH);

    // ---- First cluster: create schema, write data, drop the complex column,
    // ---- then persist the schema dump, the data (as JSON) and the SSTables.
    try (Cluster cluster = startCluster())
    {
        IInvokableInstance node = cluster.get(1);
        node.executeInternal("DROP KEYSPACE IF EXISTS " + KS);
        node.executeInternal("CREATE KEYSPACE " + KS + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        createTables(node);
        cluster.disableAutoCompaction(KS);

        // Remember the data directories now; they are copied out after shutdown below.
        List<Path> dataDirs = getDataDirectories(node);

        // Pre-drop schema snapshot (schema0.txt) — kept for debugging/comparison only;
        // it is never replayed by this test.
        Map<String, String> schema0 = getSchemaDesc(node);
        Files.writeString(TMP_PRODUCT_PATH.resolve(SCHEMA0_TXT),
                          String.join(";\n", schema0.values()).replaceAll(";;", ";"),
                          UTF_8,
                          CREATE, TRUNCATE_EXISTING);

        // Rows written while b_complex still exists (pk 0.. and 256..), flushed to SSTables.
        insertData(node, 0, true);
        insertData(node, 256, true);
        node.flush(KS);

        // both rows already written and those written after the drop column can be missing
        dropComplexColumn(node);

        // Rows written after the drop (complex bits masked out by insertData).
        insertData(node, 128, false);
        insertData(node, 256 + 17, false);

        // Post-drop schema snapshot (schema.txt) — this is what the second cluster replays.
        // The replace calls turn the DESCRIBE output into single ';'-separated statements,
        // splitting "ALTER TABLE ... DROP ... USING TIMESTAMP" records onto their own statements.
        Map<String, String> schema1 = getSchemaDesc(node);
        Files.writeString(TMP_PRODUCT_PATH.resolve(SCHEMA_TXT),
                          String.join(";\n", schema1.values()).replaceAll("\nALTER TABLE", ";\nALTER TABLE").replaceAll(";+", ";"),
                          UTF_8,
                          CREATE, TRUNCATE_EXISTING);

        node.flush(KS);

        // Compact the user tables so the post-drop SSTables are merged with the pre-drop ones.
        for (String table : schema1.keySet())
            if (table.startsWith("tab"))
                node.forceCompact(KS, table);

        // Record the expected contents (sorted per selectData) as JSON for later comparison.
        Map<String, List<List<Object>>> data = selectData(node);
        Files.writeString(TMP_PRODUCT_PATH.resolve(DATA_JSON), JsonUtils.writeAsJsonString(data), UTF_8, CREATE, TRUNCATE_EXISTING);

        node.shutdown(true).get(10, TimeUnit.SECONDS);

        // Copy each table's data directory (minus transaction *.log files) into the scratch area.
        Path ksTargetPath = TMP_PRODUCT_PATH.resolve(KS);
        Files.createDirectories(ksTargetPath);
        for (Path dir : dataDirs)
        {
            String name = dir.getFileName().toString();
            Path targetDir = ksTargetPath.resolve(name);
            Files.createDirectories(targetDir);
            FileUtils.copyDirectory(dir.toFile(), targetDir.toFile(), pathname -> !pathname.toString().endsWith(".log"));
        }
    }
    // NOTE(review): presumably gives the first cluster's resources time to be released
    // before the second cluster starts — confirm whether this sleep is still required.
    Thread.sleep(2000);

    // ---- Second cluster: recreate the schema from schema.txt, restore the copied
    // ---- SSTables into the fresh data directories, restart, and verify the data matches.
    // new cluster
    try (Cluster cluster = startCluster())
    {
        IInvokableInstance node = cluster.get(1);
        node.executeInternal("DROP KEYSPACE IF EXISTS " + KS);
        node.executeInternal("CREATE KEYSPACE " + KS + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");

        // Replay the post-drop schema dump statement by statement.
        for (String stmt : Files.readString(TMP_PRODUCT_PATH.resolve(SCHEMA_TXT), UTF_8).split(";"))
        {
            if (!stmt.isBlank())
            {
                logger.info("Executing: {}", stmt);
                node.executeInternal(stmt);
            }
        }

        cluster.disableAutoCompaction(KS);

        List<Path> dataDirs = getDataDirectories(node);

        // Node must be down while the SSTables are dropped into its data directories.
        node.shutdown(true).get(10, TimeUnit.SECONDS);

        Path ksSourcePath = TMP_PRODUCT_PATH.resolve(KS);
        for (Path dir : dataDirs)
        {
            String name = dir.getFileName().toString();
            Path sourceDir = ksSourcePath.resolve(name);
            FileUtils.copyDirectory(sourceDir.toFile(), dir.toFile());
        }

        logger.info("Restarting node");
        node.startup();

        // verify same data new cluster and schema recreated
        Map<String, List<List<Object>>> data1 = selectData(node);
        String jsonData0 = Files.readString(TMP_PRODUCT_PATH.resolve(DATA_JSON), UTF_8);
        for (String table1 : data1.keySet())
        {
            List<List<Object>> table1Data = data1.get(table1);
            ArrayNode table1Json = (ArrayNode) JsonUtils.JSON_OBJECT_MAPPER.valueToTree(table1Data);
            String table0Json = JsonUtils.writeAsJsonString(((Map<?, ?>)JsonUtils.decodeJson(jsonData0)).get(table1));
            String missingRows = table0Json;
            // Row count estimated by counting '[' and subtracting the outer array bracket.
            // NOTE(review): this assumes individual row JSON contains no nested arrays —
            // it is only used to build the failure message, not the assertion itself.
            int originalRowCount = (missingRows.length() - missingRows.replace("[", "").length() -1);
            // Strip every row that DID survive; whatever remains is the missing rows,
            // reported in the assertion message below.
            for (List<Object> row1 : table1Data)
            {
                ArrayNode row1Json = (ArrayNode) JsonUtils.JSON_OBJECT_MAPPER.valueToTree(row1);
                missingRows = missingRows.replace(row1Json.toString(), "").replaceAll(",+", ",");
            }
            String missingMsg = String.format("missing %s/%s rows in %s: %s",
                                              (missingRows.length() - missingRows.replace("[", "").length() -1),
                                              originalRowCount, table1, missingRows.replaceAll(",+", ","));

            // Per-table exact comparison against the pre-restart JSON snapshot.
            assertThat(table1Json.toString()).as(missingMsg).isEqualTo(table0Json);
        }
        // Whole-map comparison as a final sanity check.
        String jsonData1 = JsonUtils.writeAsJsonString(data1);
        assertThat(jsonData1).isEqualTo(jsonData0);
    }
}
225+
226+
private Map<String, String> getSchemaDesc(IInvokableInstance node)
227+
{
228+
return Arrays.stream(node.executeInternal("DESCRIBE " + KS + " WITH INTERNALS"))
229+
.filter(r -> r[1].equals("table") || r[1].equals("type"))
230+
.collect(Collectors3.toImmutableMap(r -> r[2].toString(),
231+
r -> Arrays.stream(r[3].toString().split("\\n"))
232+
.filter(s -> !s.strip().startsWith("AND") || s.contains("DROPPED COLUMN RECORD"))
233+
.collect(Collectors.joining("\n"))));
234+
}
235+
236+
private static String udtValue(int i, List<Integer> bits, BiFunction<Integer, Integer, String> vals)
237+
{
238+
List<String> cols = asList("foo", "bar", "baz");
239+
ArrayList<String> udtVals = new ArrayList<>();
240+
for (int j = 0; j < bits.size(); j++)
241+
{
242+
if ((i & bits.get(j)) != 0)
243+
udtVals.add(cols.get(j) + ": " + vals.apply(i, j));
244+
}
245+
return '{' + String.join(", ", udtVals) + '}';
246+
}
247+
248+
private static String tupleValue(int i, List<Integer> bits, BiFunction<Integer, Integer, String> vals)
249+
{
250+
ArrayList<String> tupleVals = new ArrayList<>();
251+
for (int j = 0; j < bits.size(); j++)
252+
{
253+
if ((i & bits.get(j)) != 0)
254+
tupleVals.add(vals.apply(i, j));
255+
else
256+
tupleVals.add("null");
257+
}
258+
return '(' + String.join(", ", tupleVals) + ')';
259+
}
260+
261+
private static String genInsert(int pk, int i, List<Integer> bits, BiFunction<Integer, Integer, String> vals)
262+
{
263+
List<String> cols = asList("a_int", "b_complex", "c_int");
264+
ArrayList<String> c = new ArrayList<>();
265+
ArrayList<String> v = new ArrayList<>();
266+
for (int j = 0; j < bits.size(); j++)
267+
{
268+
if ((i & bits.get(j)) != 0)
269+
{
270+
c.add(cols.get(j));
271+
v.add(vals.apply(i, j));
272+
}
273+
}
274+
if (c.isEmpty())
275+
return String.format("(pk) VALUES (%d)", pk);
276+
else
277+
return String.format("(pk, %s) VALUES (%d, %s)", String.join(", ", c), pk, String.join(", ", v));
278+
}
279+
280+
private static BiFunction<Integer, Integer, String> valsFunction(IntFunction<String> nonIntFunction)
281+
{
282+
return (i, j) -> {
283+
if (j == 0)
284+
return Integer.toString(i);
285+
if (j == 1)
286+
return nonIntFunction.apply(i);
287+
if (j == 2)
288+
return Integer.toString(i * 2);
289+
assert false;
290+
return null;
291+
};
292+
}
293+
294+
private static BiFunction<Integer, Integer, String> valsFunction()
295+
{
296+
return (i, j) -> {
297+
if (j == 0)
298+
return Integer.toString(i);
299+
if (j == 1)
300+
return String.format("'bar%d'", i);
301+
if (j == 2)
302+
return Integer.toString(i * 2);
303+
assert false;
304+
return null;
305+
};
306+
}
307+
308+
private void insertData(IInstance node, int offset, boolean withComplex)
309+
{
310+
for (int pk = offset; pk < offset + (1 << 2); pk++)
311+
{
312+
int i = withComplex ? (pk - offset) : (pk - offset) & ~(2 + 4 + 8);
313+
node.executeInternal("INSERT INTO " + KS + ".tab2_frozen_udt1 " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> udtValue(j, asList(2, 4, 8), valsFunction()))));
314+
node.executeInternal("INSERT INTO " + KS + ".tab5_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> tupleValue(j, asList(2, 4, 8), valsFunction()))));
315+
node.executeInternal("INSERT INTO " + KS + ".tab6_frozen_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> tupleValue(j, asList(2, 4, 8), valsFunction()))));
316+
}
317+
318+
for (int pk = offset; pk < offset + (1 << 2); pk++)
319+
{
320+
int i = withComplex ? (pk - offset) : (pk - offset) & ~(2 + 4 + 8 + 16 + 32);
321+
node.executeInternal("INSERT INTO " + KS + ".tab7_tuple_with_udt " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
322+
valsFunction(j -> tupleValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> udtValue(k, asList(4, 8, 16), valsFunction()))))));
323+
node.executeInternal("INSERT INTO " + KS + ".tab8_frozen_tuple_with_udt " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
324+
valsFunction(j -> tupleValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> udtValue(k, asList(4, 8, 16), valsFunction()))))));
325+
node.executeInternal("INSERT INTO " + KS + ".tab10_frozen_udt_with_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
326+
valsFunction(j -> udtValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> tupleValue(k, asList(4, 8, 16), valsFunction()))))));
327+
}
328+
}
329+
330+
private static void dropComplexColumn(IInvokableInstance node)
331+
{
332+
List<String> tables = node.callOnInstance(() -> Schema.instance.getKeyspaceMetadata(KS).tables.stream().map(t -> t.name).collect(Collectors.toList()));
333+
for (String table : tables)
334+
node.executeInternal("ALTER TABLE " + KS + "." + table + " DROP b_complex");
335+
}
336+
337+
private Map<String, List<List<Object>>> selectData(IInvokableInstance node)
338+
{
339+
Map<String, List<List<Object>>> results = new HashMap<>();
340+
List<String> tables = node.callOnInstance(() -> Schema.instance.getKeyspaceMetadata(KS).tables.stream().map(t -> t.name).collect(Collectors.toList()));
341+
for (String table : tables)
342+
{
343+
Object[][] rows = node.executeInternal("SELECT * FROM " + KS + "." + table);
344+
Arrays.sort(rows, Comparator.comparing(a -> ((Integer) a[0])));
345+
results.put(table, Arrays.stream(rows).map(Arrays::asList).collect(Collectors.toList()));
346+
}
347+
return results;
348+
}
349+
350+
private static void createTables(IInvokableInstance node)
351+
{
352+
node.executeInternal("CREATE TYPE " + KS + ".udt1(foo int, bar text, baz int)");
353+
node.executeInternal("CREATE TYPE " + KS + ".udt3(foo int, bar tuple<int, text, int>, baz int)");
354+
355+
node.executeInternal("CREATE TABLE " + KS + ".tab2_frozen_udt1 (pk int PRIMARY KEY, a_int int, b_complex frozen<udt1>, c_int int) WITH ID = 450f91fe-7c47-41c9-97bf-fdad854fa7e5");
356+
node.executeInternal("CREATE TABLE " + KS + ".tab5_tuple (pk int PRIMARY KEY, a_int int, b_complex tuple<int, text, int>, c_int int) WITH ID = 90826dd3-8437-4585-9de4-15908236687f");
357+
node.executeInternal("CREATE TABLE " + KS + ".tab6_frozen_tuple (pk int PRIMARY KEY, a_int int, b_complex frozen<tuple<int, text, int>>, c_int int) WITH ID = 54185f9a-a6fd-487c-abc3-c01bd5835e48");
358+
node.executeInternal("CREATE TABLE " + KS + ".tab7_tuple_with_udt (pk int PRIMARY KEY, a_int int, b_complex tuple<int, udt1, int>, c_int int) WITH ID = 4e78f403-7b63-4e0d-a231-42e42cba7cb5");
359+
node.executeInternal("CREATE TABLE " + KS + ".tab8_frozen_tuple_with_udt (pk int PRIMARY KEY, a_int int, b_complex frozen<tuple<int, udt1, int>>, c_int int) WITH ID = 8660f235-0816-4019-9cc9-1798fa7beb17");
360+
node.executeInternal("CREATE TABLE " + KS + ".tab10_frozen_udt_with_tuple (pk int PRIMARY KEY, a_int int, b_complex frozen<udt3>, c_int int) WITH ID = 6a5cff4e-2f94-4c8b-9aa2-0fbd65292caa");
361+
}
362+
363+
}

0 commit comments

Comments
 (0)