Skip to content

Commit 67ac0ed

Browse files
committed
Revert "Revert "Improve concurrency (#62)""
This reverts commit 9383602.
1 parent b74e2b0 commit 67ac0ed

File tree

2 files changed

+33
-19
lines changed

2 files changed

+33
-19
lines changed

src/main/kotlin/app/GhService.kt

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import org.eclipse.egit.github.core.service.RepositoryService
77
import org.eclipse.egit.github.core.service.UserService
88
import org.eclipse.egit.github.core.service.WatcherService
99
import org.slf4j.LoggerFactory
10-
import java.util.*
10+
import java.io.IOException
11+
import java.util.concurrent.ConcurrentHashMap
12+
import java.util.concurrent.Executors
1113
import java.util.concurrent.TimeUnit
1214

1315
object GhService {
@@ -16,8 +18,11 @@ object GhService {
1618

1719
private val log = LoggerFactory.getLogger(GhService.javaClass)
1820

21+
// Allows for parallel iteration and O(1) put/remove
22+
private val clientSessions = ConcurrentHashMap<WsSession, Boolean>()
23+
1924
private val tokens = Config.getApiTokens()?.split(",") ?: listOf("") // empty token is limited to 60 requests
20-
private val clients = tokens.map { token -> GitHubClient().apply { setOAuth2Token(token) } }
25+
private val clients = tokens.map { GitHubClient().apply { setOAuth2Token(it) } }
2126
private val repoServices = clients.map { RepositoryService(it) }
2227
private val commitServices = clients.map { CommitService(it) }
2328
private val userServices = clients.map { UserService(it) }
@@ -30,9 +35,15 @@ object GhService {
3035

3136
val remainingRequests: Int get() = clients.sumBy { it.remainingRequests }
3237

33-
init { // create timer to ping clients every other minute to make sure remainingRequests is correct
34-
Timer().scheduleAtFixedRate(object : TimerTask() {
35-
override fun run() {
38+
fun registerClient(ws: WsSession) = clientSessions.put(ws, true) == true
39+
40+
fun unregisterClient(ws: WsSession) = clientSessions.remove(ws) == true
41+
42+
init {
43+
Executors.newScheduledThreadPool(2).apply {
44+
45+
// ping clients every other minute to make sure remainingRequests is correct
46+
scheduleAtFixedRate({
3647
repoServices.forEach {
3748
try {
3849
it.getRepository("tipsy", "github-profile-summary")
@@ -41,18 +52,22 @@ object GhService {
4152
log.info("Pinged client ${clients.indexOf(it.client)} - was rate-limited")
4253
}
4354
}
44-
}
45-
}, 0, TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES))
46-
}
55+
}, 0, 2, TimeUnit.MINUTES)
56+
57+
// update all connected clients with remainingRequests twice per second
58+
scheduleAtFixedRate({
59+
val payload = remainingRequests.toString()
60+
clientSessions.forEachKey(1) {
61+
try {
62+
if (it.isOpen)
63+
it.send(payload)
64+
} catch (e: IOException) {
65+
log?.error(e.toString())
66+
}
67+
}
68+
}, 0, 500, TimeUnit.MILLISECONDS)
4769

48-
fun broadcastRemainingRequests(session: WsSession) = object : TimerTask() {
49-
override fun run() {
50-
if (session.isOpen) {
51-
return session.send(GhService.remainingRequests.toString())
52-
}
53-
this.cancel()
5470
}
5571
}
5672

5773
}
58-

src/main/kotlin/app/Main.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import org.eclipse.jetty.server.Server
1212
import org.eclipse.jetty.server.ServerConnector
1313
import org.eclipse.jetty.util.thread.QueuedThreadPool
1414
import org.slf4j.LoggerFactory
15-
import java.util.*
1615

1716
fun main(args: Array<String>) {
1817

@@ -62,9 +61,9 @@ fun main(args: Array<String>) {
6261
}
6362

6463
ws("/rate-limit-status") { ws ->
65-
ws.onConnect { session ->
66-
Timer().scheduleAtFixedRate(GhService.broadcastRemainingRequests(session), 0, 1000)
67-
}
64+
ws.onConnect { GhService.registerClient(it) }
65+
ws.onClose { session, _, _ -> GhService.unregisterClient(session) }
66+
ws.onError { session, _ -> GhService.unregisterClient(session) }
6867
}
6968

7069
}

0 commit comments

Comments
 (0)