Skip to content

Commit 6b57acc

Browse files
committed
Reduce contention in MemtableAllocator.allocate
- Estimate the memory required to apply a partition update to a memtable, allocate that memory in one shot, and then use it as a request-local SLAB.
- Reduce contention by switching MemtableAllocator.SubAllocator#owns from updates via AtomicLongFieldUpdater to a LongAdder. The MemtableAllocator.SubAllocator#acquired(..) method updates the "owns" value but does not use the updated result.
- Reduce contention by replacing the CAS loop in MemtablePool.SubPool#tryAllocate with allocatedUpdater.addAndGet(this, size).

Patch by Dmitry Konstantinov; reviewed by Michael Semb Wever for CASSANDRA-20226
1 parent 4cf684b commit 6b57acc

File tree

19 files changed

+574
-66
lines changed

19 files changed

+574
-66
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Reduce contention in MemtableAllocator.allocate (CASSANDRA-20226)
23
* Add export, list, import sub-commands for nodetool compressiondictionary (CASSANDRA-20941)
34
* Add support in the binary protocol to allow transactions to have multiple conditions (CASSANDRA-20883)
45
* Enable CQLSSTableWriter to create SSTables compressed with a dictionary (CASSANDRA-20938)

src/java/org/apache/cassandra/db/NativeClustering.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import org.apache.cassandra.utils.FBUtilities;
3030
import org.apache.cassandra.utils.ObjectSizes;
3131
import org.apache.cassandra.utils.concurrent.OpOrder;
32+
import org.apache.cassandra.utils.memory.AddressBasedAllocator;
3233
import org.apache.cassandra.utils.memory.HeapCloner;
3334
import org.apache.cassandra.utils.memory.MemoryUtil;
3435
import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
35-
import org.apache.cassandra.utils.memory.NativeAllocator;
3636

