Skip to content

Commit c4bf70e

Browse files
vskjefstJMLindseth
andcommitted
Sett initial delay på liveness- og readiness-probes
Appen starter ikke opp i dev, vi håper at det er på grunn av en K8S probe som ikke får svar, og sender shutdown Co-authored-by: John Martin Lindseth <[email protected]>
1 parent e6e130a commit c4bf70e

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumer.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ internal class KafkaSubsumsjonBruktDataConsumer(
2929
private val config: InntektApiConfig,
3030
private val inntektStore: InntektStore,
3131
private val graceDuration: Duration = Duration.ofHours(3),
32-
) : CoroutineScope, HealthCheck {
32+
) : CoroutineScope,
33+
HealthCheck {
3334
private val logger = KotlinLogging.logger { }
3435
override val coroutineContext: CoroutineContext
3536
get() = Dispatchers.IO + job
@@ -62,7 +63,8 @@ internal class KafkaSubsumsjonBruktDataConsumer(
6263
while (job.isActive) {
6364
val records = consumer.poll(Duration.ofMillis(100))
6465
val ids =
65-
records.asSequence()
66+
records
67+
.asSequence()
6668
.map { record -> record.value() }
6769
.map { jacksonObjectMapper.readTree(it) }
6870
.filter { packet ->
@@ -71,8 +73,7 @@ internal class KafkaSubsumsjonBruktDataConsumer(
7173
packet.has("inntektsId") &&
7274
packet.has("kontekst") &&
7375
packet.get("@event_name").asText() == "brukt_inntekt"
74-
}
75-
.map { packet -> InntektId(packet.get("inntektsId").asText()) }
76+
}.map { packet -> InntektId(packet.get("inntektsId").asText()) }
7677
.toList()
7778

7879
try {
@@ -104,11 +105,15 @@ internal class KafkaSubsumsjonBruktDataConsumer(
104105

105106
override fun status(): HealthStatus {
106107
return if (job.isActive) {
108+
logger.info("Kafka job.isActive=${job.isActive}")
107109
HealthStatus.UP
108110
} else {
111+
logger.info("Kafka job.isActive=${job.isActive}")
109112
return if (grace.expired()) {
113+
logger.info("Kafka grace.expired()=${grace.expired()}")
110114
HealthStatus.DOWN
111115
} else {
116+
logger.info("Kafka grace.expired()=${grace.expired()}")
112117
HealthStatus.UP
113118
}
114119
}
@@ -138,8 +143,8 @@ internal class KafkaSubsumsjonBruktDataConsumer(
138143
internal fun commonConfig(
139144
bootstrapServers: String,
140145
credential: Config.KafkaAivenCredentials? = null,
141-
): Properties {
142-
return Properties().apply {
146+
): Properties =
147+
Properties().apply {
143148
put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
144149
credential?.let { creds ->
145150
putAll(
@@ -159,19 +164,17 @@ internal class KafkaSubsumsjonBruktDataConsumer(
159164
)
160165
}
161166
}
162-
}
163167
}
164168

165169
private fun consumerConfig(
166170
groupId: String,
167171
bootstrapServerUrl: String,
168172
credential: Config.KafkaAivenCredentials? = null,
169173
properties: Properties = defaultConsumerConfig,
170-
): Properties {
171-
return Properties().apply {
174+
): Properties =
175+
Properties().apply {
172176
putAll(properties)
173177
putAll(commonConfig(bootstrapServerUrl, credential))
174178
put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
175179
}
176-
}
177180
}

0 commit comments

Comments
 (0)