34
34
import java .time .Instant ;
35
35
import java .util .function .Function ;
36
36
import java .util .function .Predicate ;
37
+ import java .util .Map ;
38
+ import java .util .OptionalInt ;
39
+ import java .util .HashMap ;
37
40
import javax .annotation .Nonnull ;
38
41
import javax .security .auth .Subject ;
42
+ import org .dcache .auth .Subjects ;
39
43
import org .dcache .net .FlowMarker .FlowMarkerBuilder ;
40
44
import org .dcache .util .IPMatcher ;
41
45
import org .slf4j .Logger ;
@@ -60,6 +64,8 @@ public class TransferLifeCycle {
60
64
61
65
private boolean enabled ;
62
66
67
+ private Map <String , Integer > voToExpId = new HashMap <>();
68
+
63
69
/**
64
70
* Mark transfer start.
65
71
* @param src remote client endpoint
@@ -82,9 +88,14 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p
82
88
return ;
83
89
}
84
90
91
+ var optionalExpId = getExperimentId (protocolInfo , subject );
92
+ if (optionalExpId .isEmpty ()) {
93
+ return ;
94
+ }
95
+
85
96
var data = new FlowMarkerBuilder ()
86
97
.withStartedAt (Instant .now ())
87
- .withExperimentId (getExperimentId ( protocolInfo ))
98
+ .withExperimentId (optionalExpId . getAsInt ( ))
88
99
.withActivityId (getActivity (protocolInfo ))
89
100
.wittApplication (getApplication (protocolInfo ))
90
101
.withProtocol ("tcp" )
@@ -118,10 +129,15 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo pro
118
129
return ;
119
130
}
120
131
132
+ var optionalExpId = getExperimentId (protocolInfo , subject );
133
+ if (optionalExpId .isEmpty ()) {
134
+ return ;
135
+ }
136
+
121
137
var data = new FlowMarkerBuilder ()
122
138
.withStartedAt (Instant .now ())
123
139
.withFinishedAt (Instant .now ())
124
- .withExperimentId (getExperimentId ( protocolInfo ))
140
+ .withExperimentId (optionalExpId . getAsInt ( ))
125
141
.withActivityId (getActivity (protocolInfo ))
126
142
.wittApplication (getApplication (protocolInfo ))
127
143
.withProtocol ("tcp" )
@@ -153,6 +169,29 @@ public void setEnabled(boolean isEnabled) {
153
169
enabled = isEnabled ;
154
170
}
155
171
172
+ /**
173
+ * Configures VO (Virtual Organization) to Experiment ID mapping.
174
+ *
175
+ * @param voMap A comma-separated string of VO mapping entries in the format
176
+ * "voName:expId".
177
+ */
178
+ public void setVoMapping (String voMap ) {
179
+ voToExpId .clear ();
180
+
181
+ for (String entry : voMap .split ("," )) {
182
+ String [] parts = entry .split (":" );
183
+ if (parts .length == 2 ) {
184
+ try {
185
+ voToExpId .put (parts [0 ].trim ().toLowerCase (), Integer .parseInt (parts [1 ].trim ()));
186
+ } catch (NumberFormatException e ) {
187
+ LOGGER .warn ("Invalid VO mapping entry: {}" , entry );
188
+ }
189
+ } else {
190
+ LOGGER .warn ("Invalid VO mapping entry: {}" , entry );
191
+ }
192
+ }
193
+ }
194
+
156
195
/**
157
196
* Send flow marker.
158
197
*
@@ -175,21 +214,6 @@ private void send(InetSocketAddress dst, @Nonnull String payload)
175
214
176
215
private boolean needMarker (ProtocolInfo protocolInfo ) {
177
216
178
- if (protocolInfo .getTransferTag ().isEmpty ()) {
179
- return false ;
180
- }
181
-
182
- try {
183
- int transferTag = Integer .parseInt (protocolInfo .getTransferTag ());
184
- if (transferTag <= 64 || transferTag >= 65536 ) {
185
- LOGGER .warn ("Invalid integer range for transfer tag: {}" , protocolInfo .getTransferTag ());
186
- return false ;
187
- }
188
- } catch (NumberFormatException e ) {
189
- LOGGER .warn ("Invalid transfer tag: {}" , protocolInfo .getTransferTag ());
190
- return false ;
191
- }
192
-
193
217
switch (protocolInfo .getProtocol ().toLowerCase ()) {
194
218
case "xrootd" :
195
219
case "http" :
@@ -204,9 +228,38 @@ private String getApplication(ProtocolInfo protocolInfo) {
204
228
return protocolInfo .getProtocol ().toLowerCase ();
205
229
}
206
230
207
- private int getExperimentId (ProtocolInfo protocolInfo ) {
208
- // scitag = exp_id << 6 | act_id
209
- return Integer .parseInt (protocolInfo .getTransferTag ()) >> 6 ;
231
+ /**
232
+ * Determine experiment ID, initially from the ProtocolInfo (xroot/http),
233
+ * if that fails then fallback to the Subject's primary FQAN.
234
+ *
235
+ * @param protocolInfo the ProtocolInfo object containing transfer-related metadata
236
+ * @param subject the Subject representing the user or entity associated with the transfer
237
+ * @return the experiment ID, or -1 if it cannot be determined
238
+ */
239
+ private OptionalInt getExperimentId (ProtocolInfo protocolInfo , Subject subject ) {
240
+ if (protocolInfo .getTransferTag () != null && !protocolInfo .getTransferTag ().isEmpty ()) {
241
+ try {
242
+ int transferTag = Integer .parseInt (protocolInfo .getTransferTag ());
243
+ if (transferTag <= 64 || transferTag >= 65536 ) {
244
+ LOGGER .warn ("Invalid integer range for transfer tag: {}" , protocolInfo .getTransferTag ());
245
+ return OptionalInt .empty ();
246
+ }
247
+ // scitag = exp_id << 6 | act_id
248
+ return OptionalInt .of (transferTag >> 6 );
249
+ } catch (NumberFormatException e ) {
250
+ LOGGER .warn ("Invalid transfer tag: {}" , protocolInfo .getTransferTag ());
251
+ return OptionalInt .empty ();
252
+ }
253
+ }
254
+
255
+ var vo = Subjects .getPrimaryFqan (subject );
256
+ if (vo == null ) {
257
+ return OptionalInt .empty ();
258
+ }
259
+
260
+ return voToExpId .containsKey (vo .getGroup ().toLowerCase ())
261
+ ? OptionalInt .of (voToExpId .get (vo .getGroup ().toLowerCase ()))
262
+ : OptionalInt .empty ();
210
263
}
211
264
212
265
private boolean isLocalTransfer (InetSocketAddress dst ) {
@@ -215,8 +268,13 @@ private boolean isLocalTransfer(InetSocketAddress dst) {
215
268
}
216
269
217
270
private int getActivity (ProtocolInfo protocolInfo ) {
218
- // scitag = exp_id << 6 | act_id
219
- return Integer .parseInt (protocolInfo .getTransferTag ()) & 0x3F ;
271
+ if (!protocolInfo .getTransferTag ().isEmpty ()) {
272
+ // scitag = exp_id << 6 | act_id
273
+ return Integer .parseInt (protocolInfo .getTransferTag ()) & 0x3F ;
274
+ } else {
275
+ // default activity id = 1
276
+ return 1 ;
277
+ }
220
278
}
221
279
222
280
private String toAFI (InetSocketAddress dst ) {
0 commit comments