Skip to content

Commit f6c1002

Browse files
krummasmaedhroz
authored andcommitted
Stream individual files in their own transactions and hand over ownership to a parent transaction on completion
patch by Marcus Eriksson; reviewed by Caleb Rackliffe and Jon Meredith for CASSANDRA-20728
1 parent 7b140f6 commit f6c1002

File tree

54 files changed

+970
-280
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+970
-280
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Stream individual files in their own transactions and hand over ownership to a parent transaction on completion (CASSANDRA-20728)
23
* Limit the number of held heap dumps to not consume disk space excessively (CASSANDRA-20457)
34
* Accord: BEGIN TRANSACTIONs IF condition logic does not properly support meaningless emptiness and null values (CASSANDRA-20667)
45
* Accord: startup race condition where accord journal tries to access the 2i index before its ready (CASSANDRA-20686)

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
import org.apache.cassandra.db.compaction.OperationType;
8686
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
8787
import org.apache.cassandra.db.filter.DataLimits;
88-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
88+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
8989
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
9090
import org.apache.cassandra.db.lifecycle.SSTableSet;
9191
import org.apache.cassandra.db.lifecycle.Tracker;
@@ -652,19 +652,19 @@ public boolean streamFromMemtable()
652652
return memtableFactory.streamFromMemtable();
653653
}
654654

655-
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
655+
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SerializationHeader header, ILifecycleTransaction txn)
656656
{
657-
return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, null, 0, header, lifecycleNewTracker);
657+
return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, null, 0, header, txn);
658658
}
659659

660-
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
660+
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, ILifecycleTransaction txn)
661661
{
662-
return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, 0, header, lifecycleNewTracker);
662+
return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, 0, header, txn);
663663
}
664664

665-
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
665+
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel, SerializationHeader header, ILifecycleTransaction txn)
666666
{
667-
return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel, header, indexManager.listIndexGroups(), lifecycleNewTracker);
667+
return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel, header, indexManager.listIndexGroups(), txn);
668668
}
669669

670670
public boolean supportsEarlyOpen()
@@ -2410,22 +2410,67 @@ private SSTableMultiWriter writeMemtableRanges(Supplier<Collection<Range<Partiti
24102410
}
24112411
}
24122412

2413-
private static final LifecycleNewTracker DO_NOT_TRACK = new LifecycleNewTracker()
2413+
private static final ILifecycleTransaction DO_NOT_TRACK = new ILifecycleTransaction()
24142414
{
2415-
public void trackNew(SSTable table)
2415+
@Override
2416+
public void trackNew(SSTable sstable)
24162417
{
2417-
// not tracking
2418-
}
24192418

2420-
public void untrackNew(SSTable table)
2421-
{
2422-
// not tracking
24232419
}
24242420

2425-
public OperationType opType()
2421+
@Override
2422+
public void untrackNew(SSTable sstable)
24262423
{
2427-
return OperationType.FLUSH;
2424+
24282425
}
2426+
2427+
@Override
2428+
public OperationType opType() {return null;}
2429+
2430+
@Override
2431+
public void checkpoint() {}
2432+
2433+
@Override
2434+
public void update(SSTableReader reader, boolean original) {}
2435+
2436+
@Override
2437+
public void update(Collection<SSTableReader> readers, boolean original) {}
2438+
2439+
@Override
2440+
public SSTableReader current(SSTableReader reader) {return null;}
2441+
2442+
@Override
2443+
public void obsolete(SSTableReader reader) {}
2444+
2445+
@Override
2446+
public void obsoleteOriginals() {}
2447+
2448+
@Override
2449+
public Set<SSTableReader> originals() {return Set.of();}
2450+
2451+
@Override
2452+
public boolean isObsolete(SSTableReader reader) {return false;}
2453+
2454+
@Override
2455+
public boolean isOffline() {return false;}
2456+
2457+
@Override
2458+
public TimeUUID opId() {return null;}
2459+
2460+
@Override
2461+
public void cancel(SSTableReader removedSSTable) {}
2462+
2463+
@Override
2464+
public Throwable commit(Throwable accumulate) {return null;}
2465+
2466+
@Override
2467+
public Throwable abort(Throwable accumulate) {return null;}
2468+
2469+
@Override
2470+
public void prepareToCommit() {}
2471+
2472+
@Override
2473+
public void close() {}
24292474
};
24302475

