Skip to content

Commit b8fb452

Browse files
author
Nitsan Wakart
committed
Increase testing with large CK columns
1 parent 5258b38 commit b8fb452

File tree

1 file changed

+146
-1
lines changed

1 file changed

+146
-1
lines changed

test/unit/org/apache/cassandra/db/compaction/simple/CompactionDeleteAndPurgeRowTest.java

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ public void testLargeRowDeleteCompactionInterleaving() throws Throwable
409409
int prefix = 0;
410410
long timestamp = 0;
411411
// Writes, 3 rows in each partition
412-
byte[] blob = new byte[DatabaseDescriptor.getColumnIndexSize(BigFormatPartitionWriter.DEFAULT_GRANULARITY)];
412+
byte[] blob = new byte[DatabaseDescriptor.getColumnIndexCacheSize()*2];
413413
ByteBuffer byteBuffer = ByteBuffer.wrap(blob);
414414
ThreadLocalRandom.current().nextBytes(blob);
415415
for (int i = 0; i < 4; i++)
@@ -453,6 +453,117 @@ public void testLargeRowDeleteCompactionInterleaving() throws Throwable
453453
);
454454

455455
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
456+
457+
Thread.sleep(1000);
458+
cfs.forceMajorCompaction();
459+
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
460+
verifyAndPrint(cfs, sstable);
461+
try(ISSTableScanner scanner = sstable.getScanner())
462+
{
463+
while (scanner.hasNext()) {
464+
UnfilteredRowIterator partition = scanner.next();
465+
long pk = partition.partitionKey().getKey().getLong();
466+
if (pk == 5) {
467+
fail();
468+
}
469+
else if (pk % 2 == 0)
470+
{
471+
assertTrue("pk="+pk,partition.hasNext());
472+
assertTrue("pk="+pk,partition.partitionLevelDeletion().isLive());
473+
assertTrue("pk="+pk,!partition.staticRow().isEmpty());
474+
// only have 2 live rows
475+
Unfiltered row = partition.next();
476+
assertTrue("pk=" + pk, !row.isEmpty());
477+
assertTrue("pk="+pk,((Row)row).deletion().time().isLive());
478+
479+
row = partition.next();
480+
assertTrue("pk=" + pk, !row.isEmpty());
481+
assertTrue("pk="+pk,((Row)row).deletion().time().isLive());
482+
483+
assertTrue("pk="+pk,!partition.hasNext());
484+
}
485+
else
486+
{
487+
assertTrue("pk="+pk,partition.hasNext());
488+
assertTrue("pk="+pk,partition.partitionLevelDeletion().isLive());
489+
assertTrue("pk="+pk,!partition.staticRow().isEmpty());
490+
assertTrue("pk="+pk,!partition.next().isEmpty());
491+
assertTrue("pk="+pk,!partition.next().isEmpty());
492+
assertTrue("pk="+pk,!partition.next().isEmpty());
493+
assertTrue("pk="+pk,!partition.hasNext());
494+
}
495+
}
496+
}
497+
try (KeyReader it = sstable.keyReader())
498+
{
499+
ByteBuffer last = it.key();
500+
while (it.advance()) last = it.key(); // no-op, just check if index is readable
501+
if (!Objects.equals(last, sstable.getLast().getKey()))
502+
throw new CorruptSSTableException(new IOException("Failed to read partition index"), it.toString());
503+
}
504+
}
505+
506+
@Test
507+
public void testLargeCKRowDeleteCompactionInterleaving() throws Throwable
508+
{
509+
String keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
510+
String table = createTable(keyspace, "CREATE TABLE %s ( pk bigint, sc1 bigint static, sc2 int static, ck1 bigint, ck2 blob, c1 bigint, c2 int, PRIMARY KEY(pk, ck1, ck2)) with gc_grace_seconds=0");
511+
execute("use " + keyspace + ";");
512+
Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(c -> c.disableAutoCompaction()));
513+
514+
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
515+
cfs.disableAutoCompaction();
516+
517+
String writeStatement1 = "INSERT INTO " + table + "(pk,sc1,sc2, ck1,ck2, c1,c2)VALUES(?, ?,?, ?,?, ?,?) using timestamp ?";
518+
String writeStatement2 = "INSERT INTO " + table + "(pk,ck1,ck2, c1,c2)VALUES(?, ?,?, ?,?) using timestamp ?";
519+
int prefix = 0;
520+
long timestamp = 0;
521+
// Writes, 3 rows in each partition
522+
byte[] blob = new byte[DatabaseDescriptor.getColumnIndexCacheSize()*2];
523+
ByteBuffer byteBuffer = ByteBuffer.wrap(blob);
524+
ThreadLocalRandom.current().nextBytes(blob);
525+
for (int i = 0; i < 4; i++)
526+
{
527+
execute(writeStatement1,
528+
Long.valueOf(i), //pk
529+
Long.valueOf(111), Integer.valueOf(222),//sc1,sc2
530+
Long.valueOf(1), byteBuffer,//ck1,ck2
531+
Long.valueOf(prefix+i), Integer.valueOf(1),//c1,c2
532+
timestamp++);
533+
execute(writeStatement2,
534+
Long.valueOf(i), //pk
535+
Long.valueOf(2), byteBuffer,//ck1,ck2
536+
Long.valueOf(prefix+i), Integer.valueOf(1),//c1,c2
537+
timestamp++);
538+
execute(writeStatement2,
539+
Long.valueOf(i), //pk
540+
Long.valueOf(3), byteBuffer, //ck1,ck2
541+
Long.valueOf(prefix+i), Integer.valueOf(1), //c1,c2
542+
timestamp++);
543+
}
544+
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
545+
546+
// delete rows [0,2,bb] and [2,2,bb]
547+
for (int i = 0; i < 4; i+=2)
548+
{
549+
execute("DELETE FROM " + table + " using timestamp ? WHERE pk = ? AND ck1 = ? AND ck2 = ?;",
550+
Long.valueOf(timestamp + i), // timestamp
551+
Long.valueOf(i), //pk
552+
Long.valueOf(2), //ck1
553+
byteBuffer //ck2
554+
);
555+
}
556+
557+
// delete a partition + row that we don't have
558+
execute("DELETE FROM " + table + " using timestamp ? WHERE pk = ? AND ck1 = ? AND ck2 = ?;",
559+
Long.valueOf(timestamp + 5), // timestamp
560+
Long.valueOf(5), //pk
561+
Long.valueOf(11), //ck1
562+
byteBuffer //ck2
563+
);
564+
565+
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
566+
456567
Thread.sleep(1000);
457568
cfs.forceMajorCompaction();
458569
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
@@ -502,6 +613,40 @@ else if (pk % 2 == 0)
502613
}
503614
}
504615

