Skip to content

Commit e8bd231

Browse files
committed
Accord Fixes:
- Don't try to update metrics when no TxnId (e.g. GetMaxConflict) - maybeExecuteImmediately did not guarantee mutual exclusivity - Cancellation of a running 'plain' task could corrupt AccordExecutor state - GetLatestDeps message serializer - txn_blocked_by StackOverflowError - Not updating CommandsForKey in all cases on restart Also Improve: - TableId.from/toString - Route toString methods - Tracing coverage of FetchRoute - Replay command store parallelism patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20763
1 parent ccf12ef commit e8bd231

32 files changed

+511
-145
lines changed

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
33
url = https://github.com/apache/cassandra-accord.git
4-
branch = trunk
4+
branch = trunk

src/java/org/apache/cassandra/io/AsymmetricUnversionedSerializer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
public interface AsymmetricUnversionedSerializer<In, Out>
3131
{
3232
void serialize(In t, DataOutputPlus out) throws IOException;
33+
34+
/**
35+
* Note: it is not guaranteed that this output is compatible with the DataInput/OutputPlus variations,
36+
* as the ByteBuffer has an implied length.
37+
*/
3338
default ByteBuffer serialize(In t) throws IOException
3439
{
3540
int size = Math.toIntExact(serializedSize(t));
@@ -59,6 +64,11 @@ default ByteBuffer serializeUnchecked(In t)
5964
}
6065
}
6166
Out deserialize(DataInputPlus in) throws IOException;
67+
68+
/**
69+
* Note: it is not guaranteed to be safe to provide an input created by the DataOutputPlus serializer varation
70+
* as the ByteBuffer has an implied length.
71+
*/
6272
default Out deserialize(ByteBuffer buffer) throws IOException
6373
{
6474
try (DataInputBuffer in = new DataInputBuffer(buffer, true))
@@ -78,5 +88,6 @@ default Out deserializeUnchecked(ByteBuffer buffer)
7888
throw new UncheckedIOException(e);
7989
}
8090
}
91+
8192
long serializedSize(In t);
8293
}

src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,12 @@ public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> bo
10971097
return new SSTableSimpleScanner(this, getPositionsForBoundsIterator(boundsIterator));
10981098
}
10991099

1100+
public ISSTableScanner getScanner(AbstractBounds<PartitionPosition> bounds)
1101+
{
1102+
PartitionPositionBounds positionBounds = getPositionsForBounds(bounds);
1103+
return new SSTableSimpleScanner(this, positionBounds == null ? Collections.emptyList() : Collections.singletonList(positionBounds));
1104+
}
1105+
11001106

11011107
/**
11021108
* Create a {@link FileDataInput} for the data file of the sstable represented by this reader. This method returns

src/java/org/apache/cassandra/journal/ActiveSegment.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import accord.utils.Invariants;
3131
import com.codahale.metrics.Timer;
3232
import com.google.common.annotations.VisibleForTesting;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
3336
import org.apache.cassandra.db.TypeSizes;
3437
import org.apache.cassandra.utils.*;
3538
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -42,6 +45,8 @@
4245
@Simulate(with=MONITORS)
4346
public final class ActiveSegment<K, V> extends Segment<K, V>
4447
{
48+
private static final Logger logger = LoggerFactory.getLogger(ActiveSegment.class);
49+
4550
final FileChannel channel;
4651

4752
// OpOrder used to order appends wrt flush
@@ -197,6 +202,7 @@ void persistComponents()
197202

198203
private void discard()
199204
{
205+
logger.debug("Discarding {}", this);
200206
selfRef.ensureReleased();
201207

202208
descriptor.fileFor(Component.DATA).deleteIfExists();

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -936,9 +936,9 @@ public interface Writer
936936
/**
937937
* Static segment iterator iterates all keys in _static_ segments in order.
938938
*/
939-
public StaticSegmentKeyIterator staticSegmentKeyIterator()
939+
public StaticSegmentKeyIterator staticSegmentKeyIterator(K min, K max)
940940
{
941-
return new StaticSegmentKeyIterator();
941+
return new StaticSegmentKeyIterator(min, max);
942942
}
943943