24312476
/**

src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.cassandra.db.SerializationHeader;
3737
import org.apache.cassandra.db.commitlog.CommitLogPosition;
3838
import org.apache.cassandra.db.commitlog.IntervalSet;
39-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
39+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
4040
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
4141
import org.apache.cassandra.dht.Range;
4242
import org.apache.cassandra.dht.Token;
@@ -562,7 +562,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
562562
int sstableLevel,
563563
SerializationHeader header,
564564
Collection<Index.Group> indexGroups,
565-
LifecycleNewTracker lifecycleNewTracker)
565+
ILifecycleTransaction txn)
566566
{
567567
return SimpleSSTableMultiWriter.create(descriptor,
568568
keyCount,
@@ -574,7 +574,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
574574
sstableLevel,
575575
header,
576576
indexGroups,
577-
lifecycleNewTracker, cfs);
577+
txn, cfs);
578578
}
579579

580580
public boolean supportsEarlyOpen()

src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.cassandra.db.SerializationHeader;
3232
import org.apache.cassandra.db.commitlog.CommitLogPosition;
3333
import org.apache.cassandra.db.commitlog.IntervalSet;
34-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
34+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
3535
import org.apache.cassandra.dht.Range;
3636
import org.apache.cassandra.dht.Token;
3737
import org.apache.cassandra.index.Index;
@@ -200,7 +200,7 @@ public abstract SSTableMultiWriter createSSTableMultiWriter(Descriptor descripto
200200
int sstableLevel,
201201
SerializationHeader header,
202202
Collection<Index.Group> indexGroups,
203-
LifecycleNewTracker lifecycleNewTracker);
203+
ILifecycleTransaction txn);
204204

205205
/**
206206
* Return the directory index the given compaction strategy belongs to, or -1

src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.cassandra.db.SerializationHeader;
3030
import org.apache.cassandra.db.commitlog.CommitLogPosition;
3131
import org.apache.cassandra.db.commitlog.IntervalSet;
32-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
32+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
3333
import org.apache.cassandra.dht.Range;
3434
import org.apache.cassandra.dht.Token;
3535
import org.apache.cassandra.index.Index;
@@ -230,7 +230,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
230230
int sstableLevel,
231231
SerializationHeader header,
232232
Collection<Index.Group> indexGroups,
233-
LifecycleNewTracker lifecycleNewTracker)
233+
ILifecycleTransaction txn)
234234
{
235235
if (isRepaired)
236236
{
@@ -255,7 +255,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
255255
sstableLevel,
256256
header,
257257
indexGroups,
258-
lifecycleNewTracker);
258+
txn);
259259
}
260260

261261
@Override

src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import org.apache.cassandra.db.commitlog.IntervalSet;
5555
import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier;
5656
import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
57-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
57+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
5858
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
5959
import org.apache.cassandra.db.lifecycle.SSTableSet;
6060
import org.apache.cassandra.dht.Range;
@@ -1257,7 +1257,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
12571257
int sstableLevel,
12581258
SerializationHeader header,
12591259
Collection<Index.Group> indexGroups,
1260-
LifecycleNewTracker lifecycleNewTracker)
1260+
ILifecycleTransaction txn)
12611261
{
12621262
SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
12631263
maybeReloadDiskBoundaries();
@@ -1273,7 +1273,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
12731273
sstableLevel,
12741274
header,
12751275
indexGroups,
1276-
lifecycleNewTracker);
1276+
txn);
12771277
}
12781278
finally
12791279
{

src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.cassandra.db.SerializationHeader;
3131
import org.apache.cassandra.db.commitlog.CommitLogPosition;
3232
import org.apache.cassandra.db.commitlog.IntervalSet;
33-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
33+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
3434
import org.apache.cassandra.dht.Range;
3535
import org.apache.cassandra.dht.Token;
3636
import org.apache.cassandra.index.Index;
@@ -250,7 +250,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
250250
int sstableLevel,
251251
SerializationHeader header,
252252
Collection<Index.Group> indexGroups,
253-
LifecycleNewTracker lifecycleNewTracker)
253+
ILifecycleTransaction txn)
254254
{
255255
Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
256256
"PendingRepairHolder can't create sstablewriter with repaired at set");
@@ -267,7 +267,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
267267
sstableLevel,
268268
header,
269269
indexGroups,
270-
lifecycleNewTracker);
270+
txn);
271271
}
272272

273273
@Override

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
4747
import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
4848
import org.apache.cassandra.db.lifecycle.CompositeLifecycleTransaction;
49-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
49+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
5050
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
5151
import org.apache.cassandra.db.lifecycle.PartialLifecycleTransaction;
5252
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -303,7 +303,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
303303
int sstableLevel,
304304
SerializationHeader header,
305305
Collection<Index.Group> indexGroups,
306-
LifecycleNewTracker lifecycleNewTracker)
306+
ILifecycleTransaction txn)
307307
{
308308
ShardManager shardManager = getShardManager();
309309
double flushDensity = cfs.metric.flushSizeOnDisk.get() * shardManager.shardSetCoverage() / shardManager.localSpaceCoverage();
@@ -317,7 +317,7 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
317317
commitLogPositions,
318318
header,
319319
indexGroups,
320-
lifecycleNewTracker,
320+
txn,
321321
boundaries);
322322
}
323323

src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.cassandra.db.commitlog.CommitLogPosition;
3232
import org.apache.cassandra.db.commitlog.IntervalSet;
3333
import org.apache.cassandra.db.compaction.ShardTracker;
34-
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
34+
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
3535
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
3636
import org.apache.cassandra.index.Index;
3737
import org.apache.cassandra.io.sstable.Descriptor;
@@ -64,7 +64,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
6464
private final IntervalSet<CommitLogPosition> commitLogPositions;
6565
private final SerializationHeader header;
6666
private final Collection<Index.Group> indexGroups;
67-
private final LifecycleNewTracker lifecycleNewTracker;
67+
private final ILifecycleTransaction txn;
6868
private final ShardTracker boundaries;
6969
private final SSTableWriter[] writers;
7070
private int currentWriter;
@@ -78,7 +78,7 @@ public ShardedMultiWriter(ColumnFamilyStore cfs,
7878
IntervalSet<CommitLogPosition> commitLogPositions,
7979
SerializationHeader header,
8080
Collection<Index.Group> indexGroups,
81-
LifecycleNewTracker lifecycleNewTracker,
81+
ILifecycleTransaction txn,
8282
ShardTracker boundaries)
8383
{
8484
this.cfs = cfs;
@@ -90,7 +90,7 @@ public ShardedMultiWriter(ColumnFamilyStore cfs,
9090
this.commitLogPositions = commitLogPositions;
9191
this.header = header;
9292
this.indexGroups = indexGroups;
93-
this.lifecycleNewTracker = lifecycleNewTracker;
93+
this.txn = txn;
9494
this.boundaries = boundaries;
9595
this.writers = new SSTableWriter[this.boundaries.count()]; // at least one
9696

@@ -118,7 +118,7 @@ private SSTableWriter createWriter(Descriptor descriptor)
118118
.setSerializationHeader(header)
119119
.addDefaultComponents(indexGroups)
120120
.setSecondaryIndexGroups(indexGroups)
121-
.build(lifecycleNewTracker, cfs);
121+
.build(txn, cfs);
122122
}
123123

124124
private long forSplittingKeysBy(long splits) {
@@ -227,7 +227,7 @@ public Throwable abort(Throwable accumulate)
227227
for (SSTableWriter writer : writers)
228228
if (writer != null)
229229
{
230-
lifecycleNewTracker.untrackNew(writer);
230+
txn.untrackNew(writer);
231231
t = writer.abort(t);
232232
}
233233
return t;

src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@
2323

2424
import com.google.common.collect.Iterables;
2525

26+
import org.apache.cassandra.db.compaction.OperationType;
27+
import org.apache.cassandra.io.sstable.SSTable;
2628
import org.apache.cassandra.io.sstable.format.SSTableReader;
2729
import org.apache.cassandra.utils.Throwables;
2830
import org.apache.cassandra.utils.TimeUUID;
2931
import org.apache.cassandra.utils.concurrent.Transactional;
3032

31-
public interface ILifecycleTransaction extends Transactional, LifecycleNewTracker
33+
public interface ILifecycleTransaction extends Transactional
3234
{
35+
void trackNew(SSTable sstable);
36+
void untrackNew(SSTable sstable);
37+
OperationType opType();
3338
void checkpoint();
3439
void update(SSTableReader reader, boolean original);
3540
void update(Collection<SSTableReader> readers, boolean original);

0 commit comments

Comments
 (0)