616+
@Test
617+
public void testSingleLargeCKRow() throws Throwable
618+
{
619+
String keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
620+
String table = createTable(keyspace, "CREATE TABLE %s ( pk bigint, ck1 bigint, ck2 blob, c1 bigint, c2 int, PRIMARY KEY(pk, ck1, ck2)) with gc_grace_seconds=0");
621+
execute("use " + keyspace + ";");
622+
Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(c -> c.disableAutoCompaction()));
623+
624+
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
625+
cfs.disableAutoCompaction();
626+
627+
String writeStatement2 = "INSERT INTO " + table + "(pk,ck1,ck2, c1,c2)VALUES(?, ?,?, ?,?) using timestamp ?";
628+
int prefix = 0;
629+
long timestamp = 0;
630+
// Writes, 3 rows in each partition
631+
byte[] blob = new byte[DatabaseDescriptor.getColumnIndexCacheSize()*2];
632+
ByteBuffer byteBuffer = ByteBuffer.wrap(blob);
633+
ThreadLocalRandom.current().nextBytes(blob);
634+
for (int i = 0; i < 1; i++)
635+
{
636+
execute(writeStatement2,
637+
Long.valueOf(i), //pk
638+
Long.valueOf(2), byteBuffer,//ck1,ck2
639+
Long.valueOf(prefix+i), Integer.valueOf(1),//c1,c2
640+
timestamp++);
641+
}
642+
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
643+
644+
Thread.sleep(1000);
645+
cfs.forceMajorCompaction();
646+
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
647+
verifyAndPrint(cfs, sstable);
648+
}
649+
505650
private static void verifyAndPrint(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
506651
{
507652
try (IVerifier verifier = new BigTableVerifier(cfs, (BigTableReader) sstable, new OutputHandler.LogOutput(), false, IVerifier.options().invokeDiskFailurePolicy(true).extendedVerification(true).build()))

0 commit comments

Comments
 (0)