Skip to content

Commit d2dda61

Browse files
authored
CNDB-14123: allows intercepting issues opening flushed sstables (#1742)
For riptano/cndb#14123, we want to be able to catch issues happening during the opening of just-flushed sstables to use shallow sstables. This commit enable this by adding a method to `StorageHandler` that is called on such issue, and allow to provide a "replacement" `SSTableReader` instance.
1 parent f7a70ff commit d2dda61

31 files changed

+394
-149
lines changed

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,11 +1457,11 @@ public Collection<SSTableReader> flushMemtable(ColumnFamilyStore cfs, Memtable m
14571457
// This can throw on remote storage, e.g. if a file cannot be uploaded
14581458
txn.prepareToCommit();
14591459

1460-
// Open the underlying readers, the one that will be returne below by `finished()`.
1460+
// Open the underlying readers, the one that will be returned below by `finished()`.
14611461
// Currently needs to be called before commit, because committing will close a certain number
14621462
// of resources used by the writers which are accessed to open the readers.
14631463
for (SSTableMultiWriter writer : flushResults)
1464-
writer.openResult();
1464+
writer.openResult(storageHandler);
14651465
}
14661466
catch (Throwable t)
14671467
{
@@ -2579,7 +2579,7 @@ public void writeAndAddMemtableRanges(UUID repairSessionID,
25792579
{
25802580
try
25812581
{
2582-
Collection<SSTableReader> sstables = memtableContent.finish(true);
2582+
Collection<SSTableReader> sstables = memtableContent.finish(true, storageHandler);
25832583
try (Refs sstableReferences = Refs.ref(sstables))
25842584
{
25852585
// This moves all references to placeIntoRefs, clearing sstableReferences

src/java/org/apache/cassandra/db/compaction/Scrubber.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,8 @@ public List<SSTableReader> scrub()
375375
{
376376
for (Partition partition : outOfOrder)
377377
inOrderWriter.append(partition.unfilteredIterator());
378-
newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true);
378+
inOrderWriter.setMaxDataAge(sstable.maxDataAge);
379+
newInOrderSstable = inOrderWriter.finish(true, null);
379380
}
380381
transaction.update(newInOrderSstable, false);
381382
finished.add(newInOrderSstable);

src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.List;
2222
import java.util.UUID;
2323

24+
import javax.annotation.Nullable;
25+
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628

@@ -35,6 +37,7 @@
3537
import org.apache.cassandra.index.Index;
3638
import org.apache.cassandra.io.sstable.Descriptor;
3739
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
40+
import org.apache.cassandra.io.sstable.StorageHandler;
3841
import org.apache.cassandra.io.sstable.format.SSTableReader;
3942
import org.apache.cassandra.io.sstable.format.SSTableWriter;
4043
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -143,27 +146,14 @@ public void append(UnfilteredRowIterator partition)
143146
}
144147

145148
@Override
146-
public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
147-
{
148-
List<SSTableReader> sstables = new ArrayList<>(writers.length);
149-
for (SSTableWriter writer : writers)
150-
if (writer != null)
151-
{
152-
boundaries.applyTokenSpaceCoverage(writer);
153-
sstables.add(writer.finish(repairedAt, maxDataAge, openResult));
154-
}
155-
return sstables;
156-
}
157-
158-
@Override
159-
public Collection<SSTableReader> finish(boolean openResult)
149+
public Collection<SSTableReader> finish(boolean openResult, @Nullable StorageHandler storageHandler)
160150
{
161151
List<SSTableReader> sstables = new ArrayList<>(writers.length);
162152
for (SSTableWriter writer : writers)
163153
if (writer != null)
164154
{
165155
boundaries.applyTokenSpaceCoverage(writer);
166-
sstables.add(writer.finish(openResult));
156+
sstables.add(writer.finish(openResult, storageHandler));
167157
}
168158
return sstables;
169159
}
@@ -179,11 +169,11 @@ public Collection<SSTableReader> finished()
179169
}
180170

181171
@Override
182-
public void openResult()
172+
public void openResult(@Nullable StorageHandler storageHandler)
183173
{
184174
for (SSTableWriter writer : writers)
185175
if (writer != null)
186-
writer.openResult();
176+
writer.openResult(storageHandler);
187177
}
188178

189179
@Override

src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public synchronized void received(IncomingStream stream)
104104
SSTableMultiWriter sstable = file.getSSTable();
105105
try
106106
{
107-
finished = sstable.finish(true);
107+
finished = sstable.finish(true, cfs.getStorageHandler());
108108
}
109109
catch (Throwable t)
110110
{

src/java/org/apache/cassandra/index/sai/disk/format/DefaultIndexComponentDiscovery.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,14 @@
2525
public class DefaultIndexComponentDiscovery extends IndexComponentDiscovery
2626
{
2727
@Override
28-
public SSTableIndexComponentsState discoverComponents(SSTableReader sstable) {
29-
Descriptor descriptor = sstable.getDescriptor();
28+
public SSTableIndexComponentsState discoverComponents(SSTableReader sstable)
29+
{
30+
return discoverComponents(sstable.getDescriptor(), sstable.metadata());
31+
}
3032

33+
@Override
34+
public SSTableIndexComponentsState discoverComponents(Descriptor descriptor, TableMetadata metadata)
35+
{
3136
// Older versions might not have all components in the TOC, we should not trust it (fix for CNDB-13582):
3237
if (descriptor.version.getVersion().compareTo("ca") < 0)
3338
return discoverComponentsFromDiskFallback(descriptor);

src/java/org/apache/cassandra/index/sai/disk/format/IndexComponentDiscovery.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,34 @@ public static IndexComponentDiscovery instance()
7777
* be used for each group of components (per-sstable and per-index).
7878
*
7979
* @param sstable the sstable reader for which to discover components.
80-
* @return the discovered {@link ComponentsBuildId} to use for both per-sstable and each per-index components. The
81-
* returned build IDs should usually correspond to existing index components on disk but this is not a strong
80+
* @return the discovered {@link ComponentsBuildId} to use for both per-sstable and each per-index component. The
81+
* returned build IDs should usually correspond to existing index components on disk, but this is not a strong
8282
* asumption: if some group of components corresponding to the returned build ID has no completion marker or is
83-
* missing files, the group will not be usuable (and the corresponding index/indexes will not be usable) but this
83+
* missing files, the group will not be usuable (and the corresponding index/indexes will not be usable), but this
8484
* should be handled "gracefully" by callers.
8585
*/
8686
public abstract SSTableIndexComponentsState discoverComponents(SSTableReader sstable);
8787

88+
/**
89+
* Returns the set of groups of SAI components that should be used for the provided sstable.
90+
* <p>
91+
* Note that "discovery" in this method only means finding out the "build ID" (version and generation) that should
92+
* be used for each group of components (per-sstable and per-index).
93+
* <p>
94+
* Please note that the {@link #discoverComponents(SSTableReader)} method should be prefered when a
95+
* {@link SSTableReader} exists as some implementations may be more optimal or use additional checks when provided
96+
* a reader.
97+
*
98+
* @param descriptor the descriptor of the sstable for which to discover components.
99+
* @param metadata the metadata of the table the sstable belongs to.
100+
* @return the discovered {@link ComponentsBuildId} to use for both per-sstable and each per-index component. The
101+
* returned build IDs should usually correspond to existing index components on disk, but this is not a strong
102+
* asumption: if some group of components corresponding to the returned build ID has no completion marker or is
103+
* missing files, the group will not be usuable (and the corresponding index/indexes will not be usable), but this
104+
* should be handled "gracefully" by callers.
105+
*/
106+
public abstract SSTableIndexComponentsState discoverComponents(Descriptor descriptor, TableMetadata metadata);
107+
88108
protected static IndexComponentType completionMarker(@Nullable String name)
89109
{
90110
return name == null ? IndexComponentType.GROUP_COMPLETION_MARKER : IndexComponentType.COLUMN_COMPLETION_MARKER;

src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import java.util.Collection;
2222

23+
import javax.annotation.Nullable;
24+
2325
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
2426
import org.apache.cassandra.io.sstable.format.SSTableReader;
2527
import org.apache.cassandra.schema.TableId;
@@ -36,17 +38,21 @@ public interface SSTableMultiWriter extends Transactional
3638
*/
3739
void append(UnfilteredRowIterator partition);
3840

39-
Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult);
40-
Collection<SSTableReader> finish(boolean openResult);
41+
Collection<SSTableReader> finish(boolean openResult, @Nullable StorageHandler handler);
4142
Collection<SSTableReader> finished();
4243

4344
/**
4445
* Opens the resulting sstables after writing has finished. If those readers need to be accessed, then this must
4546
* be called after `prepareToCommit` (so the writing is complete) but before `commit` (because committing closes
4647
* some of the resources used to create the underlying readers). When used, the readers can then be accessed by
4748
* calling `finished()`.
49+
*
50+
* @param storageHandler the underlying storage handler. This is used in case of a failure opening the
51+
* SSTableReader(s) to call the `StorageHandler#onOpeningWrittenSSTableFailure` callback, which
52+
* in some implementations may attempt to recover from the error. If `null`, the said callback
53+
* will not be called on failure (and the exception will propagate).
4854
*/
49-
void openResult();
55+
void openResult(@Nullable StorageHandler storageHandler);
5056

5157
String getFilename();
5258
long getBytesWritten();

src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ protected void doPrepare()
354354
{
355355
assert writer.getFilePointer() > 0;
356356
writer.setRepairedAt(repairedAt).prepareToCommit();
357-
writer.openResult();
357+
writer.openResult(null);
358358
SSTableReader reader = writer.finished();
359359
transaction.update(reader, false);
360360
preparedForCommit.add(reader);

src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public void run()
260260
{
261261
for (Map.Entry<DecoratedKey, PartitionUpdate.Builder> entry : b.entrySet())
262262
writer.append(entry.getValue().build().unfilteredIterator());
263-
writer.finish(false);
263+
writer.finish(false, null);
264264
}
265265
}
266266
catch (Throwable e)

src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void close()
8181
if (update != null)
8282
writePartition(update.build());
8383
if (writer != null)
84-
writer.finish(false);
84+
writer.finish(false, null);
8585
}
8686
catch (Throwable t)
8787
{

0 commit comments

Comments
 (0)