3737
public class NativeClustering implements Clustering<NativeData>
3838
{
@@ -42,7 +42,17 @@ public class NativeClustering implements Clustering<NativeData>
4242

4343
private NativeClustering() { peer = 0; }
4444

45-
public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, Clustering<?> clustering)
45+
public static int estimateAllocationSize(Clustering<?> clustering)
46+
{
47+
int count = clustering.size();
48+
int metadataSize = (count * 2) + 4;
49+
int dataSize = clustering.dataSize();
50+
int bitmapSize = ((count + 7) >>> 3);
51+
52+
return metadataSize + dataSize + bitmapSize;
53+
}
54+
55+
public NativeClustering(AddressBasedAllocator allocator, OpOrder.Group writeOp, Clustering<?> clustering)
4656
{
4757
int count = clustering.size();
4858
int metadataSize = (count * 2) + 4;

src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
4040
{
4141
final MemtableAllocator allocator;
4242
final OpOrder.Group writeOp;
43+
4344
final Cloner cloner;
45+
Cloner contextCloner;
4446
final UpdateTransaction indexer;
4547
public long dataSize;
4648

@@ -84,6 +86,34 @@ public BTreePartitionData mergePartitions(BTreePartitionData current, final Part
8486

8587
protected BTreePartitionData makeMergedPartition(BTreePartitionData current, PartitionUpdate update)
8688
{
89+
if (cloner.isContextAwareCloningSupported()) // to avoid an estimation cost if context aware cloning is not supported
90+
{
91+
int estimitedCloneSize = 0;
92+
// a typical case when all values in the update are used in the result of the merge
93+
// clustering key cloning is needed when we have an insert but not needed when we have an update,
94+
// so we may allocate a bit more than needed sometimes
95+
for (Row row : update)
96+
{
97+
estimitedCloneSize += (int) row.accumulate((cd, v) -> v + cd.estimateCloneSize(cloner), 0);
98+
estimitedCloneSize += cloner.estimateCloneSize(row.clustering());
99+
}
100+
101+
Row staticRow = update.staticRow();
102+
if (!staticRow.isEmpty())
103+
{
104+
estimitedCloneSize += (int) staticRow.accumulate((cd, v) -> v + cd.estimateCloneSize(cloner), 0);
105+
// there are no clustering keys for static rows
106+
}
107+
108+
if (contextCloner != null && contextCloner != cloner)
109+
contextCloner.adjustUnused();
110+
contextCloner = cloner.createContextAwareCloner(estimitedCloneSize);
111+
}
112+
else
113+
{
114+
contextCloner = cloner;
115+
}
116+
87117
DeletionInfo newDeletionInfo = merge(current.deletionInfo, update.deletionInfo());
88118

89119
RegularAndStaticColumns columns = current.columns;
@@ -130,7 +160,7 @@ private DeletionInfo merge(DeletionInfo existing, DeletionInfo update)
130160
@Override
131161
public Row insert(Row insert)
132162
{
133-
Row data = insert.clone(cloner);
163+
Row data = insert.clone(contextCloner);
134164
indexer.onInserted(insert);
135165

136166
dataSize += data.dataSize();
@@ -154,17 +184,17 @@ public Cell<?> merge(Cell<?> previous, Cell<?> insert)
154184
long timeDelta = Math.abs(insert.timestamp() - previous.timestamp());
155185
if (timeDelta < colUpdateTimeDelta)
156186
colUpdateTimeDelta = timeDelta;
157-
if (cloner != null)
158-
insert = cloner.clone(insert);
187+
if (contextCloner != null)
188+
insert = contextCloner.clone(insert);
159189
dataSize += insert.dataSize() - previous.dataSize();
160190
heapSize += insert.unsharedHeapSizeExcludingData() - previous.unsharedHeapSizeExcludingData();
161191
return insert;
162192
}
163193

164194
public ColumnData insert(ColumnData insert)
165195
{
166-
if (cloner != null)
167-
insert = insert.clone(cloner);
196+
if (contextCloner != null)
197+
insert = insert.clone(contextCloner);
168198
dataSize += insert.dataSize();
169199
heapSize += insert.unsharedHeapSizeExcludingData();
170200
return insert;
@@ -185,5 +215,7 @@ public void onAllocatedOnHeap(long heapSize)
185215
public void reportAllocatedMemory()
186216
{
187217
allocator.onHeap().adjust(heapSize, writeOp);
218+
if (contextCloner != null && contextCloner != cloner)
219+
contextCloner.adjustUnused();
188220
}
189221
}

src/java/org/apache/cassandra/db/rows/AbstractCell.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ public Cell<?> clone(ByteBufferCloner cloner)
114114
return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), cloner.clone(buffer()), path == null ? null : path.clone(cloner));
115115
}
116116

117+
public static int estimateAllocationSize(Cell<?> cell)
118+
{
119+
long size = cell.valueSize();
120+
CellPath path = cell.path();
121+
if (path != null)
122+
{
123+
assert path.size() == 1 : String.format("Expected path size to be 1 but was not; %s", path);
124+
size += path.get(0).remaining();
125+
}
126+
return (int) size;
127+
}
128+
117129
// note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
118130
public Cell<?> updateAllTimestamp(long newTimestamp)
119131
{

src/java/org/apache/cassandra/db/rows/Cell.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ public final Cell<?> clone(Cloner cloner)
201201
return cloner.clone(this);
202202
}
203203

204+
public int estimateCloneSize(Cloner cloner)
205+
{
206+
return cloner.estimateCloneSize(this);
207+
}
208+
204209
public abstract Cell<?> clone(ByteBufferCloner cloner);
205210

206211
@Override

src/java/org/apache/cassandra/db/rows/ColumnData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ public static void digest(Digest digest, ColumnData cd)
280280

281281
public abstract ColumnData clone(Cloner cloner);
282282

283+
public abstract int estimateCloneSize(Cloner cloner);
284+
285+
283286
/**
284287
* Returns a copy of the data where all timestamps for live data have replaced by {@code newTimestamp} and
285288
* all deletion timestamp by {@code newTimestamp - 1}.

src/java/org/apache/cassandra/db/rows/ComplexColumnData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,11 @@ public ColumnData clone(Cloner cloner)
260260
return transform(c -> cloner.clone(c));
261261
}
262262

263+
public int estimateCloneSize(Cloner cloner)
264+
{
265+
return (int) accumulate((c, v) -> v + cloner.estimateCloneSize(c), 0);
266+
}
267+
263268
public ComplexColumnData updateAllTimestamp(long newTimestamp)
264269
{
265270
DeletionTime newDeletion = complexDeletion.isLive() ? complexDeletion : DeletionTime.build(newTimestamp - 1, complexDeletion.localDeletionTime());

src/java/org/apache/cassandra/db/rows/NativeCell.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import org.apache.cassandra.utils.ByteBufferUtil;
2929
import org.apache.cassandra.utils.ObjectSizes;
3030
import org.apache.cassandra.utils.concurrent.OpOrder;
31+
import org.apache.cassandra.utils.memory.AddressBasedAllocator;
3132
import org.apache.cassandra.utils.memory.MemoryUtil;
32-
import org.apache.cassandra.utils.memory.NativeAllocator;
3333
import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
3434

3535
public class NativeCell extends AbstractCell<NativeData> implements NativeData
@@ -45,13 +45,28 @@ public class NativeCell extends AbstractCell<NativeData> implements NativeData
4545

4646
private final long peer;
4747

48+
public static int estimateAllocationSize(Cell<?> cell)
49+
{
50+
long size = offHeapSizeWithoutPath(cell.valueSize());
51+
CellPath path = cell.path();
52+
if (path != null)
53+
{
54+
assert path.size() == 1 : String.format("Expected path size to be 1 but was not; %s", path);
55+
size += 4 + path.get(0).remaining();
56+
}
57+
58+
if (size > Integer.MAX_VALUE)
59+
throw new IllegalStateException();
60+
return (int) size;
61+
}
62+
4863
private NativeCell()
4964
{
5065
super(null);
5166
this.peer = 0;
5267
}
5368

54-
public NativeCell(NativeAllocator allocator,
69+
public NativeCell(AddressBasedAllocator allocator,
5570
OpOrder.Group writeOp,
5671
Cell<?> cell)
5772
{
@@ -67,7 +82,7 @@ public NativeCell(NativeAllocator allocator,
6782

6883
// Please keep both int/long overloaded ctors public. Otherwise silent casts will mess up timestamps when one is not
6984
// available.
70-
public NativeCell(NativeAllocator allocator,
85+
public NativeCell(AddressBasedAllocator allocator,
7186
OpOrder.Group writeOp,
7287
ColumnMetadata column,
7388
long timestamp,
@@ -79,7 +94,7 @@ public NativeCell(NativeAllocator allocator,
7994
this(allocator, writeOp, column, timestamp, ttl, deletionTimeLongToUnsignedInteger(localDeletionTime), value, path);
8095
}
8196

82-
public NativeCell(NativeAllocator allocator,
97+
public NativeCell(AddressBasedAllocator allocator,
8398
OpOrder.Group writeOp,
8499
ColumnMetadata column,
85100
long timestamp,
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.utils.memory;
20+
21+
import org.apache.cassandra.utils.concurrent.OpOrder;
22+
23+
public interface AddressBasedAllocator
24+
{
25+
long allocate(int sizeToAllocate, OpOrder.Group opGroup);
26+
}

src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
2727
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
2828
import org.apache.cassandra.db.marshal.ValueAccessor;
29+
import org.apache.cassandra.db.rows.AbstractCell;
2930
import org.apache.cassandra.db.rows.Cell;
3031
import org.apache.cassandra.utils.ByteBufferUtil;
3132

@@ -52,12 +53,34 @@ public Clustering<?> clone(Clustering<?> clustering)
5253
return clustering.clone(this);
5354
}
5455

56+
public int estimateCloneSize(Clustering<?> clustering)
57+
{
58+
return clustering.dataSize();
59+
}
60+
5561
@Override
5662
public Cell<?> clone(Cell<?> cell)
5763
{
5864
return cell.clone(this);
5965
}
6066

67+
public int estimateCloneSize(Cell<?> cell)
68+
{
69+
return AbstractCell.estimateAllocationSize(cell);
70+
}
71+
72+
@Override
73+
public Cloner createContextAwareCloner(int estimatedCloneSize)
74+
{
75+
return this;
76+
}
77+
78+
@Override
79+
public void adjustUnused()
80+
{
81+
// nothing to do by default
82+
}
83+
6184
public final ByteBuffer clone(ByteBuffer buffer)
6285
{
6386
return clone(buffer, ByteBufferAccessor.instance);

0 commit comments

Comments
 (0)