Commit a9f5e77

pool: implement hot file replication

Motivation:
-----------
Thousands of simultaneous accesses to the same set of files (so-called popular files) result in hot pools. dCache previously mitigated this with hot pool replication, which triggered p2p copies of any file accessed on the hot pool once certain configurable cost parameters were met. Proper tuning of this feature is non-trivial, and hot pool replication often only further aggravates the situation.

Modification:
-------------
Introduce a hot file replication mechanism on pools, where the pool itself triggers replication of hot (popular) files. When the number of in-flight transfers for a file reaches a certain configurable threshold, the pool invokes the migration module to create a pre-defined, configurable number of replicas on other pools in the same pool group.

Result:
-------
Hot spots are eliminated by spreading the hot (popular) files, and thus the load, across multiple pools.

Ticket: -
Acked-by: Dmitry Litvintsev, Tigran Mkrtchyan
Patch: https://rb.dcache.org/r/14583/diff/raw/
Commit:
Target: trunk
Request: -
Require-book: yes
Require-notes: yes
1 parent 924ef93   commit a9f5e77

31 files changed: +3037 -353 lines changed
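The mechanism summarized in the commit message reduces to a simple decision rule: a file is treated as hot once its number of in-flight transfers reaches the configured threshold, and replication is requested only while fewer than the configured number of replicas exist. A minimal illustrative sketch of that rule follows; the class and method names are hypothetical and are not part of this commit.

// Illustrative only: the names below are hypothetical and do not appear in this commit.
public final class HotFileDecision {

    private final long threshold;      // in-flight transfers at which a file counts as hot
    private final int targetReplicas;  // number of replicas to maintain for a hot file

    public HotFileDecision(long threshold, int targetReplicas) {
        this.threshold = threshold;
        this.targetReplicas = targetReplicas;
    }

    /** True when replication should be triggered for the file. */
    public boolean shouldReplicate(long inFlightTransfers, int existingReplicas) {
        return inFlightTransfers >= threshold && existingReplicas < targetReplicas;
    }
}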

docs/TheBook/src/main/markdown/rf-cc-pool.md

Lines changed: 62 additions & 2 deletions
@@ -485,15 +485,21 @@ The task completed successfully
 
 ## migration ls
 
-migration ls - Lists all migration jobs.
+migration ls - List migration jobs.
 
 ### synopsis
 
 migration ls
 
 ### Description
 
-Lists all migration jobs.
+Lists migration jobs.
+
+The list of migration jobs returned includes:
+
+- All non-hot file migration jobs
+- Running hot file migration jobs
+- A limited number of finished hot file migration jobs (sorted by creation time).
 
 ## migration move
 
@@ -531,4 +537,58 @@ migration resume job
 
 Resumes a suspended migration job.
 
+## hotfile set replicas
+
+hotfile set replicas - Set the number of replicas to create for hot files.
+
+### synopsis
+
+hotfile set replicas <replicas>
+
+<replicas>
+The number of replicas to maintain for files identified as hot.
+
+### Description
+
+Sets the target number of replicas for files that exceed the hot file threshold. The system will attempt to create this many copies of the file on different pools (and preferably different hosts) to distribute the load.
+
+## hotfile get replicas
+
+hotfile get replicas - Get the current hot file replication factor.
+
+### synopsis
+
+hotfile get replicas
+
+### Description
+
+Returns the currently configured number of replicas for hot files.
+
+## hotfile set threshold
+
+hotfile set threshold - Set the access frequency threshold for identifying hot files.
+
+### synopsis
+
+hotfile set threshold <threshold>
+
+<threshold>
+The threshold value (e.g., accesses per minute) that triggers hot file replication
+
+### Description
+
+Sets the threshold for classifying a file as "hot". Files accessed more frequently than this threshold will be candidates for automatic replication.
+
+## hotfile get threshold
+
+hotfile get threshold - Get the current threshold for triggering hot file replication
+
+### synopsis
+
+hotfile get threshold
+
+### Description
+
+Returns the current threshold for triggering hot file replication
+
 [???]: #cf-pm-classic-space

modules/dcache-resilience/src/main/java/org/dcache/resilience/util/CopyLocationExtractor.java

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 import java.util.Collection;
 import java.util.Map;
 import org.dcache.resilience.data.PoolInfoMap;
+import org.dcache.util.pool.AbstractLocationExtractor;
 
 /**
  * <p>Implementation of the {@link AbstractLocationExtractor}

modules/dcache-resilience/src/main/java/org/dcache/resilience/util/CostModuleLocationExtractor.java

Lines changed: 9 additions & 15 deletions
@@ -59,29 +59,23 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  */
 package org.dcache.resilience.util;
 
-import com.google.common.collect.ImmutableMap;
 import diskCacheV111.poolManager.CostModule;
 import java.util.Collection;
-import java.util.Map;
-import org.dcache.poolmanager.PoolInfo;
+import org.dcache.util.pool.CostModuleTagProvider;
+import org.dcache.util.pool.PoolTagBasedExtractor;
 
 /**
- * <p>Implementation of the {@link AbstractLocationExtractor} which uses
- * {@link CostModule} to extract tags.</p>
+ * <p>Implementation of the {@link PoolTagBasedExtractor} which uses
+ * {@link CostModule} to extract tags for resilience operations.</p>
  */
-public final class CostModuleLocationExtractor extends AbstractLocationExtractor {
-
-    private CostModule module;
+public final class CostModuleLocationExtractor extends PoolTagBasedExtractor {
 
     public CostModuleLocationExtractor(Collection<String> onlyOneCopyPer,
-          CostModule module) {
-        super(onlyOneCopyPer);
-        this.module = module;
+          CostModule costModule) {
+        super(onlyOneCopyPer, createTagProvider(costModule));
     }
 
-    @Override
-    protected Map<String, String> getPoolTagsFor(String location) {
-        PoolInfo poolinfo = module.getPoolInfo(location);
-        return poolinfo == null ? ImmutableMap.of() : poolinfo.getTags();
+    private static CostModuleTagProvider createTagProvider(CostModule costModule) {
+        return new CostModuleTagProvider(costModule);
     }
 }

modules/dcache-resilience/src/main/java/org/dcache/resilience/util/RemoveLocationExtractor.java

Lines changed: 8 additions & 12 deletions
@@ -69,23 +69,19 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.dcache.resilience.data.PoolInfoMap;
+import org.dcache.util.pool.AbstractLocationExtractor;
 
 /**
- * <p> Implementation of the {@link PoolTagConstraintDiscriminator} which returns
- * a list containing the pools or locations with maximum weight. The weights are computed in terms
- * of the sum of the size of the value partitions to which they belong. For instance, if pool1 has
- * tag1 with value A and tag2 with value X, and A is shared by 3 other pools but X by 2, then pool1
- * has a weight of 5.</p>
+ * <p> Implementation of the {@link AbstractLocationExtractor}
+ * which uses {@link PoolInfoMap} to get the pool tags.</p>
  *
- * <p>The use case envisaged here is as follows: a pnfsId/file has a set
- * of locations which exceeds the maximum by some number K. The caller responsible for removal will
- * run getCandidateLocations() on this list up to K times, each time picking a location from the
- * returned list using some selection algorithm, and removing the selected location from the
- * original list, which is then passed in to the i+1 iteration. The set of weights is recomputed at
- * each iteration based on the remaining locations.</p>
+ * <p>A RemoveLocationExtractor can receive notification that one or more locations are no longer
+ * hosting or should no longer host a given replica. These locations are added to the excluded set,
+ * and then subtracted from the candidate locations returned by the call to
+ * getCandidateLocations().</p>
  */
 public final class RemoveLocationExtractor
-      extends PoolTagConstraintDiscriminator {
+      extends AbstractLocationExtractor {
 
     class WeightedLocation {
 

modules/dcache/pom.xml

Lines changed: 14 additions & 0 deletions
@@ -262,6 +262,13 @@
       <artifactId>spring-tx</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-test</artifactId>
+      <version>${version.spring}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.aspectj</groupId>
       <artifactId>aspectjrt</artifactId>
@@ -382,6 +389,13 @@
       <artifactId>everit-json-schema</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Mockito for unit testing -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>3.2.4</version>
+      <!-- <scope>test</scope> Fails if this is uncommented; no idea why -->
+    </dependency>
   </dependencies>
 
   <build>

modules/dcache/src/main/java/org/dcache/pool/classic/FileRequestMonitor.java

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package org.dcache.pool.classic;
+
+import diskCacheV111.util.PnfsId;
+
+/**
+ * Abstract interface for monitoring file requests in the pool.
+ */
+public interface FileRequestMonitor {
+
+    /**
+     * Report a file request for the given pnfsId and number of requests.
+     *
+     * @param pnfsId the file identifier
+     * @param numberOfRequests the number of requests for this file
+     */
+    void reportFileRequest(PnfsId pnfsId, long numberOfRequests);
+}
+

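For orientation, here is a sketch of how the FileRequestMonitor contract above could be implemented by a threshold-based monitor. Only the interface comes from this commit; the class name, the ReplicationTrigger callback (standing in for the pool's migration-module invocation), and the de-duplication of triggers are assumptions.

package org.dcache.pool.classic;

import diskCacheV111.util.PnfsId;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical implementation of FileRequestMonitor, shown for illustration;
 * not the monitor added by this commit.
 */
public class ThresholdFileRequestMonitor implements FileRequestMonitor {

    /** Callback standing in for the migration-module invocation. */
    public interface ReplicationTrigger {
        void replicate(PnfsId pnfsId, int replicas);
    }

    private final long threshold;
    private final int replicas;
    private final ReplicationTrigger trigger;

    /** Files for which replication has already been requested. */
    private final Set<PnfsId> inProgress = ConcurrentHashMap.newKeySet();

    public ThresholdFileRequestMonitor(long threshold, int replicas, ReplicationTrigger trigger) {
        this.threshold = threshold;
        this.replicas = replicas;
        this.trigger = trigger;
    }

    @Override
    public void reportFileRequest(PnfsId pnfsId, long numberOfRequests) {
        // Request replication at most once per hot file.
        if (numberOfRequests >= threshold && inProgress.add(pnfsId)) {
            trigger.replicate(pnfsId, replicas);
        }
    }
}
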
modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java

Lines changed: 8 additions & 3 deletions
@@ -5,6 +5,7 @@
 import static java.util.stream.Collectors.joining;
 
 import diskCacheV111.util.CacheException;
+import diskCacheV111.util.PnfsId;
 import diskCacheV111.vehicles.IoJobInfo;
 import dmg.cells.nucleus.CellCommandListener;
 import dmg.cells.nucleus.CellSetupProvider;
@@ -213,12 +214,16 @@ private static void toMoverString(MoverRequestScheduler.PrioritizedRequest j,
         sb.append(j.getId()).append(" : ").append(j).append('\n');
         if (displaySubject) {
             sb.append(
-                  j.getMover().getSubject().getPrincipals().stream().map(Objects::toString).collect(
-                        Collectors.joining(",", " <", ">")))
-                  .append("\n");
+                        j.getMover().getSubject().getPrincipals().stream().map(Objects::toString).collect(
+                              Collectors.joining(",", " <", ">")))
+                        .append("\n");
         }
     }
 
+    public long numberOfRequestsFor(PnfsId pnfsId) {
+        return queues().stream().mapToLong((q) -> q.numberOfRequestsFor(pnfsId)).sum();
+    }
+
     @AffectsSetup
     @Command(name = "mover set max active",
           hint = "set the maximum number of active client transfers",

modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java

Lines changed: 17 additions & 8 deletions
@@ -12,6 +12,7 @@
 import diskCacheV111.pools.PoolCostInfo.NamedPoolQueueInfo;
 import diskCacheV111.util.CacheException;
 import diskCacheV111.util.DiskErrorCacheException;
+import diskCacheV111.util.PnfsId;
 import diskCacheV111.vehicles.IoJobInfo;
 import diskCacheV111.vehicles.JobInfo;
 import diskCacheV111.vehicles.ProtocolInfo;
@@ -56,6 +57,11 @@ public class MoverRequestScheduler {
     private static final long DEFAULT_LAST_ACCESSED = 0;
     private static final long DEFAULT_TOTAL = 0;
 
+    public long numberOfRequestsFor(PnfsId pnfsId) {
+        return _jobs.values().stream()
+              .filter((pr) -> pr.getMover().getFileAttributes().getPnfsId().equals(pnfsId)).count();
+    }
+
     /**
      * A RuntimeException that wraps a CacheException.
      */
@@ -184,8 +190,9 @@ public synchronized void setOrder(Order order) {
     }
 
     /**
-     * Get mover id for given door request. If there is no mover associated with {@code
-     * doorUniqueueRequest} a new mover will be created by using provided {@code moverSupplier}.
+     * Get mover id for given door request. If there is no mover associated with
+     * {@code doorUniqueueRequest} a new mover will be created by using provided
+     * {@code moverSupplier}.
      * <p>
      * The returned mover id generated with following encoding: | 31- queue id -24|23- job id -0|
      *
@@ -517,16 +524,17 @@ public void failed(Throwable exc, Void attachment) {
                     FaultAction faultAction = null;
                     //TODO this is done because the FileStoreState is in another module
                     // to be improved
-                    switch (((DiskErrorCacheException) exc).checkStatus(exc.getMessage())){
+                    switch (((DiskErrorCacheException) exc).checkStatus(
+                          exc.getMessage())) {
                         case READ_ONLY:
                             faultAction = FaultAction.READONLY;
-                            break;
+                            break;
                         default:
                             faultAction = FaultAction.DISABLED;
-                            break;
+                            break;
                     }
                     FaultEvent faultEvent = new FaultEvent("transfer",
-                          faultAction, exc.getMessage(), exc);
+                          faultAction, exc.getMessage(), exc);
                     _faultListeners.forEach(l -> l.faultOccurred(faultEvent));
                 } else if (exc instanceof OutOfDiskException) {
                     FaultEvent faultEvent = new FaultEvent(
@@ -552,7 +560,8 @@ public void completed(Void result, Void attachment) {
             public void failed(Throwable exc, Void attachment) {
                 if (exc instanceof DiskErrorCacheException) {
                     FaultAction faultAction = null;
-                    switch (((DiskErrorCacheException) exc).checkStatus(exc.getMessage())){
+                    switch (((DiskErrorCacheException) exc).checkStatus(
+                          exc.getMessage())) {
                         case READ_ONLY:
                             faultAction = FaultAction.READONLY;
                             break;
@@ -562,7 +571,7 @@ public void failed(Throwable exc, Void attachment) {
                     }
                     FaultEvent faultEvent = new FaultEvent(
                           "post-processing",
-                          faultAction,
+                          faultAction,
                           exc.getMessage(), exc);
                     _faultListeners.forEach(
                           l -> l.faultOccurred(faultEvent));

modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java

Lines changed: 28 additions & 2 deletions
@@ -214,12 +214,31 @@ public class PoolV4
 
     private ThreadFactory _threadFactory;
 
+    // Hot file monitoring
+    private FileRequestMonitor _fileRequestMonitor;
+    private boolean _hotFileReplicationEnabled = true;
+
+    public void setFileRequestMonitor(FileRequestMonitor fileRequestMonitor) {
+        _fileRequestMonitor = fileRequestMonitor;
+    }
+
+    public FileRequestMonitor getFileRequestMonitor() {
+        return _fileRequestMonitor;
+    }
+
+    @Required
+    public void setHotFileReplicationEnabled(boolean hotFileReplicationEnabled) {
+        _hotFileReplicationEnabled = hotFileReplicationEnabled;
+    }
+
+    public boolean getHotFileReplicationEnabled() {
+        return _hotFileReplicationEnabled;
+    }
 
     protected void assertNotRunning(String error) {
         checkState(!_running, error);
     }
 
-
     @Autowired(required = false)
     @Qualifier("remove")
     public void setKafkaTemplate(KafkaTemplate kafkaTemplate) {
@@ -581,7 +600,8 @@ public void stateChanged(StateChangeEvent event) {
                 try {
                     _kafkaSender.accept(msg);
                 } catch (KafkaException | org.apache.kafka.common.KafkaException e) {
-                    LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
+                    LOGGER.warn("Failed to send message to kafka: {} ",
+                          Throwables.getRootCause(e).getMessage());
                 }
             }
         }
@@ -732,6 +752,12 @@ private int queueIoRequest(CellMessage envelope, PoolIoFileMessage message)
     private void ioFile(CellMessage envelope, PoolIoFileMessage message) {
         try {
            message.setMoverId(queueIoRequest(envelope, message));
+            LOGGER.debug("moverId {} received request for pnfsId {}", message.getMoverId(),
+                  message.getPnfsId());
+            if (_hotFileReplicationEnabled) {
+                _fileRequestMonitor.reportFileRequest(message.getPnfsId(),
+                      _ioQueue.numberOfRequestsFor(message.getPnfsId()));
+            }
             message.setSucceeded();
         } catch (OutOfDateCacheException e) {
             if (_pingLimiter.tryAcquire()) {