@@ -559,13 +559,21 @@ protected <M extends Message> CompletableFuture<FDBStoredRecord<M>> saveTypedRec
559
559
560
560
@ SuppressWarnings ("PMD.CloseResource" )
561
561
private <M extends Message > void addRecordCount (@ Nonnull RecordMetaData metaData , @ Nonnull FDBStoredRecord <M > rec , @ Nonnull byte [] increment ) {
562
- if (metaData .getRecordCountKey () == null ) {
563
- return ;
562
+ if (metaData .getRecordCountKey () != null ) {
563
+ beginRecordStoreStateRead ();
564
+ try {
565
+ RecordMetaDataProto .DataStoreInfo header = recordStoreStateRef .get ().getStoreHeader ();
566
+ // We do not need to check the format version here. In order for it to be DISABLED we would have to be
567
+ // on a format version that supports such a state.
568
+ if (header .getRecordCountState () != RecordMetaDataProto .DataStoreInfo .RecordCountState .DISABLED ) {
569
+ Key .Evaluated subkey = metaData .getRecordCountKey ().evaluateSingleton (rec );
570
+ final byte [] keyBytes = getSubspace ().pack (Tuple .from (RECORD_COUNT_KEY ).addAll (subkey .toTupleAppropriateList ()));
571
+ ensureContextActive ().mutate (MutationType .ADD , keyBytes , increment );
572
+ }
573
+ } finally {
574
+ endRecordStoreStateRead ();
575
+ }
564
576
}
565
- final Transaction tr = ensureContextActive ();
566
- Key .Evaluated subkey = metaData .getRecordCountKey ().evaluateSingleton (rec );
567
- final byte [] keyBytes = getSubspace ().pack (Tuple .from (RECORD_COUNT_KEY ).addAll (subkey .toTupleAppropriateList ()));
568
- tr .mutate (MutationType .ADD , keyBytes , increment );
569
577
}
570
578
571
579
@ Nullable
@@ -1851,7 +1859,9 @@ private Key.Evaluated deleteRecordsWhereCheckRecordTypes() {
1851
1859
}
1852
1860
1853
1861
final KeyExpression recordCountKey = getRecordMetaData ().getRecordCountKey ();
1854
- if (recordCountKey != null ) {
1862
+ if (recordCountKey != null
1863
+ // we don't need to call beginRecordStoreStateRead(), that is checked in deleteRecordsWhereAsync
1864
+ && recordStoreStateRef .get ().getStoreHeader ().getRecordCountState () != RecordMetaDataProto .DataStoreInfo .RecordCountState .DISABLED ) {
1855
1865
final QueryToKeyMatcher .Match match = matcher .matchesSatisfyingQuery (recordCountKey );
1856
1866
if (match .getType () != QueryToKeyMatcher .MatchType .EQUALITY ) {
1857
1867
throw new Query .InvalidExpressionException ("Record count key not matching for deleteRecordsWhere" );
@@ -2077,8 +2087,11 @@ private CompletableFuture<Void> run() {
2077
2087
context .clear (versionRange );
2078
2088
}
2079
2089
2090
+
2080
2091
final KeyExpression recordCountKey = getRecordMetaData ().getRecordCountKey ();
2081
- if (recordCountKey != null ) {
2092
+ if (recordCountKey != null
2093
+ // we don't need to call beginRecordStoreStateRead(), that is checked in deleteRecordsWhereAsync
2094
+ && recordStoreStateRef .get ().getStoreHeader ().getRecordCountState () != RecordMetaDataProto .DataStoreInfo .RecordCountState .DISABLED ) {
2082
2095
if (prefix .size () == recordCountKey .getColumnSize ()) {
2083
2096
// Delete a single record used for counting
2084
2097
context .clear (getSubspace ().pack (Tuple .from (RECORD_COUNT_KEY ).addAll (prefix )));
@@ -2153,17 +2166,39 @@ private CompletableFuture<Long> estimateSize(@Nonnull Range range, long startTim
2153
2166
@ Override
2154
2167
public CompletableFuture <Long > getSnapshotRecordCount (@ Nonnull KeyExpression key , @ Nonnull Key .Evaluated value ,
2155
2168
@ Nonnull IndexQueryabilityFilter indexQueryabilityFilter ) {
2156
- if (getRecordMetaData ().getRecordCountKey () != null ) {
2157
- if (key .getColumnSize () != value .size ()) {
2158
- throw recordCoreException ("key and value are not the same size" );
2159
- }
2160
- final ReadTransaction tr = context .readTransaction (true );
2161
- final Tuple subkey = Tuple .from (RECORD_COUNT_KEY ).addAll (value .toTupleAppropriateList ());
2162
- if (getRecordMetaData ().getRecordCountKey ().equals (key )) {
2163
- return tr .get (getSubspace ().pack (subkey )).thenApply (FDBRecordStore ::decodeRecordCount );
2164
- } else if (key .isPrefixKey (getRecordMetaData ().getRecordCountKey ())) {
2165
- AsyncIterable <KeyValue > kvs = tr .getRange (getSubspace ().range (Tuple .from (RECORD_COUNT_KEY )));
2166
- return MoreAsyncUtil .reduce (getExecutor (), kvs .iterator (), 0L , (count , kv ) -> count + decodeRecordCount (kv .getValue ()));
2169
+ final RecordMetaData recordMetaData = getRecordMetaData ();
2170
+ if (recordMetaData .getRecordCountKey () != null ) {
2171
+ beginRecordStoreStateRead ();
2172
+ boolean futureCreated = false ;
2173
+ try {
2174
+ RecordMetaDataProto .DataStoreInfo header = recordStoreStateRef .get ().getStoreHeader ();
2175
+ // We can always check the state, even if the formatVersion is older, because older versions will always
2176
+ // have the default of READABLE
2177
+ if (header .getRecordCountState () == RecordMetaDataProto .DataStoreInfo .RecordCountState .READABLE ) {
2178
+ if (key .getColumnSize () != value .size ()) {
2179
+ throw recordCoreException ("key and value are not the same size" );
2180
+ }
2181
+ final ReadTransaction tr = context .readTransaction (true );
2182
+ final Tuple subkey = Tuple .from (RECORD_COUNT_KEY ).addAll (value .toTupleAppropriateList ());
2183
+ if (recordMetaData .getRecordCountKey ().equals (key )) {
2184
+ final CompletableFuture <Long > result = tr .get (getSubspace ().pack (subkey ))
2185
+ .thenApply (FDBRecordStore ::decodeRecordCount )
2186
+ .whenComplete ((ignored , error ) -> endRecordStoreStateRead ());
2187
+ futureCreated = true ;
2188
+ return result ;
2189
+ } else if (key .isPrefixKey (recordMetaData .getRecordCountKey ())) {
2190
+ AsyncIterable <KeyValue > kvs = tr .getRange (getSubspace ().range (Tuple .from (RECORD_COUNT_KEY )));
2191
+ final CompletableFuture <Long > result = MoreAsyncUtil .reduce (getExecutor (), kvs .iterator (), 0L ,
2192
+ (count , kv ) -> count + decodeRecordCount (kv .getValue ()))
2193
+ .whenComplete ((ignored , error ) -> endRecordStoreStateRead ());
2194
+ futureCreated = true ;
2195
+ return result ;
2196
+ }
2197
+ }
2198
+ } finally {
2199
+ if (!futureCreated ) {
2200
+ endRecordStoreStateRead ();
2201
+ }
2167
2202
}
2168
2203
}
2169
2204
return evaluateAggregateFunction (Collections .emptyList (), IndexFunctionHelper .count (key ),
@@ -3237,6 +3272,48 @@ public void clearHeaderUserField(@Nonnull String userField) {
3237
3272
context .asyncToSync (FDBStoreTimer .Waits .WAIT_EDIT_HEADER_USER_FIELD , clearHeaderUserFieldAsync (userField ));
3238
3273
}
3239
3274
3275
+ /**
3276
+ * Update the RecordCount to have a new state.
3277
+ * <p>
3278
+ * The state can go from {@code READABLE} to {@code WRITE_ONLY}, which is mostly useful to validate that if you
3279
+ * drop the {@link RecordMetaData#getRecordCountKey() recordCountKey} from the metadata, it won't have adverse
3280
+ * affects.
3281
+ * The state can always be set to {@code DISABLED}, at which point there is no way to any other state. This
3282
+ * limitation exists, in large part, because there is not a way to rebuild the count across indexes, and since
3283
+ * it is long deprecated, investing in that doesn't make sense.
3284
+ * </p>
3285
+ * @param newState the new state to update it to.
3286
+ * @return a future once that completes once the state is updated, and the data is cleared.
3287
+ */
3288
+ public CompletableFuture <Void > updateRecordCountStateAsync (@ Nonnull RecordMetaDataProto .DataStoreInfo .RecordCountState newState ) {
3289
+ return updateStoreHeaderAsync (builder -> {
3290
+ if (!getFormatVersionEnum ().isAtLeast (FormatVersion .RECORD_COUNT_STATE )) {
3291
+ throw new RecordCoreException ("Store does not support updating record count state" )
3292
+ .addLogInfo (LogMessageKeys .FORMAT_VERSION , getFormatVersionEnum ());
3293
+ }
3294
+ final RecordMetaDataProto .DataStoreInfo .RecordCountState existing = getRecordStoreState ().getStoreHeader ().getRecordCountState ();
3295
+ if (existing == newState ) {
3296
+ return builder ;
3297
+ }
3298
+ boolean toWriteOnly = existing == RecordMetaDataProto .DataStoreInfo .RecordCountState .READABLE &&
3299
+ newState == RecordMetaDataProto .DataStoreInfo .RecordCountState .WRITE_ONLY ;
3300
+ boolean toReadable = existing == RecordMetaDataProto .DataStoreInfo .RecordCountState .WRITE_ONLY &&
3301
+ newState == RecordMetaDataProto .DataStoreInfo .RecordCountState .READABLE ;
3302
+ if (toWriteOnly || toReadable || newState == RecordMetaDataProto .DataStoreInfo .RecordCountState .DISABLED ) {
3303
+ builder .setRecordCountState (newState );
3304
+ if (newState == RecordMetaDataProto .DataStoreInfo .RecordCountState .DISABLED ) {
3305
+ // Note: getSubspace().range(tuple) does not include getSubspace().pack(tuple), but this does.
3306
+ // Ungrouped recordCountKey will be stored directly at getSubspace().pack(tuple).
3307
+ ensureContextActive ().clear (Range .startsWith (getSubspace ().pack (Tuple .from (RECORD_COUNT_KEY ))));
3308
+ }
3309
+ return builder ;
3310
+ }
3311
+ throw new RecordCoreException ("Invalid state transition for RecordCountState" )
3312
+ .addLogInfo (LogMessageKeys .OLD , existing )
3313
+ .addLogInfo (LogMessageKeys .NEW , newState );
3314
+ });
3315
+ }
3316
+
3240
3317
// Actually (1) writes the index state to the database and (2) updates the cached state with the new state
3241
3318
@ SuppressWarnings ("PMD.CloseResource" )
3242
3319
private void updateIndexState (@ Nonnull String indexName , byte [] indexKey , @ Nonnull IndexState indexState ) {
@@ -4841,6 +4918,8 @@ protected boolean checkPossiblyRebuildRecordCounts(@Nonnull RecordMetaData metaD
4841
4918
4842
4919
if (rebuildRecordCounts ) {
4843
4920
// We want to clear all record counts.
4921
+ // This code will leave data behind if the previous RecordCountKey was not grouped
4922
+ // https://github.com/FoundationDB/fdb-record-layer/issues/3335
4844
4923
if (existingStore ) {
4845
4924
context .clear (getSubspace ().range (Tuple .from (RECORD_COUNT_KEY )));
4846
4925
}
@@ -4865,7 +4944,8 @@ protected boolean checkPossiblyRebuildRecordCounts(@Nonnull RecordMetaData metaD
4865
4944
@ SuppressWarnings ("PMD.CloseResource" )
4866
4945
public void addRebuildRecordCountsJob (List <CompletableFuture <Void >> work ) {
4867
4946
final KeyExpression recordCountKey = getRecordMetaData ().getRecordCountKey ();
4868
- if (recordCountKey == null ) {
4947
+ if (recordCountKey == null ||
4948
+ getRecordStoreState ().getStoreHeader ().getRecordCountState () == RecordMetaDataProto .DataStoreInfo .RecordCountState .DISABLED ) {
4869
4949
return ;
4870
4950
}
4871
4951
if (LOGGER .isDebugEnabled ()) {
0 commit comments