Commit d077f69

Ensure prepared_statement INSERT timestamp precedes eviction DELETE
Updates SystemKeyspace.writePreparedStatement to accept a timestamp associated with the Prepared creation time. Using this timestamp ensures that the timestamp of the INSERT into system.prepared_statements always precedes the timestamp of the DELETE issued for the same Prepared in SystemKeyspace.removePreparedStatement. This is needed because Caffeine 2.9.2 may evict an entry as soon as it is inserted if the maximum weight of the cache is exceeded, causing the DELETE to be executed before the INSERT.

Additionally, clusters currently experiencing a leaky system.prepared_statements table from this bug may struggle to bounce into a version with this fix, because SystemKeyspace.loadPreparedStatements currently does not paginate its query to system.prepared_statements, causing heap OOMs. To fix this, the patch adds pagination at 5000 rows and aborts loading once the loaded statements exceed the cache size (the code allows 110% of it). This should allow nodes to come up and delete older prepared statements that may no longer be used as the cache fills up (which should happen immediately).

This patch does not address the issue of Caffeine immediately evicting a prepared statement; however, it will prevent the system.prepared_statements table from growing unbounded. For most users this should be adequate, as the cache should only fill up when there are erroneously many unique prepared statements. In such a case we can expect that clients will constantly prepare statements regardless of whether or not the cache is evicting statements.

patch by Andy Tolbert; reviewed by Berenguer Blasi and Caleb Rackliffe for CASSANDRA-19703
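The ordering argument in the commit message can be illustrated with a toy last-write-wins table. This is only a sketch under stated assumptions: `LastWriteWinsDemo`, its methods, and the tie-breaking rule are hypothetical stand-ins and not Cassandra's storage engine. It applies the eviction DELETE before the INSERT, as in the race described above, and shows that the row only stays deleted when the INSERT carries the earlier creation timestamp.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy last-write-wins table; a hypothetical model, not Cassandra's storage engine. */
final class LastWriteWinsDemo
{
    /** A write to a row: value == null marks a tombstone (DELETE). */
    private static final class Write
    {
        final String value;
        final long timestamp;

        Write(String value, long timestamp)
        {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Write> rows = new HashMap<>();

    void insert(String key, String value, long timestamp)
    {
        apply(key, new Write(value, timestamp));
    }

    void delete(String key, long timestamp)
    {
        apply(key, new Write(null, timestamp));
    }

    // Keep whichever write has the higher timestamp, regardless of arrival order.
    // Assumption of this toy model: on a tie, the tombstone wins.
    private void apply(String key, Write incoming)
    {
        Write current = rows.get(key);
        boolean newer = current == null || incoming.timestamp > current.timestamp;
        boolean tieWonByTombstone = current != null
                                    && incoming.timestamp == current.timestamp
                                    && incoming.value == null;
        if (newer || tieWonByTombstone)
            rows.put(key, incoming);
    }

    boolean isLive(String key)
    {
        Write w = rows.get(key);
        return w != null && w.value != null;
    }

    /** Applies the eviction DELETE first and the INSERT second, then reports whether the row is still live. */
    static boolean rowSurvivesEviction(long insertTimestamp, long deleteTimestamp)
    {
        LastWriteWinsDemo table = new LastWriteWinsDemo();
        table.delete("stmt-1", deleteTimestamp);
        table.insert("stmt-1", "SELECT ...", insertTimestamp);
        return table.isLive("stmt-1");
    }

    public static void main(String[] args)
    {
        long createdAt = 100, evictedAt = 105, writtenAt = 110;
        // INSERT stamped when it is written (after the eviction): the row comes back.
        System.out.println("write-time timestamp leaks row: " + rowSurvivesEviction(writtenAt, evictedAt));
        // INSERT stamped with the Prepared creation time: the DELETE still wins.
        System.out.println("creation-time timestamp leaks row: " + rowSurvivesEviction(createdAt, evictedAt));
    }
}
```

In this model the first call reports `true` (the row leaks) and the second reports `false`, which mirrors why the patch passes `prepared.timestamp` through to the INSERT.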
1 parent c736d22 commit d077f69

8 files changed

+377
-35
lines changed

CHANGES.txt

Lines changed: 5 additions & 1 deletion
@@ -1,5 +1,9 @@
-4.0.18
+4.0.19
+* Ensure prepared_statement INSERT timestamp precedes eviction DELETE (CASSANDRA-19703)
 * Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659)
+
+
+4.0.18
 * Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119)
 * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
 * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561)

doc/modules/cassandra/pages/cql/cql_singlefile.adoc

Lines changed: 22 additions & 0 deletions
@@ -239,6 +239,28 @@ provide values for `LIMIT`, `TIMESTAMP`, and `TTL` clauses. If anonymous
 bind markers are used, the names for the query parameters will be
 `[limit]`, `[timestamp]`, and `[ttl]`, respectively.
 
+===== Prepared Statement Caching
+
+Prepared Statements are cached by cassandra in-memory using a
+https://github.com/ben-manes/caffeine[Caffeine]-managed cache which
+can be configured using
+xref:managing/configuration/cass_yaml_file.adoc#_prepared_statements_cache_size[`prepared_statements_cache_size`].
+The cache is also persisted to the `system.prepared_statements` table
+so it can be preloaded into memory on startup.
+
+To ensure optimal performance, it's important to use a bind `<variable>`
+for *all non-constant values* in your CQL statements. If you include
+literal values directly in the query instead, each variation will be
+treated as a unique statement that must be prepared and cached
+separately. This will soon overflow the prepared statement cache,
+which is small by design.
+
+When the cache reaches its maximum size, older or less frequently
+used statements are
+https://github.com/ben-manes/caffeine/wiki/Eviction[evicted],
+leading to additional overhead as previously prepared statements must
+be re-prepared.
+
 [[dataDefinition]]
 === Data Definition
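The documentation added in this hunk warns that embedding literals makes every query variation a distinct prepared statement. A minimal sketch of that effect follows. It assumes, as a simplification, that a statement id is the MD5 digest of the query text alone; `StatementIdDemo`, the `users` table, and the query strings are hypothetical and only serve the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical demo: counts distinct statement ids produced by a workload. */
final class StatementIdDemo
{
    // Simplifying assumption: the id is the MD5 hex digest of the query text only.
    static String id(String query)
    {
        try
        {
            byte[] digest = MessageDigest.getInstance("MD5")
                                         .digest(query.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest)
                hex.append(String.format("%02x", b));
            return hex.toString();
        }
        catch (NoSuchAlgorithmException e)
        {
            throw new IllegalStateException("MD5 is required to be available on every JVM", e);
        }
    }

    /** Prepares one query per user and returns how many distinct ids a cache would have to hold. */
    static int distinctIds(boolean useBindMarker, int users)
    {
        Set<String> ids = new HashSet<>();
        for (int userId = 0; userId < users; userId++)
        {
            String query = useBindMarker
                           ? "SELECT name FROM users WHERE id = ?"        // value bound at execution time
                           : "SELECT name FROM users WHERE id = " + userId; // literal baked into the text
            ids.add(id(query));
        }
        return ids.size();
    }

    public static void main(String[] args)
    {
        System.out.println("with bind marker: " + distinctIds(true, 1000));
        System.out.println("with literals:    " + distinctIds(false, 1000));
    }
}
```

With a bind marker the 1000 requests share a single cache entry; with literals each request needs its own entry, which is the pattern that fills the cache and triggers evictions.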

src/java/org/apache/cassandra/cql3/QueryHandler.java

Lines changed: 7 additions & 0 deletions
@@ -65,6 +65,12 @@ public static class Prepared
 
         public final MD5Digest resultMetadataId;
 
+        /**
+         * Timestamp of when this prepared statement was created. Used in QueryProcessor.preparedStatements cache
+         * to ensure that the deletion timestamp always succeeds the insert timestamp.
+         */
+        public final long timestamp;
+
         /**
          * Contains the CQL statement source if the statement has been "regularly" perpared via
          * {@link QueryHandler#prepare(String, ClientState, Map)}.
@@ -81,6 +87,7 @@ public Prepared(CQLStatement statement, String rawCQLStatement, boolean fullyQua
             this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(statement).getResultMetadataId();
             this.fullyQualified = fullyQualified;
             this.keyspace = keyspace;
+            this.timestamp = ClientState.getTimestamp();
         }
     }
 }

src/java/org/apache/cassandra/cql3/QueryProcessor.java

Lines changed: 56 additions & 17 deletions
@@ -26,6 +26,7 @@
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
@@ -86,23 +87,22 @@ public class QueryProcessor implements QueryHandler
     // counters. Callers of processStatement are responsible for correctly notifying metrics
     public static final CQLMetrics metrics = new CQLMetrics();
 
+    // Paging size to use when preloading prepared statements.
+    public static final int PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE = 5000;
+
+    // Size of the prepared statement cache in bytes.
+    public static long PREPARED_STATEMENT_CACHE_SIZE_BYTES = capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB());
+
     private static final AtomicInteger lastMinuteEvictionsCount = new AtomicInteger(0);
 
     static
     {
         preparedStatements = Caffeine.newBuilder()
                              .executor(MoreExecutors.directExecutor())
-                             .maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
+                             .maximumWeight(PREPARED_STATEMENT_CACHE_SIZE_BYTES)
                              .weigher(QueryProcessor::getSizeOfPreparedStatementForCache)
-                             .removalListener((key, prepared, cause) -> {
-                                 MD5Digest md5Digest = (MD5Digest) key;
-                                 if (cause.wasEvicted())
-                                 {
-                                     metrics.preparedStatementsEvicted.inc();
-                                     lastMinuteEvictionsCount.incrementAndGet();
-                                     SystemKeyspace.removePreparedStatement(md5Digest);
-                                 }
-                             }).build();
+                             .removalListener((key, prepared, cause) -> evictPreparedStatement(key, cause))
+                             .build();
 
         ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
             long count = lastMinuteEvictionsCount.getAndSet(0);
@@ -116,6 +116,16 @@ public class QueryProcessor implements QueryHandler
                     DatabaseDescriptor.getPreparedStatementsCacheSizeMB());
     }
 
+    private static void evictPreparedStatement(MD5Digest key, RemovalCause cause)
+    {
+        if (cause.wasEvicted())
+        {
+            metrics.preparedStatementsEvicted.inc();
+            lastMinuteEvictionsCount.incrementAndGet();
+            SystemKeyspace.removePreparedStatement(key);
+        }
+    }
+
     private static long capacityToBytes(long cacheSizeMB)
     {
         return cacheSizeMB * 1024 * 1024;
@@ -140,6 +150,12 @@ private enum InternalStateInstance
     }
 
     public void preloadPreparedStatements()
+    {
+        preloadPreparedStatements(PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE);
+    }
+
+    @VisibleForTesting
+    public int preloadPreparedStatements(int pageSize)
     {
         int count = SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
             try
@@ -154,17 +170,18 @@ public void preloadPreparedStatements()
                 // Preload `null` statement for non-fully qualified statements, since it can't be parsed if loaded from cache and will be dropped
                 if (!prepared.fullyQualified)
                     preparedStatements.get(computeId(query, null), (ignored_) -> prepared);
-                return true;
+                return prepared;
             }
             catch (RequestValidationException e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 logger.warn(String.format("Prepared statement recreation error, removing statement: %s %s %s", id, query, keyspace));
                 SystemKeyspace.removePreparedStatement(id);
-                return false;
+                return null;
             }
-        });
+        }, pageSize);
         logger.info("Preloaded {} prepared statements", count);
+        return count;
     }
 
 
@@ -466,11 +483,33 @@ public static UntypedResultSet execute(String query, ConsistencyLevel cl, QueryS
     public static UntypedResultSet executeInternalWithPaging(String query, int pageSize, Object... values)
     {
         Prepared prepared = prepareInternal(query);
-        if (!(prepared.statement instanceof SelectStatement))
+
+        return executeInternalWithPaging(prepared.statement, pageSize, values);
+    }
+
+    /**
+     * Executes with a non-prepared statement using paging. Generally {@link #executeInternalWithPaging(String, int, Object...)}
+     * should be used instead of this, but this may be used in niche cases like
+     * {@link SystemKeyspace#loadPreparedStatement(MD5Digest, SystemKeyspace.TriFunction)} where prepared statements are
+     * being loaded into {@link #preparedStatements} so it doesn't make sense to prepare a statement in this context.
+     */
+    public static UntypedResultSet executeOnceInternalWithPaging(String query, int pageSize, Object... values)
+    {
+        QueryState queryState = internalQueryState();
+        CQLStatement statement = parseStatement(query, queryState.getClientState());
+        statement.validate(queryState.getClientState());
+
+        return executeInternalWithPaging(statement, pageSize, values);
+    }
+
+    private static UntypedResultSet executeInternalWithPaging(CQLStatement statement, int pageSize, Object... values)
+    {
+        if (!(statement instanceof SelectStatement))
             throw new IllegalArgumentException("Only SELECTs can be paged");
 
-        SelectStatement select = (SelectStatement)prepared.statement;
-        QueryPager pager = select.getQuery(makeInternalOptions(prepared.statement, values), FBUtilities.nowInSeconds()).getPager(null, ProtocolVersion.CURRENT);
+        SelectStatement select = (SelectStatement) statement;
+        int nowInSec = FBUtilities.nowInSeconds();
+        QueryPager pager = select.getQuery(makeInternalOptions(select, values), nowInSec).getPager(null, ProtocolVersion.CURRENT);
         return UntypedResultSet.create(select, pager, pageSize);
     }
 
@@ -696,7 +735,7 @@ public static ResultMessage.Prepared storePreparedStatement(String queryString,
 
         Prepared previous = preparedStatements.get(statementId, (ignored_) -> prepared);
         if (previous == prepared)
-            SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString);
+            SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString, prepared.timestamp);
 
         ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared.statement);
         ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared.statement);

src/java/org/apache/cassandra/db/SystemKeyspace.java

Lines changed: 45 additions & 8 deletions
@@ -53,6 +53,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryHandler.Prepared;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.AggregateFcts;
@@ -78,6 +79,7 @@
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -109,8 +111,10 @@
 import static java.lang.String.format;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.apache.cassandra.cql3.QueryProcessor.PREPARED_STATEMENT_CACHE_SIZE_BYTES;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternalWithPaging;
 
 public final class SystemKeyspace
 {
@@ -1615,11 +1619,11 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
         }
     }
 
-    public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql)
+    public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql, long timestamp)
     {
-        executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)",
+        executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?) USING TIMESTAMP ?",
                                PreparedStatements.toString()),
-                        loggedKeyspace, key.byteBuffer(), cql);
+                        loggedKeyspace, key.byteBuffer(), cql, timestamp);
         logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql);
     }
 
@@ -1635,17 +1639,50 @@ public static void resetPreparedStatements()
         preparedStatements.truncateBlockingWithoutSnapshot();
     }
 
-    public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Boolean> onLoaded)
+    public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Prepared> onLoaded)
+    {
+        return loadPreparedStatements(onLoaded, QueryProcessor.PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE);
+    }
+
+    public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Prepared> onLoaded, int pageSize)
     {
         String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
-        UntypedResultSet resultSet = executeOnceInternal(query);
+        UntypedResultSet resultSet = executeOnceInternalWithPaging(query, pageSize);
         int counter = 0;
+
+        // As the cache size may be briefly exceeded before statements are evicted, we allow loading 110% the cache size
+        // to avoid logging early.
+        long preparedBytesLoadThreshold = (long) (PREPARED_STATEMENT_CACHE_SIZE_BYTES * 1.1);
+        long preparedBytesLoaded = 0L;
         for (UntypedResultSet.Row row : resultSet)
         {
-            if (onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")),
-                                row.getString("query_string"),
-                                row.has("logged_keyspace") ? row.getString("logged_keyspace") : null))
+            Prepared prepared = onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")),
+                                                row.getString("query_string"),
+                                                row.has("logged_keyspace") ? row.getString("logged_keyspace") : null);
+            if (prepared != null)
+            {
                 counter++;
+                preparedBytesLoaded += Math.max(0, prepared.pstmntSize);
+
+                if (preparedBytesLoaded > preparedBytesLoadThreshold)
+                {
+                    // In the event that we detect that we have loaded more bytes than the cache size return early to
+                    // prevent an indefinite startup time. This is almost certainly caused by the prepared statement cache
+                    // leaking (CASSANDRA-19703) which should not recur after being on a version running this code.
+                    // In such a case it's better to warn and continue startup than to continually page over millions of
+                    // prepared statements that would be immediately evicted.
+                    logger.warn("Detected prepared statement cache filling up during preload after preparing {} " +
+                                "statements (loaded {} with prepared_statements_cache_size being {}). " +
+                                "This could be an indication that prepared statements leaked prior to CASSANDRA-19703 " +
+                                "being fixed. Returning early to prevent indefinite startup. " +
+                                "Consider truncating {}.{} to clear out leaked prepared statements.",
+                                counter,
+                                FileUtils.stringifyFileSize(preparedBytesLoaded),
+                                FileUtils.stringifyFileSize(PREPARED_STATEMENT_CACHE_SIZE_BYTES),
+                                SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
+                    break;
+                }
+            }
         }
         return counter;
     }
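The early-exit logic in `loadPreparedStatements` can be sketched in isolation. The following is a simplified, hypothetical stand-in (`BoundedPreload` and its `load` method are not part of the patch): it walks an iterator lazily, as a paged result set would be walked, accumulates an estimated size per row, and stops once more than 110% of the cache capacity has been loaded.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of a byte-budgeted preload loop; not the patched Cassandra code. */
final class BoundedPreload
{
    /**
     * Consumes rows until the iterator is exhausted or the loaded bytes exceed
     * 110% of cacheSizeBytes. Returns the number of rows loaded.
     */
    static <T> int load(Iterator<T> rows, ToLongFunction<T> sizeOf, long cacheSizeBytes)
    {
        // Allow a 10% overshoot, mirroring the threshold used in the diff above.
        long threshold = (long) (cacheSizeBytes * 1.1);
        long loadedBytes = 0L;
        int count = 0;

        while (rows.hasNext())
        {
            T row = rows.next();
            count++;
            // Guard against negative size estimates, as the patch does with Math.max.
            loadedBytes += Math.max(0, sizeOf.applyAsLong(row));

            if (loadedBytes > threshold)
                break; // stop paging; remaining rows would only be evicted again
        }
        return count;
    }

    public static void main(String[] args)
    {
        // One million 10-byte rows against a 100-byte cache: loading stops after a handful of rows.
        Iterator<String> leakedRows = Collections.nCopies(1_000_000, "stmt").iterator();
        System.out.println("rows loaded: " + load(leakedRows, row -> 10L, 100L));
    }
}
```

Because the loop breaks as soon as the budget is exceeded, the remaining pages are never fetched, which is what keeps startup time bounded when the table has leaked.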

test/distributed/org/apache/cassandra/distributed/test/MixedModeFuzzTest.java

Lines changed: 4 additions & 3 deletions
@@ -49,7 +49,7 @@
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryHandler.Prepared;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -268,9 +268,10 @@ public void mixedModeFuzzTest() throws Throwable
 
                     c.get(nodeWithFix.get() ? 1 : 2).runOnInstance(() -> {
                         SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
+                            Prepared prepared = QueryProcessor.instance.getPrepared(id);
                             if (rng.nextBoolean())
                                 QueryProcessor.instance.evictPrepared(id);
-                            return true;
+                            return prepared;
                         });
                     });
                     break;
@@ -450,7 +451,7 @@ public static ResultMessage.Prepared prepare(String queryString, ClientState cli
             if (existing != null)
                 return existing;
 
-            QueryHandler.Prepared prepared = QueryProcessor.parseAndPrepare(queryString, clientState, false);
+            Prepared prepared = QueryProcessor.parseAndPrepare(queryString, clientState, false);
             CQLStatement statement = prepared.statement;
 
             int boundTerms = statement.getBindVariables().size();

test/distributed/org/apache/cassandra/distributed/test/ReprepareFuzzTest.java

Lines changed: 3 additions & 1 deletion
@@ -43,6 +43,7 @@
 import net.bytebuddy.dynamic.DynamicType;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.cql3.QueryHandler.Prepared;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -226,9 +227,10 @@ public void fuzzTest() throws Throwable
                 case CLEAR_CACHES:
                     c.get(1).runOnInstance(() -> {
                         SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
+                            Prepared prepared = QueryProcessor.instance.getPrepared(id);
                             if (rng.nextBoolean())
                                 QueryProcessor.instance.evictPrepared(id);
-                            return true;
+                            return prepared;
                         });
                     });
                     break;
