27
27
import com .apple .foundationdb .record .logging .LogMessageKeys ;
28
28
import com .apple .foundationdb .record .metadata .Index ;
29
29
import com .apple .foundationdb .synchronizedsession .SynchronizedSessionLockedException ;
30
- import com .apple .foundationdb .tuple .Tuple ;
31
30
import com .google .protobuf .InvalidProtocolBufferException ;
32
31
33
32
import javax .annotation .Nonnull ;
@@ -73,12 +72,29 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
73
72
case MULTI_TARGET_BY_RECORDS :
74
73
case BY_INDEX :
75
74
final AsyncIterator <KeyValue > iterator = heartbeatsIterator (store , index );
75
+ final long now = nowMilliseconds ();
76
76
return AsyncUtil .whileTrue (() -> iterator .onHasNext ()
77
77
.thenApply (hasNext -> {
78
78
if (!hasNext ) {
79
79
return false ;
80
80
}
81
- validateNonCompetingHeartbeat (iterator .next (), nowMilliseconds ());
81
+ final KeyValue kv = iterator .next ();
82
+ try {
83
+ final UUID otherIndexerId = heartbeatKeyToIndexerId (store , index , kv .getKey ());
84
+ if (!otherIndexerId .equals (this .indexerId )) {
85
+ final IndexBuildProto .IndexBuildHeartbeat otherHeartbeat = IndexBuildProto .IndexBuildHeartbeat .parseFrom (kv .getValue ());
86
+ final long age = now - otherHeartbeat .getHeartbeatTimeMilliseconds ();
87
+ if (age > 0 && age < leaseLength ) {
88
+ throw new SynchronizedSessionLockedException ("Failed to initialize the session because of an existing session in progress" )
89
+ .addLogInfo (LogMessageKeys .INDEXER_ID , indexerId )
90
+ .addLogInfo (LogMessageKeys .EXISTING_INDEXER_ID , otherIndexerId )
91
+ .addLogInfo (LogMessageKeys .AGE_MILLISECONDS , age )
92
+ .addLogInfo (LogMessageKeys .TIME_LIMIT_MILLIS , leaseLength );
93
+ }
94
+ }
95
+ } catch (InvalidProtocolBufferException e ) {
96
+ throw new RuntimeException (e );
97
+ }
82
98
return true ;
83
99
}))
84
100
.thenApply (ignore -> {
@@ -92,29 +108,6 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
92
108
}
93
109
}
94
110
95
- private void validateNonCompetingHeartbeat (KeyValue kv , long now ) {
96
- final Tuple keyTuple = Tuple .fromBytes (kv .getKey ());
97
- if (keyTuple .size () < 2 ) { // expecting 8
98
- return ;
99
- }
100
- final UUID otherIndexerId = keyTuple .getUUID (keyTuple .size () - 1 );
101
- if (!otherIndexerId .equals (this .indexerId )) {
102
- try {
103
- final IndexBuildProto .IndexBuildHeartbeat otherHeartbeat = IndexBuildProto .IndexBuildHeartbeat .parseFrom (kv .getValue ());
104
- final long age = now - otherHeartbeat .getHeartbeatTimeMilliseconds ();
105
- if (age > 0 && age < leaseLength ) {
106
- throw new SynchronizedSessionLockedException ("Failed to initialize the session because of an existing session in progress" )
107
- .addLogInfo (LogMessageKeys .INDEXER_ID , indexerId )
108
- .addLogInfo (LogMessageKeys .EXISTING_INDEXER_ID , otherIndexerId )
109
- .addLogInfo (LogMessageKeys .AGE_MILLISECONDS , age )
110
- .addLogInfo (LogMessageKeys .TIME_LIMIT_MILLIS , leaseLength );
111
- }
112
- } catch (InvalidProtocolBufferException e ) {
113
- throw new RuntimeException (e );
114
- }
115
- }
116
- }
117
-
118
111
public void clearHeartbeat (@ Nonnull FDBRecordStore store , @ Nonnull Index index ) {
119
112
store .ensureContextActive ().clear (IndexingSubspaces .indexheartbeatSubspace (store , index , indexerId ).pack ());
120
113
}
@@ -132,11 +125,7 @@ public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>>
132
125
return false ;
133
126
}
134
127
final KeyValue kv = iterator .next ();
135
- final Tuple keyTuple = Tuple .fromBytes (kv .getKey ());
136
- if (keyTuple .size () < 2 ) { // expecting 8
137
- return true ; // ignore, next
138
- }
139
- final UUID otherIndexerId = keyTuple .getUUID (keyTuple .size () - 1 );
128
+ final UUID otherIndexerId = heartbeatKeyToIndexerId (store , index , kv .getKey ());
140
129
try {
141
130
final IndexBuildProto .IndexBuildHeartbeat otherHeartbeat = IndexBuildProto .IndexBuildHeartbeat .parseFrom (kv .getValue ());
142
131
ret .put (otherIndexerId , otherHeartbeat );
@@ -183,10 +172,13 @@ public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRec
183
172
.thenApply (ignore -> deleteCount .get ());
184
173
}
185
174
186
- public static AsyncIterator <KeyValue > heartbeatsIterator (FDBRecordStore store , Index index ) {
175
+ private static AsyncIterator <KeyValue > heartbeatsIterator (FDBRecordStore store , Index index ) {
187
176
return store .getContext ().ensureActive ().snapshot ().getRange (IndexingSubspaces .indexheartbeatSubspace (store , index ).range ()).iterator ();
188
177
}
189
178
179
+ private static UUID heartbeatKeyToIndexerId (FDBRecordStore store , Index index , byte [] key ) {
180
+ return IndexingSubspaces .indexheartbeatSubspace (store , index ).unpack (key ).getUUID (0 );
181
+ }
190
182
191
183
private static long nowMilliseconds () {
192
184
return System .currentTimeMillis ();
0 commit comments