Skip to content

Commit 61b7dbf

Browse files
authored
Scitags: Firefly usage and stats (#7778)
* Introduces usage and storage statistics in fireflies (FlowMaker), which are used to track the data transfer(s) direction and estimate storage performance wrt. network usage. Signed-off-by: Marian Babik <[email protected]>
1 parent cc1b65a commit 61b7dbf

File tree

6 files changed

+82
-9
lines changed

6 files changed

+82
-9
lines changed

modules/dcache-vehicles/src/main/java/diskCacheV111/vehicles/MoverInfoMessage.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public class MoverInfoMessage extends PnfsFileInfoMessage {
2222
private Duration _readIdle;
2323
private Duration _writeActive;
2424
private Duration _writeIdle;
25+
private double bytesRead = Double.NaN;
26+
private double bytesWritten = Double.NaN;
2527

2628
private static final long serialVersionUID = -7013160118909496211L;
2729
private String _transferPath;
@@ -140,6 +142,23 @@ public void setLocalEndpoint(InetSocketAddress endpoint) {
140142
public Optional<InetSocketAddress> getLocalEndpoint() {
141143
return Optional.ofNullable(_localEndpoint);
142144
}
145+
146+
public double getBytesRead() {
147+
return bytesRead;
148+
}
149+
150+
public void setBytesRead(double bytesRead) {
151+
this.bytesRead = bytesRead;
152+
}
153+
154+
public double getBytesWritten() {
155+
return bytesWritten;
156+
}
157+
158+
public void setBytesWritten(double bytesWritten) {
159+
this.bytesWritten = bytesWritten;
160+
}
161+
143162
@Override
144163
public String toString() {
145164
return "MoverInfoMessage{" +

modules/dcache/src/main/java/org/dcache/net/FlowMarker.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
package org.dcache.net;
1919

2020
import com.google.common.net.InetAddresses;
21+
22+
import diskCacheV111.vehicles.MoverInfoMessage;
23+
2124
import java.net.InetSocketAddress;
2225
import java.time.Instant;
2326
import java.time.format.DateTimeFormatter;
27+
2428
import org.json.JSONObject;
2529
import org.slf4j.Logger;
2630
import org.slf4j.LoggerFactory;
@@ -40,6 +44,8 @@ public static class FlowMarkerBuilder {
4044
private final JSONObject lifecycle = new JSONObject();
4145
private final JSONObject context = new JSONObject();
4246
private final JSONObject flow = new JSONObject();
47+
private final JSONObject usage = new JSONObject();
48+
private final JSONObject stats = new JSONObject();
4349

4450
public FlowMarkerBuilder withStartedAt(Instant startTime) {
4551
lifecycle.put("start-time", DateTimeFormatter.ISO_INSTANT.format(startTime));
@@ -104,6 +110,31 @@ public FlowMarkerBuilder withProtocol(String proto) {
104110
return this;
105111
}
106112

113+
public FlowMarkerBuilder withUsage(double bytesRead, double bytesWritten) {
114+
// approx. - assumes bytesWritten to disk are bytes "received" over the network
115+
// and bytesRead from disk are bytes "sent" to the network
116+
usage.put("received", Double.isFinite(bytesWritten) ? (long) bytesWritten : 0L);
117+
usage.put("sent", Double.isFinite(bytesRead) ? (long) bytesRead : 0L);
118+
return this;
119+
}
120+
121+
public FlowMarkerBuilder withStats(MoverInfoMessage message) {
122+
stats.put("bytes-transferred", message.getDataTransferred());
123+
stats.put("connection-time", message.getConnectionTime());
124+
stats.put("read-bw", Double.isFinite(message.getMeanReadBandwidth()) ? (long) message.getMeanReadBandwidth() : 0L);
125+
stats.put("write-bw", Double.isFinite(message.getMeanWriteBandwidth()) ? (long) message.getMeanWriteBandwidth() : 0L);
126+
stats.put("read-active", message.getReadActive().isPresent() ? message.getReadActive().get().toString() : "0");
127+
stats.put("read-idle", message.getReadIdle().isPresent() ? message.getReadIdle().get().toString() : "0");
128+
stats.put("write-active", message.getWriteActive().isPresent() ? message.getWriteActive().get().toString() : "0");
129+
stats.put("write-idle", message.getWriteIdle().isPresent() ? message.getWriteIdle().get().toString() : "0");
130+
return this;
131+
}
132+
133+
public FlowMarkerBuilder withFlowId(String id) {
134+
flow.put("flow-id", id);
135+
return this;
136+
}
137+
107138
public String build(String state) {
108139

109140
switch (state) {
@@ -119,6 +150,9 @@ public String build(String state) {
119150
payload.put("flow-lifecycle", lifecycle);
120151
payload.put("context", context);
121152
payload.put("flow-id", flow);
153+
payload.put("usage", usage);
154+
if(!stats.isEmpty())
155+
payload.put("storage-stats", stats);
122156

123157
lifecycle.put("state", state);
124158
lifecycle.put("current-time", DateTimeFormatter.ISO_INSTANT.format(Instant.now()));

modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,14 @@ private static MoverInfoMessage updateIoStatistics(MoverInfoMessage info,
208208
SnapshotStatistics writeStats = writes.statistics();
209209

210210
if (readStats.requestedBytes().getN() > 0) {
211+
info.setBytesRead(readStats.transferredBytes().getSum());
211212
info.setMeanReadBandwidth(readStats.instantaneousBandwidth().getMean());
212213
info.setReadActive(reads.active());
213214
info.setReadIdle(reads.idle());
214215
}
215216

216217
if (writeStats.requestedBytes().getN() > 0) {
218+
info.setBytesWritten(writeStats.transferredBytes().getSum());
217219
info.setMeanWriteBandwidth(writeStats.instantaneousBandwidth().getMean());
218220
info.setWriteActive(writes.active());
219221
info.setWriteIdle(writes.idle());
@@ -240,8 +242,7 @@ public void sendFinished(Mover<?> mover, MoverInfoMessage moverInfoMessage) {
240242
mover.getLocalEndpoint().ifPresent(e ->
241243
transferLifeCycle.onEnd(((IpProtocolInfo) mover.getProtocolInfo()).getSocketAddress(),
242244
e,
243-
mover.getProtocolInfo(),
244-
mover.getSubject()));
245+
moverInfoMessage));
245246

246247
_door.notify(mover.getPathToDoor(), finished);
247248
}

modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import com.google.common.base.Strings;
2424
import com.google.common.net.HostAndPort;
25+
26+
import diskCacheV111.vehicles.MoverInfoMessage;
2527
import diskCacheV111.vehicles.ProtocolInfo;
2628
import java.io.IOException;
2729
import java.net.DatagramPacket;
@@ -63,6 +65,7 @@ public class TransferLifeCycle {
6365
private Predicate<InetAddress> localSubnet = a -> false;
6466

6567
private boolean enabled;
68+
private boolean storageStatisticsEnabled;
6669

6770
private Map<String, Integer> voToExpId = new HashMap<>();
6871

@@ -75,7 +78,7 @@ public class TransferLifeCycle {
7578
*/
7679
public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo protocolInfo,
7780
Subject subject) {
78-
81+
7982
if (!enabled) {
8083
return;
8184
}
@@ -114,9 +117,11 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p
114117
* @param protocolInfo access protocol information
115118
* @param subject associated with the transfer
116119
*/
117-
public void onEnd(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo protocolInfo,
118-
Subject subject) {
120+
public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage mover) {
121+
ProtocolInfo protocolInfo = mover.getProtocolInfo();
122+
Subject subject = mover.getSubject();
119123

124+
120125
if (!enabled) {
121126
return;
122127
}
@@ -140,13 +145,17 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo pro
140145
.withExperimentId(optionalExpId.getAsInt())
141146
.withActivityId(getActivity(protocolInfo))
142147
.wittApplication(getApplication(protocolInfo))
148+
.withUsage(mover.getBytesRead(), mover.getBytesWritten())
143149
.withProtocol("tcp")
144150
.withAFI(toAFI(dst))
145151
.withDestination(dst)
146-
.withSource(src)
147-
.build("end");
152+
.withSource(src);
153+
if (storageStatisticsEnabled) {
154+
data.withStats(mover);
155+
}
156+
var firefly = data.build("end");
148157

149-
send(toFireflyDestination.apply(src), data);
158+
send(toFireflyDestination.apply(src), firefly);
150159
}
151160

152161
public void setFireflyDestination(String addr) {
@@ -169,6 +178,10 @@ public void setEnabled(boolean isEnabled) {
169178
enabled = isEnabled;
170179
}
171180

181+
public void setStorageStatisticsEnabled(boolean isEnabled) {
182+
storageStatisticsEnabled = isEnabled;
183+
}
184+
172185
/**
173186
* Configures VO (Virtual Organization) to Experiment ID mapping.
174187
*

modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@
351351
<property name="fireflyDestination" value="${pool.firefly.destination}" />
352352
<property name="excludes" value="${pool.firefly.excludes}" />
353353
<property name="voMapping" value="${pool.firefly.vo-mapping}" />
354+
<property name="storageStatisticsEnabled" value="${pool.firefly.storage-statistics}" />
354355
</bean>
355356

356357
<bean id="default-transfer-service" class="org.dcache.pool.classic.MoverMapTransferService"

skel/share/defaults/pool.properties

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -948,4 +948,9 @@ pool.firefly.excludes=
948948
#
949949
# example: atlas:2, cms:3,etc.
950950
#
951-
pool.firefly.vo-mapping=atlas:2, cms:3, lhcb:4, alice:5, belleii:6, ska:7, dune:8, lsst:9, ilc:10, auger:11, juno:12, nova:13, xenon:14
951+
pool.firefly.vo-mapping=atlas:2, cms:3, lhcb:4, alice:5, belleii:6, ska:7, dune:8, lsst:9, ilc:10, auger:11, juno:12, nova:13, xenon:14
952+
953+
#
954+
# Enable sending storage statistics as part of the firefly packets
955+
#
956+
pool.firefly.storage-statistics=false

0 commit comments

Comments
 (0)