Skip to content

Commit fd2e11f

Browse files
committed
Accord Fixes:
- WatermarkCollector should not report same closed/retired epoch N times - AccordSyncPropagator can merge pending requests and back-off retries - AccordCommandLoader should notify listeners - AccordSegmentCompactor should estimate number of keys to ensure bloom filters work - txn_blocked_by table should report what it can, not throw IllegalStateException - Permit uncompressed system tables patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20754
1 parent 8289bb4 commit fd2e11f

File tree

11 files changed

+97
-18
lines changed

11 files changed

+97
-18
lines changed

src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public Keyspaces apply(ClusterMetadata metadata)
173173
throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces");
174174
}
175175

176-
if (!table.params.compression.isEnabled())
176+
if (!table.params.compression.isEnabled() && !SchemaConstants.isSystemKeyspace(table.keyspace))
177177
Guardrails.uncompressedTablesEnabled.ensureEnabled(state);
178178

179179
if (table.params.transactionalMode.accordIsEnabled && SchemaConstants.isSystemKeyspace(keyspaceName))

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,7 @@ public DataSet data(DecoratedKey partitionKey)
998998
process(ds, commandStores, shard, processed, id, 0, id, Reason.Self, null);
999999
// everything was processed right?
10001000
if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed))
1001-
throw new IllegalStateException("Skipped txns: " + Sets.difference(shard.txns.keySet(), processed));
1001+
Invariants.expect(false, "Skipped txns: " + Sets.difference(shard.txns.keySet(), processed));
10021002
}
10031003

10041004
return ds;

src/java/org/apache/cassandra/journal/OnDiskIndex.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,11 @@ public K lastId()
199199
return lastId;
200200
}
201201

202+
public int entryCount()
203+
{
204+
return entryCount;
205+
}
206+
202207
@Override
203208
public long[] lookUp(K id)
204209
{

src/java/org/apache/cassandra/journal/StaticSegment.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ OnDiskIndex<K> index()
239239
return index;
240240
}
241241

242+
public int entryCount()
243+
{
244+
return index.entryCount();
245+
}
246+
242247
@Override
243248
boolean isActive()
244249
{

src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ boolean considerWritingKey()
8484
return false;
8585
}
8686

87-
abstract void initializeWriter();
87+
abstract void initializeWriter(int estimatedKeyCount);
8888
abstract SSTableTxnWriter writer();
8989
abstract void finishAndAddWriter();
9090
abstract Throwable cleanupWriter(Throwable t);
@@ -99,9 +99,12 @@ public Collection<StaticSegment<JournalKey, V>> compact(Collection<StaticSegment
9999
Invariants.require(segments.size() >= 2, () -> String.format("Can only compact 2 or more segments, but got %d", segments.size()));
100100
logger.info("Compacting {} static segments: {}", segments.size(), segments);
101101

102+
// TODO (expected): this will be a large over-estimate. should make segments an sstable format and include cardinality estimation
103+
int estimatedKeyCount = 0;
102104
PriorityQueue<KeyOrderReader<JournalKey>> readers = new PriorityQueue<>();
103105
for (StaticSegment<JournalKey, V> segment : segments)
104106
{
107+
estimatedKeyCount += segment.entryCount();
105108
KeyOrderReader<JournalKey> reader = segment.keyOrderReader();
106109
if (reader.advance())
107110
readers.add(reader);
@@ -114,7 +117,7 @@ public Collection<StaticSegment<JournalKey, V>> compact(Collection<StaticSegment
114117
if (readers.isEmpty())
115118
return Collections.emptyList();
116119

117-
initializeWriter();
120+
initializeWriter(estimatedKeyCount);
118121

119122
JournalKey key = null;
120123
FlyweightImage builder = null;

src/java/org/apache/cassandra/service/accord/AccordKeyspace.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.cassandra.io.sstable.format.SSTableReader;
9292
import org.apache.cassandra.schema.ColumnMetadata;
9393
import org.apache.cassandra.schema.CompactionParams;
94+
import org.apache.cassandra.schema.CompressionParams;
9495
import org.apache.cassandra.schema.IndexMetadata;
9596
import org.apache.cassandra.schema.Indexes;
9697
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -144,8 +145,8 @@ public static TableMetadata journalMetadata(String tableName, boolean index)
144145
+ "user_version int,"
145146
+ "record blob,"
146147
+ "PRIMARY KEY((key), descriptor, offset)"
147-
+ ") WITH CLUSTERING ORDER BY (descriptor DESC, offset DESC)" +
148-
" WITH compression = {'class':'NoopCompressor'};")
148+
+ ") WITH CLUSTERING ORDER BY (descriptor DESC, offset DESC);")
149+
.compression(CompressionParams.NOOP)
149150
.compaction(CompactionParams.lcs(emptyMap()))
150151
.bloomFilterFpChance(0.01)
151152
.partitioner(new LocalPartitioner(BytesType.instance));

src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public AccordSegmentCompactor(Version userVersion, ColumnFamilyStore cfs)
3737
}
3838

3939
@Override
40-
void initializeWriter()
40+
void initializeWriter(int estimatedKeyCount)
4141
{
4242
Descriptor descriptor = cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables());
4343
SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS);
4444

45-
this.writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header);
45+
this.writer = SSTableTxnWriter.create(cfs, descriptor, estimatedKeyCount, 0, null, false, header);
4646
}
4747

4848
@Override

src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collections;
2525
import java.util.HashSet;
2626
import java.util.Set;
27+
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.TimeUnit;
2829

2930
import com.google.common.collect.ImmutableSet;
@@ -193,6 +194,7 @@ boolean ack(Node.Id id, Notification notifications)
193194
private final IFailureDetector failureDetector;
194195
private final ScheduledExecutorPlus scheduler;
195196
private final Listener listener;
197+
private final ConcurrentHashMap<RetryKey, Notification> retryingNotifications = new ConcurrentHashMap<>();
196198