944944
/**
@@ -1011,17 +1011,22 @@ public class StaticSegmentKeyIterator implements CloseableIterator<KeyRefs<K>>
10111011
private final ReferencedSegments<K, V> segments;
10121012
private final MergeIterator<Head, KeyRefs<K>> iterator;
10131013

1014-
public StaticSegmentKeyIterator()
1014+
public StaticSegmentKeyIterator(K min, K max)
10151015
{
1016-
this.segments = selectAndReference(Segment::isStatic);
1016+
this.segments = selectAndReference(s -> s.isStatic() && (min == null || keySupport.compare(s.index().lastId(), min) >= 0) && (max == null || keySupport.compare(s.index().firstId(), max) <= 0));
10171017
List<Iterator<Head>> iterators = new ArrayList<>(segments.count());
10181018

10191019
for (Segment<K, V> segment : segments.allSorted(true))
10201020
{
1021-
StaticSegment<K, V> staticSegment = (StaticSegment<K, V>) segment;
1021+
final StaticSegment<K, V> staticSegment = (StaticSegment<K, V>) segment;
1022+
final OnDiskIndex<K>.IndexReader iter = staticSegment.index().reader();
1023+
if (min != null) iter.seek(min);
1024+
if (max != null) iter.seekEnd(max);
1025+
if (!iter.hasNext())
1026+
continue;
1027+
10221028
iterators.add(new AbstractIterator<>()
10231029
{
1024-
final Iterator<K> iter = staticSegment.index().reader();
10251030
final Head head = new Head(staticSegment.descriptor.timestamp);
10261031

10271032
@Override

src/java/org/apache/cassandra/journal/OnDiskIndex.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ IndexReader reader()
252252

253253
public class IndexReader extends AbstractIterator<K>
254254
{
255+
int lastIdx = entryCount - 1;
255256
int idx;
256257
K key;
257258
int offset;
@@ -262,6 +263,20 @@ public class IndexReader extends AbstractIterator<K>
262263
idx = -1;
263264
}
264265

266+
public void seek(K key)
267+
{
268+
int i = binarySearch(key);
269+
if (i < 0) i = -1 - i;
270+
idx = i - 1;
271+
}
272+
273+
public void seekEnd(K key)
274+
{
275+
int i = binarySearch(key);
276+
if (i < 0) i = -2 - i;
277+
lastIdx = i;
278+
}
279+
265280
protected K computeNext()
266281
{
267282
if (advance())
@@ -284,7 +299,7 @@ public int recordSize()
284299

285300
public boolean advance()
286301
{
287-
if (idx >= entryCount - 1)
302+
if (idx >= lastIdx)
288303
return false;
289304

290305
idx++;

src/java/org/apache/cassandra/journal/Segments.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ public void close()
220220
}
221221
}
222222

223+
@Override
224+
public String toString()
225+
{
226+
return sorted.toString();
227+
}
228+
223229
private static final Long2ObjectHashMap<?> EMPTY_MAP = new Long2ObjectHashMap<>();
224230

225231
@SuppressWarnings("unchecked")

src/java/org/apache/cassandra/journal/StaticSegment.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ public void close(Journal<K, V> journal)
164164
*/
165165
void discard(Journal<K, V> journal)
166166
{
167+
logger.debug("Discarding {}", this);
168+
167169
((Tidier)selfRef.tidier()).discard = true;
168170
close(journal);
169171
}

src/java/org/apache/cassandra/metrics/AccordMetrics.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,14 @@ public Listener(AccordMetrics readMetrics, AccordMetrics writeMetrics)
201201

202202
private AccordMetrics forTransaction(TxnId txnId)
203203
{
204-
if (txnId.isWrite())
205-
return writeMetrics;
206-
else if (txnId.isSomeRead())
207-
return readMetrics;
208-
else
209-
return null;
204+
if (txnId != null)
205+
{
206+
if (txnId.isWrite())
207+
return writeMetrics;
208+
else if (txnId.isSomeRead())
209+
return readMetrics;
210+
}
211+
return null;
210212
}
211213

212214
@Override

0 commit comments

Comments
 (0)