31
31
import com .google .protobuf .InvalidProtocolBufferException ;
32
32
33
33
import javax .annotation .Nonnull ;
34
+ import java .util .HashMap ;
35
+ import java .util .Map ;
34
36
import java .util .UUID ;
35
37
import java .util .concurrent .CompletableFuture ;
38
+ import java .util .concurrent .atomic .AtomicInteger ;
36
39
37
40
public class IndexingHeartbeat {
38
41
// [prefix, xid] -> [indexing-type, genesis time, heartbeat time]
@@ -74,7 +77,7 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
74
77
if (!hasNext ) {
75
78
return false ;
76
79
}
77
- validateNonCompetingHeartbeat (iterator .next ());
80
+ validateNonCompetingHeartbeat (iterator .next (), nowMilliseconds () );
78
81
return true ;
79
82
}));
80
83
@@ -84,13 +87,12 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
84
87
}
85
88
}
86
89
87
- private void validateNonCompetingHeartbeat (KeyValue kv ) {
90
+ private void validateNonCompetingHeartbeat (KeyValue kv , long now ) {
88
91
final Tuple keyTuple = Tuple .fromBytes (kv .getKey ());
89
92
if (keyTuple .size () < 2 ) { // expecting 8
90
93
return ;
91
94
}
92
95
final UUID otherSessionId = keyTuple .getUUID (keyTuple .size () - 1 );
93
- final long now = nowMilliseconds ();
94
96
if (!otherSessionId .equals (this .sessionId )) {
95
97
try {
96
98
final IndexBuildProto .IndexingHeartbeat otherHeartbeat = IndexBuildProto .IndexingHeartbeat .parseFrom (kv .getValue ());
@@ -101,7 +103,6 @@ private void validateNonCompetingHeartbeat(KeyValue kv) {
101
103
.addLogInfo (LogMessageKeys .EXISTING_SESSION_ID , otherSessionId )
102
104
.addLogInfo (LogMessageKeys .AGE_MILLISECONDS , age )
103
105
.addLogInfo (LogMessageKeys .TIME_LIMIT_MILLIS , leaseLength );
104
- // TODO: log details
105
106
}
106
107
} catch (InvalidProtocolBufferException e ) {
107
108
throw new RuntimeException (e );
@@ -113,8 +114,71 @@ public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index)
113
114
store .ensureContextActive ().clear (IndexingSubspaces .indexheartbeatSubspace (store , index , sessionId ).pack ());
114
115
}
115
116
117
+ public static CompletableFuture <Map <UUID , IndexBuildProto .IndexingHeartbeat >> getIndexingHeartbeats (FDBRecordStore store , Index index , int maxCount ) {
118
+ final Map <UUID , IndexBuildProto .IndexingHeartbeat > ret = new HashMap <>();
119
+ final AsyncIterator <KeyValue > iterator = heartbeatsIterator (store , index );
120
+ final AtomicInteger iterationCount = new AtomicInteger (0 );
121
+ return AsyncUtil .whileTrue (() -> iterator .onHasNext ()
122
+ .thenApply (hasNext -> {
123
+ if (!hasNext ) {
124
+ return false ;
125
+ }
126
+ if (maxCount > 0 && maxCount < iterationCount .incrementAndGet ()) {
127
+ return false ;
128
+ }
129
+ final KeyValue kv = iterator .next ();
130
+ final Tuple keyTuple = Tuple .fromBytes (kv .getKey ());
131
+ if (keyTuple .size () < 2 ) { // expecting 8
132
+ return true ; // ignore, next
133
+ }
134
+ final UUID otherSessionId = keyTuple .getUUID (keyTuple .size () - 1 );
135
+ try {
136
+ final IndexBuildProto .IndexingHeartbeat otherHeartbeat = IndexBuildProto .IndexingHeartbeat .parseFrom (kv .getValue ());
137
+ ret .put (otherSessionId , otherHeartbeat );
138
+ } catch (InvalidProtocolBufferException e ) {
139
+ // put a NONE heartbeat to indicate an invalid item
140
+ ret .put (otherSessionId , IndexBuildProto .IndexingHeartbeat .newBuilder ()
141
+ .setMethod (IndexBuildProto .IndexBuildIndexingStamp .Method .NONE )
142
+ .build ());
143
+ }
144
+ return true ;
145
+ }))
146
+ .thenApply (ignore -> ret );
147
+ }
148
+
149
+ public static CompletableFuture <Integer > clearIndexingHeartbeats (@ Nonnull FDBRecordStore store , @ Nonnull Index index , long minAgenMilliseconds , int maxIteration ) {
150
+ final AsyncIterator <KeyValue > iterator = heartbeatsIterator (store , index );
151
+ final AtomicInteger deleteCount = new AtomicInteger (0 );
152
+ final AtomicInteger iterationCount = new AtomicInteger (0 );
153
+ final long now = nowMilliseconds ();
154
+ return AsyncUtil .whileTrue (() -> iterator .onHasNext ()
155
+ .thenApply (hasNext -> {
156
+ if (!hasNext ) {
157
+ return false ;
158
+ }
159
+ if (maxIteration > 0 && maxIteration < iterationCount .incrementAndGet ()) {
160
+ return false ;
161
+ }
162
+ final KeyValue kv = iterator .next ();
163
+ boolean shouldRemove ;
164
+ try {
165
+ final IndexBuildProto .IndexingHeartbeat otherHeartbeat = IndexBuildProto .IndexingHeartbeat .parseFrom (kv .getValue ());
166
+ // remove heartbeat if too old
167
+ shouldRemove = now + minAgenMilliseconds <= otherHeartbeat .getHeartbeatTimeMilliseconds ();
168
+ } catch (InvalidProtocolBufferException e ) {
169
+ // remove heartbeat if invalid
170
+ shouldRemove = true ;
171
+ }
172
+ if (shouldRemove ) {
173
+ store .ensureContextActive ().clear (kv .getKey ());
174
+ deleteCount .incrementAndGet ();
175
+ }
176
+ return true ;
177
+ }))
178
+ .thenApply (ignore -> deleteCount .get ());
179
+ }
116
180
117
- public AsyncIterator <KeyValue > heartbeatsIterator (FDBRecordStore store , Index index ) {
181
+ public static AsyncIterator <KeyValue > heartbeatsIterator (FDBRecordStore store , Index index ) {
118
182
return store .getContext ().ensureActive ().snapshot ().getRange (IndexingSubspaces .indexheartbeatSubspace (store , index ).range ()).iterator ();
119
183
}
120
184
0 commit comments