@@ -8,14 +8,12 @@ import misk.clustering.DefaultCluster
88import misk.clustering.weights.ClusterWeightProvider
99import misk.tasks.RepeatedTaskQueue
1010import misk.tasks.Status
11- import misk.time.timed
1211import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
1312import software.amazon.awssdk.enhanced.dynamodb.Expression
1413import software.amazon.awssdk.enhanced.dynamodb.TableSchema
1514import software.amazon.awssdk.enhanced.dynamodb.internal.AttributeValues.numberValue
1615import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest
1716import software.amazon.awssdk.services.dynamodb.DynamoDbClient
18- import wisp.logging.getLogger
1917import java.time.Clock
2018import java.time.Duration
2119
@@ -57,49 +55,44 @@ internal class DynamoClusterWatcherTask @Inject constructor(
5755 }
5856
5957 private fun updateOurselfInDynamo () {
60- val (duration) = timed {
61- val self = cluster.snapshot.self.name
62- val member = DyClusterMember ()
63- member.name = self
64- member.updated_at = clock.instant().toEpochMilli()
65- // TTL should be in seconds
66- member.expires_at = clock.instant().plus(Duration .ofDays(1 )).toEpochMilli() / 1000
67- podName?.let { member.pod_name = it }
68- table.putItem(member)
69- }
70-
71- logger.info { " Updated dynamodb with my information in ${duration.toMillis()} ms" }
58+ val self = cluster.snapshot.self.name
59+ val member = DyClusterMember ()
60+ member.name = self
61+ member.updated_at = clock.instant().toEpochMilli()
62+ // TTL should be in seconds
63+ member.expires_at = clock.instant().plus(Duration .ofDays(1 )).toEpochMilli() / 1000
64+ podName?.let { member.pod_name = it }
65+ table.putItem(member)
7266 }
7367
7468 internal fun recordCurrentDynamoCluster () {
75- val (duration) = timed {
76- val members = mutableSetOf<Member >()
77- val threshold = clock.instant().minusSeconds(dynamoClusterConfig.stale_threshold_seconds).toEpochMilli()
78- val request = ScanEnhancedRequest .builder()
79- .consistentRead(true )
80- .filterExpression(
81- Expression .builder()
82- .expression(" updated_at >= :threshold" )
83- .expressionValues(mapOf (" :threshold" to numberValue(threshold)))
84- .build()
85- )
86- .build()
87- for (page in table.scan(request).stream()) {
88- for (item in page.items()) {
89- members.add(Member (item.name!! , " invalid-ip" ))
90- }
69+ val members = mutableSetOf<Member >()
70+ val threshold =
71+ clock.instant().minusSeconds(dynamoClusterConfig.stale_threshold_seconds).toEpochMilli()
72+ val request = ScanEnhancedRequest .builder()
73+ .consistentRead(true )
74+ .filterExpression(
75+ Expression .builder()
76+ .expression(" updated_at >= :threshold" )
77+ .expressionValues(mapOf (" :threshold" to numberValue(threshold)))
78+ .build()
79+ )
80+ .build()
81+ for (page in table.scan(request).stream()) {
82+ for (item in page.items()) {
83+ members.add(Member (item.name!! , " invalid-ip" ))
9184 }
92- cluster.clusterChanged(membersBecomingReady = members, membersBecomingNotReady = prevMembers - members)
93- prevMembers = members
9485 }
95-
96- logger.info { " Updated cluster information from dynamodb in ${duration.toMillis()} ms" }
86+ cluster.clusterChanged(
87+ membersBecomingReady = members,
88+ membersBecomingNotReady = prevMembers - members
89+ )
90+ prevMembers = members
9791 }
9892
9993 override fun shutDown () {}
10094
10195 companion object {
10296 internal val TABLE_SCHEMA = TableSchema .fromClass(DyClusterMember ::class .java)
103- private val logger = getLogger<DynamoClusterWatcherTask >()
10497 }
10598}
0 commit comments