Skip to content

Commit e682f7d

Browse files
authored
Merge pull request cfpb#1091 from jmarin/dispatchers
Dispatchers
2 parents bc7680c + 6080abf commit e682f7d

File tree

25 files changed

+156
-58
lines changed

25 files changed

+156
-58
lines changed

api/src/main/resources/application.conf

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,21 @@ hmda {
7070
zookeeperPort = ${?ZOOKEEPER_PORT}
7171
}
7272

73+
api-dispatcher {
74+
type = Dispatcher
75+
executor = "fork-join-executor"
76+
fork-join-executor {
77+
# Min number of threads to cap factor-based parallelism number to
78+
parallelism-min = 2
79+
# Parallelism (threads) ... ceil(available processors * factor)
80+
parallelism-factor = 2.0
81+
# Max number of threads to cap factor-based parallelism number to
82+
parallelism-max = 10
83+
}
84+
# Throughput defines the maximum number of messages to be
85+
# processed per actor before the thread jumps to the next actor.
86+
# Set to 1 for as fair as possible.
87+
throughput = 100
88+
}
89+
7390

api/src/main/scala/hmda/api/HmdaPlatform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object HmdaPlatform {
3434
val system = ActorSystem(configuration.getString("clustering.name"), configuration)
3535
val supervisor = createSupervisor(system)
3636
val querySupervisor = createQuerySupervisor(system)
37-
implicit val ec = system.dispatcher
37+
implicit val ec = system.dispatchers.lookup("api-dispatcher")
3838

3939
startActors(system, supervisor, querySupervisor)
4040
startApi(system, querySupervisor)

persistence-model/src/main/scala/hmda/persistence/model/HmdaActor.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ trait HmdaActor extends Actor with ActorLogging {
66

77
override def preStart(): Unit = {
88
log.info(s"Actor started at ${self.path}")
9+
log.info("Thread name for actor: " + Thread.currentThread().getName)
910
}
1011

1112
override def postStop(): Unit = {
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
11
hmda {
22
actor-flow-parallelism = 4
33
}
4+
5+
persistence-dispatcher {
6+
type = Dispatcher
7+
executor = "fork-join-executor"
8+
fork-join-executor {
9+
# Min number of threads to cap factor-based parallelism number to
10+
parallelism-min = 2
11+
# Parallelism (threads) ... ceil(available processors * factor)
12+
parallelism-factor = 2.0
13+
# Max number of threads to cap factor-based parallelism number to
14+
parallelism-max = 10
15+
}
16+
# Throughput defines the maximum number of messages to be
17+
# processed per actor before the thread jumps to the next actor.
18+
# Set to 1 for as fair as possible.
19+
throughput = 100
20+
}

persistence/src/main/scala/hmda/persistence/HmdaSupervisor.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ object HmdaSupervisor {
1616
def props(): Props = Props(new HmdaSupervisor)
1717

1818
def createSupervisor(system: ActorSystem): ActorRef = {
19-
system.actorOf(HmdaSupervisor.props(), "supervisor")
19+
system.actorOf(HmdaSupervisor.props().withDispatcher("persistence-dispatcher"), "supervisor")
2020
}
2121
}
2222

@@ -54,47 +54,52 @@ class HmdaSupervisor extends HmdaSupervisorActor {
5454

5555
override def createActor(name: String): ActorRef = name match {
5656
case id @ SingleLarValidation.name =>
57-
val actor = context.actorOf(SingleLarValidation.props, id)
57+
val actor = context.actorOf(SingleLarValidation.props.withDispatcher("persistence-dispatcher"), id)
5858
supervise(actor, id)
5959
case id @ InstitutionPersistence.name =>
60-
val actor = context.actorOf(InstitutionPersistence.props, id)
60+
val actor = context.actorOf(InstitutionPersistence.props.withDispatcher("persistence-dispatcher"), id)
6161
supervise(actor, id)
6262

6363
}
6464

6565
private def createHmdaFiling(filingPeriod: String): ActorRef = {
66-
val actor = context.actorOf(HmdaFiling.props(filingPeriod), s"${HmdaFiling.name}-$filingPeriod")
66+
val actor = context.actorOf(
67+
HmdaFiling.props(filingPeriod).withDispatcher("persistence-dispatcher"),
68+
s"${HmdaFiling.name}-$filingPeriod"
69+
)
6770
supervise(actor, HmdaFiling.name)
6871
}
6972

7073
private def createFilings(name: String, id: String): ActorRef = {
7174
val filingsId = s"$name-$id"
72-
val actor = context.actorOf(FilingPersistence.props(id), filingsId)
75+
val actor = context.actorOf(FilingPersistence.props(id).withDispatcher("persistence-dispatcher"), filingsId)
7376
supervise(actor, filingsId)
7477
}
7578

7679
private def createSubmissions(name: String, institutionId: String, period: String): ActorRef = {
7780
val sId = s"$name-$institutionId-$period"
78-
val actor = context.actorOf(SubmissionPersistence.props(institutionId, period), sId)
81+
val actor = context.actorOf(SubmissionPersistence
82+
.props(institutionId, period)
83+
.withDispatcher("persistence-dispatcher"), sId)
7984
supervise(actor, sId)
8085
}
8186

8287
private def createProcessingActor(name: String, submissionId: SubmissionId): ActorRef = name match {
8388
case id @ HmdaRawFile.name =>
8489
val actorId = s"$id-${submissionId.toString}"
85-
val actor = context.actorOf(HmdaRawFile.props(submissionId), actorId)
90+
val actor = context.actorOf(HmdaRawFile.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
8691
supervise(actor, actorId)
8792
case id @ HmdaFileParser.name =>
8893
val actorId = s"$id-${submissionId.toString}"
89-
val actor = context.actorOf(HmdaFileParser.props(submissionId), actorId)
94+
val actor = context.actorOf(HmdaFileParser.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
9095
supervise(actor, actorId)
9196
case id @ HmdaFileValidator.name =>
9297
val actorId = s"$id-${submissionId.toString}"
93-
val actor = context.actorOf(HmdaFileValidator.props(submissionId), actorId)
98+
val actor = context.actorOf(HmdaFileValidator.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
9499
supervise(actor, actorId)
95100
case id @ SubmissionManager.name =>
96101
val actorId = s"$id-${submissionId.toString}"
97-
val actor = context.actorOf(SubmissionManager.props(submissionId), actorId)
102+
val actor = context.actorOf(SubmissionManager.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
98103
supervise(actor, actorId)
99104
}
100105

persistence/src/main/scala/hmda/persistence/institutions/FilingPersistence.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object FilingPersistence {
1818
def props(institutionId: String): Props = Props(new FilingPersistence(institutionId))
1919

2020
def createFilings(institutionId: String, system: ActorSystem): ActorRef = {
21-
system.actorOf(FilingPersistence.props(institutionId))
21+
system.actorOf(FilingPersistence.props(institutionId).withDispatcher("persistence-dispatcher"))
2222
}
2323

2424
case class FilingState(filings: Seq[Filing] = Nil) {

persistence/src/main/scala/hmda/persistence/institutions/InstitutionPersistence.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object InstitutionPersistence {
1818
def props: Props = Props(new InstitutionPersistence)
1919

2020
def createInstitutions(system: ActorSystem): ActorRef = {
21-
system.actorOf(InstitutionPersistence.props, "institutions")
21+
system.actorOf(InstitutionPersistence.props.withDispatcher("persistence-dispatcher"), "institutions")
2222
}
2323

2424
case class InstitutionPersistenceState(institutions: Set[Institution] = Set.empty[Institution]) {

persistence/src/main/scala/hmda/persistence/institutions/SubmissionPersistence.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ object SubmissionPersistence {
1919
def props(institutionId: String, period: String): Props = Props(new SubmissionPersistence(institutionId, period))
2020

2121
def createSubmissions(institutionId: String, period: String, system: ActorSystem): ActorRef = {
22-
system.actorOf(SubmissionPersistence.props(institutionId, period))
22+
system.actorOf(SubmissionPersistence.props(institutionId, period).withDispatcher("persistence-dispatcher"))
2323
}
2424

2525
case class SubmissionState(submissions: Seq[Submission] = Nil) {

persistence/src/main/scala/hmda/persistence/processing/HmdaFileParser.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ object HmdaFileParser {
3333
def props(id: SubmissionId): Props = Props(new HmdaFileParser(id))
3434

3535
def createHmdaFileParser(system: ActorSystem, submissionId: SubmissionId): ActorRef = {
36-
system.actorOf(HmdaFileParser.props(submissionId))
36+
system.actorOf(HmdaFileParser.props(submissionId).withDispatcher("persistence-dispatcher"))
3737
}
3838

3939
case class PaginatedFileParseState(tsParsingErrors: Seq[String], larParsingErrors: Seq[LarParsingError], totalErroredLines: Int)

persistence/src/main/scala/hmda/persistence/processing/HmdaFileValidator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object HmdaFileValidator {
5151
def props(id: SubmissionId): Props = Props(new HmdaFileValidator(id))
5252

5353
def createHmdaFileValidator(system: ActorSystem, id: SubmissionId): ActorRef = {
54-
system.actorOf(HmdaFileValidator.props(id))
54+
system.actorOf(HmdaFileValidator.props(id).withDispatcher("persistence-dispatcher"))
5555
}
5656

5757
case class HmdaFileValidationState(

0 commit comments

Comments
 (0)