42
42
import org .dcache .nfs .v4 .xdr .stateid4 ;
43
43
import org .dcache .nfs .vfs .Inode ;
44
44
import org .dcache .nfs .util .Opaque ;
45
+ import org .slf4j .Logger ;
46
+ import org .slf4j .LoggerFactory ;
45
47
46
48
/**
47
49
* A class which tracks open files.
48
50
*/
49
51
public class FileTracker {
50
52
53
+ public static final Logger LOG = LoggerFactory .getLogger (FileTracker .class );
54
+
51
55
/*
52
56
* we use {@link Striped} locks here to split synchronized block on open files
53
57
* into multiple partitions to increase concurrency, while guaranteeing atomicity
@@ -135,7 +139,7 @@ public NFS4Client getClient() {
135
139
* @param stateid
136
140
* @param delegationType
137
141
*/
138
- record DelegationState (NFS4Client client , stateid4 openStateId , stateid4 stateid , int delegationType ) {
142
+ record DelegationState (NFS4Client client , NFS4State delegationStateid , int delegationType ) {
139
143
140
144
}
141
145
@@ -187,6 +191,43 @@ public OpenRecord addOpen(NFS4Client client, StateOwner owner, Inode inode, int
187
191
throw new ShareDeniedException ("Conflicting share" );
188
192
}
189
193
194
+ /*
195
+ * REVISIT: currently only read-delegations are supported
196
+ */
197
+ var existingDelegations = delegations .get (fileId );
198
+
199
+ /*
200
+ * delegation is possible if:
201
+ * - client has a callback channel
202
+ * - client does not have a delegation for this file
203
+ * - no other open has write access
204
+ */
205
+ boolean canDelegateRead = client .getCB () != null &&
206
+ (existingDelegations == null || existingDelegations .stream ().noneMatch (d -> d .client ().getId () == client .getId ())) &&
207
+ opens .stream ().noneMatch (os -> (os .shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WRITE ) != 0 );
208
+
209
+ // recall any read delegations if write
210
+ if ((existingDelegations != null ) && (shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WRITE ) != 0 ) {
211
+ var fh = new nfs_fh4 (inode .toNfsHandle ());
212
+ int recalledDelegations = existingDelegations .stream ()
213
+ .filter (d -> d .client ().isLeaseValid ())
214
+ .filter (d -> !d .client ().getId ().equals (client .getId ()))
215
+ .reduce (0 , (c , d ) -> {
216
+ try {
217
+ d .client ().getCB ().cbDelegationRecall (fh , d .delegationStateid ().stateid (), false );
218
+ return c + 1 ;
219
+ } catch (IOException e ) {
220
+ LOG .warn ("Failed to recall delegation from {} : {}" , d .client (), e .toString ());
221
+ d .delegationStateid ().disposeIgnoreFailures ();
222
+ return c ;
223
+ }
224
+ }, Integer ::sum );
225
+
226
+ if (recalledDelegations > 0 ) {
227
+ throw new DelayException ("Recalling read delegations" );
228
+ }
229
+ }
230
+
190
231
// if there is another open from the same client we must merge
191
232
// access mode and return the same stateid as required by rfc5661#18.16.3
192
233
@@ -210,41 +251,6 @@ public OpenRecord addOpen(NFS4Client client, StateOwner owner, Inode inode, int
210
251
}
211
252
}
212
253
213
- /*
214
- * REVISIT: currently only read-delegations are supported
215
- */
216
- var existingDelegations = delegations .get (fileId );
217
-
218
- /*
219
- * delegation is possible if:
220
- * - client has a callback channel
221
- * - client does not have a delegation for this file
222
- * - no other open has write access
223
- */
224
- boolean canDelegate = client .getCB () != null &&
225
- (existingDelegations == null || existingDelegations .stream ().noneMatch (d -> d .client ().getId () == client .getId ())) &&
226
- opens .stream ().noneMatch (os -> (os .shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WRITE ) != 0 );
227
-
228
- // recall any read delegations if write
229
- if ((existingDelegations != null ) && (shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WRITE ) != 0 ) {
230
-
231
- // REVISIT: usage of Stream#peek is an anti-pattern
232
- boolean haveRecalled = existingDelegations .stream ()
233
- .filter (d -> client .isLeaseValid ())
234
- .peek (d -> {
235
- try {
236
- d .client ().getCB ()
237
- .cbDelegationRecall (new nfs_fh4 (inode .toNfsHandle ()), d .stateid (), false );
238
- } catch (IOException e ) {
239
- // ignore
240
- }
241
- }).count () > 0 ;
242
-
243
- if (haveRecalled ) {
244
- throw new DelayException ("Recalling read delegations" );
245
- }
246
- }
247
-
248
254
NFS4State state = client .createState (owner );
249
255
stateid = state .stateid ();
250
256
OpenState openState = new OpenState (client , owner , stateid , shareAccess , shareDeny );
@@ -256,12 +262,12 @@ public OpenRecord addOpen(NFS4Client client, StateOwner owner, Inode inode, int
256
262
var openStateid = new stateid4 (stateid .other , stateid .seqid );
257
263
258
264
// REVISIT: currently only read-delegations are supported
259
- if (wantReadDelegation && canDelegate ) {
265
+ if (wantReadDelegation && canDelegateRead ) {
260
266
// REVISIT: currently only read-delegations are supported
261
- stateid4 delegationStateid = client .createState (state .getStateOwner (), state ). stateid ( );
267
+ var delegationStateid = client .createState (state .getStateOwner (), state );
262
268
delegations .computeIfAbsent (fileId , x -> new ArrayList <>(1 ))
263
- .add (new DelegationState (client , openStateid , delegationStateid , open_delegation_type4 .OPEN_DELEGATE_READ ));
264
- return new OpenRecord (openStateid , delegationStateid , true );
269
+ .add (new DelegationState (client , delegationStateid , open_delegation_type4 .OPEN_DELEGATE_READ ));
270
+ return new OpenRecord (openStateid , delegationStateid . stateid () , true );
265
271
} else {
266
272
//we need to return copy to avoid modification by concurrent opens
267
273
return new OpenRecord (openStateid , null , false );
@@ -332,7 +338,7 @@ public stateid4 downgradeOpen(NFS4Client client, stateid4 stateid, Inode inode,
332
338
* @param inode the inode of the delegated file.
333
339
*/
334
340
public void delegationReturn (NFS4Client client , stateid4 stateid , Inode inode )
335
- throws StaleException {
341
+ throws ChimeraNFSException {
336
342
337
343
Opaque fileId = new Opaque (inode .getFileId ());
338
344
Lock lock = filesLock .get (fileId );
@@ -345,11 +351,12 @@ public void delegationReturn(NFS4Client client, stateid4 stateid, Inode inode)
345
351
}
346
352
347
353
DelegationState delegation = fileDelegations .stream ()
348
- .filter (d -> d .client ().getId () == client .getId ())
349
- .filter (d -> d .stateid ().equals (stateid ))
354
+ .filter (d -> d .client ().getId (). equals ( client .getId () ))
355
+ .filter (d -> d .delegationStateid (). stateid ().equals (stateid ))
350
356
.findFirst ()
351
357
.orElseThrow (StaleException ::new );
352
358
359
+ delegation .delegationStateid ().tryDispose ();
353
360
fileDelegations .remove (delegation );
354
361
if (fileDelegations .isEmpty ()) {
355
362
delegations .remove (fileId );
@@ -392,22 +399,6 @@ void removeOpen(Inode inode, stateid4 stateid) {
392
399
}
393
400
}
394
401
395
- var existingDelegations = delegations .get (fileId );
396
- if (existingDelegations != null ) {
397
- Iterator <DelegationState > dsi = existingDelegations .iterator ();
398
- while (dsi .hasNext ()) {
399
- stateid4 os = dsi .next ().openStateId ();
400
- if (os .equals (stateid )) {
401
- dsi .remove ();
402
- break ;
403
- }
404
- }
405
-
406
- if (existingDelegations .isEmpty ()) {
407
- delegations .remove (fileId );
408
- }
409
- }
410
-
411
402
} finally {
412
403
lock .unlock ();
413
404
}
0 commit comments