2020package org .dcache .nfs .v4 ;
2121
2222import com .google .common .util .concurrent .Striped ;
23+
24+ import java .io .IOException ;
2325import java .util .ArrayList ;
2426import java .util .Collection ;
2527import java .util .Iterator ;
3032import java .util .stream .Collectors ;
3133import org .dcache .nfs .ChimeraNFSException ;
3234import org .dcache .nfs .status .BadStateidException ;
35+ import org .dcache .nfs .status .DelayException ;
3336import org .dcache .nfs .status .InvalException ;
3437import org .dcache .nfs .status .ShareDeniedException ;
38+ import org .dcache .nfs .status .StaleException ;
3539import org .dcache .nfs .v4 .xdr .nfs4_prot ;
40+ import org .dcache .nfs .v4 .xdr .nfs_fh4 ;
41+ import org .dcache .nfs .v4 .xdr .open_delegation_type4 ;
3642import org .dcache .nfs .v4 .xdr .stateid4 ;
3743import org .dcache .nfs .vfs .Inode ;
3844import org .dcache .nfs .util .Opaque ;
@@ -55,6 +61,11 @@ public class FileTracker {
5561 private final Striped <Lock > filesLock = Striped .lock (Runtime .getRuntime ().availableProcessors ()*4 );
5662 private final Map <Opaque , List <OpenState >> files = new ConcurrentHashMap <>();
5763
64+ /**
65+ * Delegation records associated with open files.
66+ */
67+ private final Map <Opaque , List <DelegationState >> delegations = new ConcurrentHashMap <>();
68+
5869 private static class OpenState {
5970
6071 private final NFS4Client client ;
@@ -118,6 +129,27 @@ public NFS4Client getClient() {
118129 }
119130 }
120131
132+ /**
133+ * Record associated with open-delegation.
134+ * @param client
135+ * @param stateid
136+ * @param delegationType
137+ */
138+ record DelegationState (NFS4Client client , stateid4 openStateId , stateid4 stateid , int delegationType ) {
139+
140+ }
141+
142+ /**
143+ * Record associated with an open file.
144+ *
145+ * @param openStateId
146+ * @param delegationStateId
147+ * @param hasDelegation
148+ */
149+ public record OpenRecord (stateid4 openStateId , stateid4 delegationStateId , boolean hasDelegation ) {
150+
151+ }
152+
121153 /**
122154 * Add a new open to the list of open files. If provided {@code shareAccess}
123155 * and {@code shareDeny} conflicts with existing opens, @{link ShareDeniedException}
@@ -127,11 +159,14 @@ public NFS4Client getClient() {
127159 * @param inode of opened file.
128160 * @param shareAccess type of access required.
129161 * @param shareDeny type of access to deny others.
130- * @return a snapshot of the stateid associated with open.
162+ * @return a snapshot of an OpenRecord associated with open.
131163 * @throws ShareDeniedException if share reservation conflicts with an existing open.
132164 * @throws ChimeraNFSException
133165 */
134- public stateid4 addOpen (NFS4Client client , StateOwner owner , Inode inode , int shareAccess , int shareDeny ) throws ChimeraNFSException {
166+ public OpenRecord addOpen (NFS4Client client , StateOwner owner , Inode inode , int shareAccess , int shareDeny ) throws ChimeraNFSException {
167+
168+ boolean wantReadDelegation = (shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WANT_READ_DELEG ) != 0 ;
169+ boolean wantWriteDelegation = (shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WANT_WRITE_DELEG ) != 0 ;
135170
136171 Opaque fileId = new Opaque (inode .getFileId ());
137172 Lock lock = filesLock .get (fileId );
@@ -170,7 +205,43 @@ public stateid4 addOpen(NFS4Client client, StateOwner owner, Inode inode, int sh
170205
171206 os .stateid .seqid ++;
172207 //we need to return copy to avoid modification by concurrent opens
173- return new stateid4 (os .stateid .other , os .stateid .seqid );
208+ var openStateid = new stateid4 (os .stateid .other , os .stateid .seqid );
209+ return new OpenRecord (openStateid , null , false );
210+ }
211+ }
212+
213+ /*
214+ * REVISIT: currently only read-delegations are supported
215+ */
216+ var existingDelegations = delegations .get (fileId );
217+
218+ /*
219+ * delegation is possible if:
220+ * - client has a callback channel
221+ * - client does not have a delegation for this file
222+ * - no other open has write access
223+ */
224+ boolean canDelegate = client .getCB () != null &&
225+ (existingDelegations == null || existingDelegations .stream ().noneMatch (d -> d .client ().getId () == client .getId ())) &&
226+ opens .stream ().noneMatch (os -> (os .shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WRITE ) != 0 );
227+
228+ // recall any read delegations if write
229+ if ((existingDelegations != null ) && (shareAccess & nfs4_prot .OPEN4_SHARE_ACCESS_WRITE ) != 0 ) {
230+
231+ // REVISIT: usage of Stream#peek is an anti-pattern
232+ boolean haveRecalled = existingDelegations .stream ()
233+ .filter (d -> client .isLeaseValid ())
234+ .peek (d -> {
235+ try {
236+ d .client ().getCB ()
237+ .cbDelegationRecall (new nfs_fh4 (inode .toNfsHandle ()), d .stateid (), false );
238+ } catch (IOException e ) {
239+ // ignore
240+ }
241+ }).count () > 0 ;
242+
243+ if (haveRecalled ) {
244+ throw new DelayException ("Recalling read delegations" );
174245 }
175246 }
176247
@@ -180,8 +251,21 @@ public stateid4 addOpen(NFS4Client client, StateOwner owner, Inode inode, int sh
180251 opens .add (openState );
181252 state .addDisposeListener (s -> removeOpen (inode , stateid ));
182253 stateid .seqid ++;
254+
183255 //we need to return copy to avoid modification by concurrent opens
184- return new stateid4 (stateid .other , stateid .seqid );
256+ var openStateid = new stateid4 (stateid .other , stateid .seqid );
257+
258+ // REVISIT: currently only read-delegations are supported
259+ if (wantReadDelegation && canDelegate ) {
260+ // REVISIT: currently only read-delegations are supported
261+ stateid4 delegationStateid = client .createState (state .getStateOwner (), state ).stateid ();
262+ delegations .computeIfAbsent (fileId , x -> new ArrayList <>(1 ))
263+ .add (new DelegationState (client , openStateid , delegationStateid , open_delegation_type4 .OPEN_DELEGATE_READ ));
264+ return new OpenRecord (openStateid , delegationStateid , true );
265+ } else {
266+ //we need to return copy to avoid modification by concurrent opens
267+ return new OpenRecord (openStateid , null , false );
268+ }
185269 } finally {
186270 lock .unlock ();
187271 }
@@ -241,6 +325,42 @@ public stateid4 downgradeOpen(NFS4Client client, stateid4 stateid, Inode inode,
241325 }
242326 }
243327
328+ /**
329+ * Return delegation for the given file
330+ * @param client nfs client who returns the delegation.
331+ * @param stateid delegation stateid
332+ * @param inode the inode of the delegated file.
333+ */
334+ public void delegationReturn (NFS4Client client , stateid4 stateid , Inode inode )
335+ throws StaleException {
336+
337+ Opaque fileId = new Opaque (inode .getFileId ());
338+ Lock lock = filesLock .get (fileId );
339+ lock .lock ();
340+ try {
341+
342+ var fileDelegations = delegations .get (fileId );
343+ if (fileDelegations == null ) {
344+ throw new StaleException ("no delegation found" );
345+ }
346+
347+ DelegationState delegation = fileDelegations .stream ()
348+ .filter (d -> d .client ().getId () == client .getId ())
349+ .filter (d -> d .stateid ().equals (stateid ))
350+ .findFirst ()
351+ .orElseThrow (StaleException ::new );
352+
353+ fileDelegations .remove (delegation );
354+ if (fileDelegations .isEmpty ()) {
355+ delegations .remove (fileId );
356+ }
357+
358+ } finally {
359+ lock .unlock ();
360+ }
361+ }
362+
363+
244364 /**
245365 * Remove an open from the list.
246366 * @param inode of the opened file
@@ -271,6 +391,23 @@ void removeOpen(Inode inode, stateid4 stateid) {
271391 files .remove (fileId );
272392 }
273393 }
394+
395+ var existingDelegations = delegations .get (fileId );
396+ if (existingDelegations != null ) {
397+ Iterator <DelegationState > dsi = existingDelegations .iterator ();
398+ while (dsi .hasNext ()) {
399+ stateid4 os = dsi .next ().openStateId ();
400+ if (os .equals (stateid )) {
401+ dsi .remove ();
402+ break ;
403+ }
404+ }
405+
406+ if (existingDelegations .isEmpty ()) {
407+ delegations .remove (fileId );
408+ }
409+ }
410+
274411 } finally {
275412 lock .unlock ();
276413 }
0 commit comments