Skip to content

Commit 6e759fa

Browse files
blambov authored and djatnieks committed
CNDB-13163: Fix incorrect data file length for early open sstables (#1707)
### What is the issue
`EarlyOpenCachingTest` fails because the data file length is assigned incorrectly for early-open sstables.

### What does this PR fix and why was it fixed
Passes the correct lengths to the early-open sstable construction, which lets `SimpleSSTableScanner` finish iteration correctly. This was already fixed in CC (as part of CNDB-9104).
1 parent d5896b9 commit 6e759fa

File tree

8 files changed

+185
-39
lines changed

8 files changed

+185
-39
lines changed

src/java/org/apache/cassandra/db/lifecycle/WrappedLifecycleTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public class WrappedLifecycleTransaction implements ILifecycleTransaction
3030
{
3131

32-
final ILifecycleTransaction delegate;
32+
protected final ILifecycleTransaction delegate;
3333
public WrappedLifecycleTransaction(ILifecycleTransaction delegate)
3434
{
3535
this.delegate = delegate;

src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime part
8282
}
8383

8484
@SuppressWarnings({ "resource", "RedundantSuppression" }) // dataFile is closed along with the reader
85-
private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
85+
private BtiTableReader openInternal(OpenReason openReason, long lengthOverride, Supplier<PartitionIndex> partitionIndexSupplier)
8686
{
8787
IFilter filter = null;
8888
FileHandle dataFile = null;
@@ -99,7 +99,7 @@ private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supp
9999

100100
partitionIndex = partitionIndexSupplier.get();
101101
rowIndexFile = indexWriter.rowIndexFHBuilder.complete();
102-
dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
102+
dataFile = openDataFile(lengthOverride, builder.getStatsMetadata());
103103
filter = indexWriter.getFilterCopy();
104104

105105
return builder.setPartitionIndex(partitionIndex)
@@ -121,11 +121,12 @@ private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supp
121121
@Override
122122
public void openEarly(Consumer<SSTableReader> callWhenReady)
123123
{
124-
long dataLength = dataWriter.position();
124+
// Because the partition index writer is one partition behind, we want the file to stop at the start of the
125+
// last partition that was written.
126+
long dataLength = partitionWriter.getInitialPosition();
125127
indexWriter.buildPartial(dataLength, partitionIndex ->
126128
{
127-
indexWriter.rowIndexFHBuilder.withLengthOverride(indexWriter.rowIndexWriter.getLastFlushOffset());
128-
BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex);
129+
BtiTableReader reader = openInternal(OpenReason.EARLY, dataLength, () -> partitionIndex);
129130
callWhenReady.accept(reader);
130131
});
131132
}
@@ -151,7 +152,7 @@ protected SSTableReader openFinal(OpenReason openReason)
151152
if (maxDataAge < 0)
152153
maxDataAge = Clock.Global.currentTimeMillis();
153154

154-
return openInternal(openReason, true, indexWriter::completedPartitionIndex);
155+
return openInternal(openReason, NO_LENGTH_OVERRIDE, indexWriter::completedPartitionIndex);
155156
}
156157

157158
/**
@@ -214,7 +215,14 @@ public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IO
214215

215216
public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady)
216217
{
217-
return partitionIndex.buildPartial(callWhenReady, rowIndexWriter.position(), dataPosition);
218+
long rowIndexPosition = rowIndexWriter.position();
219+
return partitionIndex.buildPartial(partitionIndex ->
220+
{
221+
rowIndexFHBuilder.withLengthOverride(rowIndexPosition);
222+
callWhenReady.accept(partitionIndex);
223+
rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
224+
},
225+
rowIndexPosition, dataPosition);
218226
}
219227

220228
public void mark()
@@ -262,8 +270,8 @@ void complete() throws FSWriteError
262270
PartitionIndex completedPartitionIndex()
263271
{
264272
complete();
265-
rowIndexFHBuilder.withLengthOverride(0);
266-
partitionIndexFHBuilder.withLengthOverride(0);
273+
rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
274+
partitionIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
267275
try
268276
{
269277
return PartitionIndex.load(partitionIndexFHBuilder, metadata.getLocal().partitioner, false, descriptor.version.getByteComparableVersion());

src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ private void refreshReadableBoundary()
125125
}
126126
finally
127127
{
128-
fhBuilder.withLengthOverride(-1);
128+
fhBuilder.withLengthOverride(FileHandle.Builder.NO_LENGTH_OVERRIDE);
129129
}
130130

131131
}

src/java/org/apache/cassandra/io/util/FileHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ public FileHandle complete(Function<File, ChannelProxy> channelProxyFactory)
442442
channel = channelProxyFactory.apply(file);
443443

444444
long fileLength = (compressionMetadata != null) ? compressionMetadata.compressedFileLength : channel.size();
445-
long length = lengthOverride > 0 ? lengthOverride : fileLength;
445+
long length = lengthOverride >= 0 ? lengthOverride : fileLength;
446446

447447
RebuffererFactory rebuffererFactory;
448448
if (length == 0)

test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ private List<SSTableReader> createSSTables(ColumnFamilyStore cfs, int count, int
385385
{
386386
long first = i * 10;
387387
long last = (i + 1) * 10 - 1;
388-
sstables.add(MockSchema.sstable(startGeneration + i, 0, true, first, last, cfs));
388+
sstables.add(MockSchema.sstable(startGeneration + i, -1, true, first, last, cfs));
389389
}
390390
cfs.disableAutoCompaction();
391391
cfs.addSSTables(sstables);

test/unit/org/apache/cassandra/io/sstable/EarlyOpenCachingTest.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.cassandra.config.DatabaseDescriptor;
3737
import org.apache.cassandra.cql3.CQLTester;
3838
import org.apache.cassandra.db.ColumnFamilyStore;
39-
import org.apache.cassandra.db.DecoratedKey;
4039
import org.apache.cassandra.db.compaction.OperationType;
4140
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
4241
import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -85,7 +84,7 @@ public void testFinalOpenRetainsCachedData() throws InterruptedException
8584
{
8685
String pkey = getRandom().nextAsciiString(10, 10);
8786
for (int j = 0; j < 100; j++)
88-
execute("INSERT INTO %s (pkey, ckey, val) VALUES (?, ?, ?)", pkey, "" + j, ByteBuffer.allocate(1000));
87+
execute("INSERT INTO %s (pkey, ckey, val) VALUES (?, ?, ?)", pkey, "" + j, ByteBuffer.allocate(300));
8988
}
9089
flush();
9190
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
@@ -102,7 +101,7 @@ public void testFinalOpenRetainsCachedData() throws InterruptedException
102101
return;
103102
SSTableAddingNotification n = (SSTableAddingNotification) notification;
104103
SSTableReader s = n.adding.iterator().next();
105-
readAllAndVerifyKeySpan(s);
104+
EarlyOpenIterationTest.readAllAndVerifyKeySpan(s);
106105
assertTrue("Chunk cache is not used",
107106
ChunkCache.instance.sizeOfFile(s.getDataFile()) > 0);
108107
phaser.register();
@@ -131,25 +130,6 @@ public void testFinalOpenRetainsCachedData() throws InterruptedException
131130
assertTrue("Chunk cache is not retained for early open sstable",
132131
ChunkCache.instance.sizeOfFile(finalReader.getDataFile()) > 0);
133132
assertEquals(Sets.newHashSet(finalReader), cfs.getLiveSSTables());
134-
readAllAndVerifyKeySpan(finalReader);
135-
}
136-
137-
private static void readAllAndVerifyKeySpan(SSTableReader s)
138-
{
139-
DecoratedKey firstKey = null;
140-
DecoratedKey lastKey = null;
141-
for (var iter = s.getScanner(); iter.hasNext(); )
142-
{
143-
var partition = iter.next();
144-
// consume all rows, so that the data is cached
145-
partition.forEachRemaining(column -> {
146-
// consume all columns
147-
});
148-
if (firstKey == null)
149-
firstKey = partition.partitionKey();
150-
lastKey = partition.partitionKey();
151-
}
152-
assertEquals("Simple scanner does not iterate all content", firstKey, s.first);
153-
assertEquals("Simple scanner does not iterate all content", lastKey, s.last);
133+
EarlyOpenIterationTest.readAllAndVerifyKeySpan(finalReader);
154134
}
155135
}
test/unit/org/apache/cassandra/io/sstable/EarlyOpenIterationTest.java

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.io.sstable;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.Collection;
23+
import java.util.Random;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Consumer;
26+
27+
import com.google.common.collect.Lists;
28+
import com.google.common.collect.Sets;
29+
import org.junit.BeforeClass;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.junit.runners.Parameterized;
33+
34+
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
35+
import org.apache.cassandra.config.Config;
36+
import org.apache.cassandra.config.DatabaseDescriptor;
37+
import org.apache.cassandra.cql3.CQLTester;
38+
import org.apache.cassandra.db.ColumnFamilyStore;
39+
import org.apache.cassandra.db.DecoratedKey;
40+
import org.apache.cassandra.db.compaction.OperationType;
41+
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
42+
import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction;
43+
import org.apache.cassandra.io.sstable.format.SSTableFormat;
44+
import org.apache.cassandra.io.sstable.format.SSTableReader;
45+
46+
import static org.apache.cassandra.config.CassandraRelevantProperties.SSTABLE_FORMAT_DEFAULT;
47+
import static org.junit.Assert.assertEquals;
48+
import static org.junit.Assert.assertTrue;
49+
50+
@RunWith(Parameterized.class)
51+
public class EarlyOpenIterationTest extends CQLTester
52+
{
53+
@Parameterized.Parameters(name = "format={0}")
54+
public static Collection<Object> generateParameters()
55+
{
56+
// We need to set up the class here, as the parameterized test runner will not call the @BeforeClass method
57+
paramaterizedSetUpClass();
58+
59+
return Lists.newArrayList(DatabaseDescriptor.getSSTableFormats().values());
60+
}
61+
62+
public static void paramaterizedSetUpClass()
63+
{
64+
CQLTester.setUpClass();
65+
DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
66+
}
67+
68+
Random rand = new Random();
69+
70+
@Parameterized.Parameter
71+
public SSTableFormat<?, ?> format = DatabaseDescriptor.getSelectedSSTableFormat();
72+
73+
@BeforeClass
74+
public static void setUpClass() // override CQLTester's setUpClass
75+
{
76+
// No-op, as initialization was done in paramaterizedSetUpClass, and we don't want to call CQLTester.setUpClass again
77+
}
78+
79+
@Test
80+
public void testFinalOpenIteration() throws InterruptedException
81+
{
82+
SSTABLE_FORMAT_DEFAULT.setString(format.name());
83+
createTable("CREATE TABLE %s (pkey text, ckey text, val blob, PRIMARY KEY (pkey, ckey))");
84+
85+
for (int i = 0; i < 800; i++)
86+
{
87+
String pkey = RandomStrings.randomAsciiOfLengthBetween(rand, 10, 10);
88+
for (int j = 0; j < 100; j++)
89+
execute("INSERT INTO %s (pkey, ckey, val) VALUES (?, ?, ?)", pkey, "" + j, ByteBuffer.allocate(300));
90+
}
91+
flush();
92+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
93+
94+
AtomicInteger opened = new AtomicInteger(0);
95+
assertEquals(1, cfs.getLiveSSTables().size());
96+
SSTableReader source = cfs.getLiveSSTables().iterator().next();
97+
98+
Consumer<Iterable<SSTableReader>> consumer = current -> {
99+
for (SSTableReader s : current)
100+
{
101+
readAllAndVerifyKeySpan(s);
102+
if (s.openReason == SSTableReader.OpenReason.EARLY)
103+
opened.incrementAndGet();
104+
}
105+
};
106+
107+
SSTableReader finalReader;
108+
try (WrappedLifecycleTransaction txn = new WrappedLifecycleTransaction(cfs.getTracker().tryModify(source, OperationType.COMPACTION))
109+
{
110+
@Override
111+
public void checkpoint()
112+
{
113+
consumer.accept(((LifecycleTransaction) delegate).current());
114+
super.checkpoint();
115+
}
116+
};
117+
SSTableRewriter writer = new SSTableRewriter(txn, 1000, 100L << 10, false))
118+
{
119+
writer.switchWriter(SSTableWriterTestBase.getWriter(format, cfs, cfs.getDirectories().getDirectoryForNewSSTables(), txn));
120+
var iter = source.getScanner();
121+
while (iter.hasNext())
122+
{
123+
var next = iter.next();
124+
writer.append(next);
125+
}
126+
finalReader = writer.finish().iterator().next();
127+
}
128+
assertTrue("No early opening occured", opened.get() > 0);
129+
130+
assertEquals(Sets.newHashSet(finalReader), cfs.getLiveSSTables());
131+
readAllAndVerifyKeySpan(finalReader);
132+
}
133+
134+
static void readAllAndVerifyKeySpan(SSTableReader s)
135+
{
136+
DecoratedKey firstKey = null;
137+
DecoratedKey lastKey = null;
138+
try (var iter = s.getScanner())
139+
{
140+
while (iter.hasNext())
141+
{
142+
try (var partition = iter.next())
143+
{
144+
// consume all rows, so that the data is cached
145+
partition.forEachRemaining(column -> {
146+
// consume all columns
147+
});
148+
if (firstKey == null)
149+
firstKey = partition.partitionKey();
150+
lastKey = partition.partitionKey();
151+
}
152+
}
153+
}
154+
assertEquals("Simple scanner does not iterate all content", s.getFirst(), firstKey);
155+
assertEquals("Simple scanner does not iterate all content", s.getLast(), lastKey);
156+
}
157+
}

test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.cassandra.db.Keyspace;
3737
import org.apache.cassandra.db.SerializationHeader;
3838
import org.apache.cassandra.db.compaction.CompactionManager;
39+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
3940
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
4041
import org.apache.cassandra.db.rows.EncodingStats;
4142
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -154,7 +155,7 @@ public static void validateCFS(ColumnFamilyStore cfs)
154155
assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty());
155156
}
156157

157-
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
158+
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
158159
{
159160
Descriptor desc = cfs.newSSTableDescriptor(directory);
160161
return desc.getFormat().getWriterFactory().builder(desc)
@@ -170,12 +171,12 @@ public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, Lif
170171
.build(txn, cfs);
171172
}
172173

173-
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
174+
public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn)
174175
{
175176
return getWriter(cfs, directory, txn, 0, null, false);
176177
}
177178

178-
public static SSTableWriter getWriter(SSTableFormat<?, ?> format, ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
179+
public static SSTableWriter getWriter(SSTableFormat<?, ?> format, ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn)
179180
{
180181
Descriptor desc = cfs.newSSTableDescriptor(directory, format);
181182
return desc.getFormat().getWriterFactory().builder(desc)

0 commit comments

Comments
 (0)