Skip to content

Commit 1bf6c3c

Browse files
committed
Merge branch 'master' into cluster-management
2 parents 149e9c9 + bd9f5df commit 1bf6c3c

File tree

36 files changed

+236
-101
lines changed

36 files changed

+236
-101
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ export ZOOKEEPER_HOST=192.168.99.100
144144
export ZOOKEEPER_PORT=2181
145145
```
146146

147+
Alternatively, these dependencies (`Cassandra`, `Zookeeper` and `PostgreSQL`) can be started from `docker` providing default resources for the `HMDA Platform`:
148+
149+
`docker-compose -f docker-dev.yml up`
150+
147151
* If you want to use the sample files in this repo for testing the app, run the edits in demo mode. Otherwise, edit S025 will trigger for all files.
148152

149153
```shell

api/src/main/resources/application.conf

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,21 @@ hmda {
7777
zookeeperPort = ${?ZOOKEEPER_PORT}
7878
}
7979

80+
api-dispatcher {
81+
type = Dispatcher
82+
executor = "fork-join-executor"
83+
fork-join-executor {
84+
# Min number of threads to cap factor-based parallelism number to
85+
parallelism-min = 2
86+
# Parallelism (threads) ... ceil(available processors * factor)
87+
parallelism-factor = 2.0
88+
# Max number of threads to cap factor-based parallelism number to
89+
parallelism-max = 10
90+
}
91+
# Throughput defines the maximum number of messages to be
92+
# processed per actor before the thread jumps to the next actor.
93+
# Set to 1 for as fair as possible.
94+
throughput = 100
95+
}
96+
8097