197199
public AccordSyncPropagator(Node.Id localId, AccordEndpointMapper endpointMapper,
198200
MessageDelivery messagingService, IFailureDetector failureDetector, ScheduledExecutorPlus scheduler,
@@ -304,6 +306,27 @@ private boolean hasSyncCompletedFor(long epoch)
304306
});
305307
}
306308

309+
private void scheduleRetry(Node.Id to, Notification notification)
310+
{
311+
Notification retry = new Notification(notification.epoch, notification.syncComplete, notification.closed, notification.retired, notification.attempts + 1);
312+
RetryKey key = new RetryKey(to, notification.epoch);
313+
retryingNotifications.compute(key, (k, cur) -> {
314+
if (cur == null)
315+
{
316+
scheduler.schedule(() -> retry(k), Math.max(1, Math.min(15, retry.attempts)), TimeUnit.MINUTES);
317+
return retry;
318+
}
319+
return cur.merge(retry);
320+
});
321+
}
322+
323+
private void retry(RetryKey key)
324+
{
325+
Notification retry = retryingNotifications.remove(key);
326+
if (retry != null)
327+
notify(key.to, retry);
328+
}
329+
307330
private boolean notify(Node.Id to, Notification notification)
308331
{
309332
InetAddressAndPort toEp = endpointMapper.mappedEndpoint(to);
@@ -335,7 +358,7 @@ public void onResponse(Message<SimpleReply> msg)
335358
@Override
336359
public void onFailure(InetAddressAndPort from, RequestFailure failure)
337360
{
338-
scheduler.schedule(() -> AccordSyncPropagator.this.notify(to, notification), 1, TimeUnit.SECONDS);
361+
scheduleRetry(to, notification);
339362
}
340363

341364
@Override
@@ -355,7 +378,7 @@ public boolean invokeOnFailure()
355378
return true;
356379
}
357380
noSpamLogger.warn("Node{} is not alive, unable to notify of {}", to, notification);
358-
scheduler.schedule(() -> notify(to, notification), 1, TimeUnit.MINUTES);
381+
scheduleRetry(to, notification);
359382
return false;
360383
}
361384
messagingService.sendWithCallback(msg, toEp, cb);
@@ -397,13 +420,30 @@ public long serializedSize(Notification notification)
397420
final long epoch;
398421
final Collection<Node.Id> syncComplete;
399422
final Ranges closed, retired;
423+
final int attempts;
400424

401425
public Notification(long epoch, Collection<Node.Id> syncComplete, Ranges closed, Ranges retired)
426+
{
427+
this(epoch, syncComplete, closed, retired, 0);
428+
}
429+
430+
public Notification(long epoch, Collection<Node.Id> syncComplete, Ranges closed, Ranges retired, int attempts)
402431
{
403432
this.epoch = epoch;
404433
this.syncComplete = syncComplete;
405434
this.closed = closed;
406435
this.retired = retired;
436+
this.attempts = attempts;
437+
}
438+
439+
Notification merge(Notification add)
440+
{
441+
Invariants.require(add.epoch == this.epoch);
442+
Collection<Node.Id> syncComplete = ImmutableSet.<Node.Id>builder()
443+
.addAll(this.syncComplete)
444+
.addAll(add.syncComplete)
445+
.build();
446+
return new Notification(epoch, syncComplete, closed.with(add.closed), retired.with(add.retired), Math.max(add.attempts, this.attempts));
407447
}
408448

409449
@Override
@@ -417,4 +457,32 @@ public String toString()
417457
'}';
418458
}
419459
}
460+
461+
static class RetryKey
462+
{
463+
final Node.Id to;
464+
final long epoch;
465+
466+
RetryKey(Node.Id id, long epoch)
467+
{
468+
to = id;
469+
this.epoch = epoch;
470+
}
471+
472+
@Override
473+
public int hashCode()
474+
{
475+
return to.id * 31 + (int)epoch;
476+
}
477+
478+
@Override
479+
public boolean equals(Object obj)
480+
{
481+
if (!(obj instanceof RetryKey))
482+
return false;
483+
484+
RetryKey that = (RetryKey) obj;
485+
return that.epoch == this.epoch && that.to.equals(this.to);
486+
}
487+
}
420488
}

src/java/org/apache/cassandra/service/accord/WatermarkCollector.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,22 +135,19 @@ static void fetchAndReportWatermarksAsync(AccordConfigurationService configServi
135135
MessageDelivery.RetryErrorMessage.EMPTY)
136136
.addCallback((m, fail) -> {
137137
if (fail != null)
138-
{
139138
return;
140-
}
139+
141140
Snapshot snapshot = m.payload;
142141
long minEpoch = configService.minEpoch();
143142
for (Map.Entry<Range, Long> e : snapshot.closed.entrySet())
144143
{
145144
Ranges r = Ranges.of(e.getKey());
146-
for (long epoch = minEpoch; epoch <= e.getValue(); epoch++)
147-
configService.receiveClosed(r, e.getValue());
145+
configService.receiveClosed(r, e.getValue());
148146
}
149147
for (Map.Entry<Range, Long> e : snapshot.retired.entrySet())
150148
{
151149
Ranges r = Ranges.of(e.getKey());
152-
for (long epoch = minEpoch; epoch <= e.getValue(); epoch++)
153-
configService.receiveRetired(r, e.getValue());
150+
configService.receiveRetired(r, e.getValue());
154151
}
155152
for (Map.Entry<Integer, Long> e : snapshot.synced.entrySet())
156153
{

0 commit comments

Comments
 (0)