api/src/main/scala/hmda/api/HmdaPlatform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ object HmdaPlatform {
3838
ClusterHttpManagement(cluster).start()
3939
val supervisor = createSupervisor(system)
4040
val querySupervisor = createQuerySupervisor(system)
41-
implicit val ec = system.dispatcher
41+
implicit val ec = system.dispatchers.lookup("api-dispatcher")
4242

4343
startActors(system, supervisor, querySupervisor)
4444
startApi(system, querySupervisor)

docker-dev.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
version: '2'
2+
3+
services:
4+
zookeeper:
5+
image: jplock/zookeeper
6+
ports:
7+
- '2181:2181'
8+
9+
cassandra:
10+
image: cassandra
11+
ports:
12+
- '9042:9042'
13+
- '7000:7000'
14+
- '7199:7199'
15+
16+
query_db:
17+
image: postgres:9.6.1
18+
ports:
19+
- '54321:5432'
20+
environment:
21+
POSTGRES_DB: hmda
22+
POSTGRES_USER: postgres
23+
POSTGRES_PASSWORD: postgres

persistence-model/src/main/scala/hmda/persistence/model/HmdaActor.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ trait HmdaActor extends Actor with ActorLogging {
66

77
override def preStart(): Unit = {
88
log.info(s"Actor started at ${self.path}")
9+
log.info("Thread name for actor: " + Thread.currentThread().getName)
910
}
1011

1112
override def postStop(): Unit = {
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
11
hmda {
22
actor-flow-parallelism = 4
33
}
4+
5+
persistence-dispatcher {
6+
type = Dispatcher
7+
executor = "fork-join-executor"
8+
fork-join-executor {
9+
# Min number of threads to cap factor-based parallelism number to
10+
parallelism-min = 2
11+
# Parallelism (threads) ... ceil(available processors * factor)
12+
parallelism-factor = 2.0
13+
# Max number of threads to cap factor-based parallelism number to
14+
parallelism-max = 10
15+
}
16+
# Throughput defines the maximum number of messages to be
17+
# processed per actor before the thread jumps to the next actor.
18+
# Set to 1 for as fair as possible.
19+
throughput = 100
20+
}

persistence/src/main/scala/hmda/persistence/HmdaSupervisor.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ object HmdaSupervisor {
1616
def props(): Props = Props(new HmdaSupervisor)
1717

1818
def createSupervisor(system: ActorSystem): ActorRef = {
19-
system.actorOf(HmdaSupervisor.props(), "supervisor")
19+
system.actorOf(HmdaSupervisor.props().withDispatcher("persistence-dispatcher"), "supervisor")
2020
}
2121
}
2222

@@ -54,47 +54,52 @@ class HmdaSupervisor extends HmdaSupervisorActor {
5454

5555
override def createActor(name: String): ActorRef = name match {
5656
case id @ SingleLarValidation.name =>
57-
val actor = context.actorOf(SingleLarValidation.props, id)
57+
val actor = context.actorOf(SingleLarValidation.props.withDispatcher("persistence-dispatcher"), id)
5858
supervise(actor, id)
5959
case id @ InstitutionPersistence.name =>
60-
val actor = context.actorOf(InstitutionPersistence.props, id)
60+
val actor = context.actorOf(InstitutionPersistence.props.withDispatcher("persistence-dispatcher"), id)
6161
supervise(actor, id)
6262

6363
}
6464

6565
private def createHmdaFiling(filingPeriod: String): ActorRef = {
66-
val actor = context.actorOf(HmdaFiling.props(filingPeriod), s"${HmdaFiling.name}-$filingPeriod")
66+
val actor = context.actorOf(
67+
HmdaFiling.props(filingPeriod).withDispatcher("persistence-dispatcher"),
68+
s"${HmdaFiling.name}-$filingPeriod"
69+
)
6770
supervise(actor, HmdaFiling.name)
6871
}
6972

7073
private def createFilings(name: String, id: String): ActorRef = {
7174
val filingsId = s"$name-$id"
72-
val actor = context.actorOf(FilingPersistence.props(id), filingsId)
75+
val actor = context.actorOf(FilingPersistence.props(id).withDispatcher("persistence-dispatcher"), filingsId)
7376
supervise(actor, filingsId)
7477
}
7578

7679
private def createSubmissions(name: String, institutionId: String, period: String): ActorRef = {
7780
val sId = s"$name-$institutionId-$period"
78-
val actor = context.actorOf(SubmissionPersistence.props(institutionId, period), sId)
81+
val actor = context.actorOf(SubmissionPersistence
82+
.props(institutionId, period)
83+
.withDispatcher("persistence-dispatcher"), sId)
7984
supervise(actor, sId)
8085
}
8186

8287
private def createProcessingActor(name: String, submissionId: SubmissionId): ActorRef = name match {
8388
case id @ HmdaRawFile.name =>
8489
val actorId = s"$id-${submissionId.toString}"
85-
val actor = context.actorOf(HmdaRawFile.props(submissionId), actorId)
90+
val actor = context.actorOf(HmdaRawFile.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
8691
supervise(actor, actorId)
8792
case id @ HmdaFileParser.name =>
8893
val actorId = s"$id-${submissionId.toString}"
89-
val actor = context.actorOf(HmdaFileParser.props(submissionId), actorId)
94+
val actor = context.actorOf(HmdaFileParser.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
9095
supervise(actor, actorId)
9196
case id @ HmdaFileValidator.name =>
9297
val actorId = s"$id-${submissionId.toString}"
93-
val actor = context.actorOf(HmdaFileValidator.props(submissionId), actorId)
98+
val actor = context.actorOf(HmdaFileValidator.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
9499
supervise(actor, actorId)
95100
case id @ SubmissionManager.name =>
96101
val actorId = s"$id-${submissionId.toString}"
97-
val actor = context.actorOf(SubmissionManager.props(submissionId), actorId)
102+
val actor = context.actorOf(SubmissionManager.props(submissionId).withDispatcher("persistence-dispatcher"), actorId)
98103
supervise(actor, actorId)
99104
}
100105

persistence/src/main/scala/hmda/persistence/institutions/FilingPersistence.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object FilingPersistence {
1818
def props(institutionId: String): Props = Props(new FilingPersistence(institutionId))
1919

2020
def createFilings(institutionId: String, system: ActorSystem): ActorRef = {
21-
system.actorOf(FilingPersistence.props(institutionId))
21+
system.actorOf(FilingPersistence.props(institutionId).withDispatcher("persistence-dispatcher"))
2222
}
2323

2424
case class FilingState(filings: Seq[Filing] = Nil) {

persistence/src/main/scala/hmda/persistence/institutions/InstitutionPersistence.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object InstitutionPersistence {
1818
def props: Props = Props(new InstitutionPersistence)
1919

2020
def createInstitutions(system: ActorSystem): ActorRef = {
21-
system.actorOf(InstitutionPersistence.props, "institutions")
21+
system.actorOf(InstitutionPersistence.props.withDispatcher("persistence-dispatcher"), "institutions")
2222
}
2323

2424
case class InstitutionPersistenceState(institutions: Set[Institution] = Set.empty[Institution]) {

persistence/src/main/scala/hmda/persistence/institutions/SubmissionPersistence.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ object SubmissionPersistence {
1919
def props(institutionId: String, period: String): Props = Props(new SubmissionPersistence(institutionId, period))
2020

2121
def createSubmissions(institutionId: String, period: String, system: ActorSystem): ActorRef = {
22-
system.actorOf(SubmissionPersistence.props(institutionId, period))
22+
system.actorOf(SubmissionPersistence.props(institutionId, period).withDispatcher("persistence-dispatcher"))
2323
}
2424

2525
case class SubmissionState(submissions: Seq[Submission] = Nil) {

0 commit comments

Comments
 (0)