Skip to content

Commit ea76d98

Browse files
geiralundkmriiseAndreas Nygård Bergh
authored
Feature/aiven (#116)
* gått over til aiven topic, og nye parametre på packet navikt/dagpenger#692 navikt/dagpenger#723 Co-authored-by: Andreas Nygård Bergh <[email protected]> Co-authored-by: Geir A. Lund <[email protected]> * Oppdatere dagpenger-streams * Lytte på aiven bruktinntekt.v1 topic for å markere inntekt som brukt navikt/dagpenger#692 navikt/dagpenger#723 Co-authored-by: Andreas Nygård Bergh <[email protected]> Co-authored-by: Knut Magne Riise <[email protected]> Co-authored-by: Kriise <[email protected]> Co-authored-by: Andreas Nygård Bergh <[email protected]>
1 parent fbf39f3 commit ea76d98

File tree

8 files changed

+56
-66
lines changed

8 files changed

+56
-66
lines changed

buildSrc/src/main/kotlin/Constants.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ object Dagpenger {
4545
}
4646
}
4747

48-
const val Streams = "com.github.navikt:dagpenger-streams:2021.03.22-19.09.82a577cfa379"
48+
const val Streams = "com.github.navikt:dagpenger-streams:2021.03.23-12.06.6e6e22acb0ab"
4949
const val Events = "com.github.navikt:dagpenger-events:2021.02.19-08.31.cfd52901bc9f"
5050
}
5151

dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/Configuration.kt

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import com.natpryce.konfig.intType
88
import com.natpryce.konfig.overriding
99
import com.natpryce.konfig.stringType
1010
import no.finn.unleash.util.UnleashConfig
11-
import no.nav.dagpenger.streams.KafkaCredential
11+
import no.nav.dagpenger.streams.KafkaAivenCredentials
1212
import java.net.InetAddress
1313
import java.net.UnknownHostException
1414

@@ -20,6 +20,7 @@ private val localProperties = ConfigurationMap(
2020
"database.user" to "postgres",
2121
"database.password" to "postgres",
2222
"vault.mountpath" to "postgresql/dev/",
23+
"KAFKA_BROKERS" to "localhost:9092",
2324
"application.profile" to "LOCAL",
2425
"application.httpPort" to "8099",
2526
"hentinntektliste.url" to "https://localhost/inntektskomponenten-ws/rs/api/v1/hentinntektliste",
@@ -32,8 +33,7 @@ private val localProperties = ConfigurationMap(
3233
"flyway.locations" to "db/migration,db/testdata",
3334
"api.secret" to "secret",
3435
"api.keys" to "dp-datalaster-inntekt",
35-
"kafka.subsumsjon.brukt.data.topic" to "privat-dagpenger-subsumsjon-brukt-data",
36-
"kafka.bootstrap.servers" to "localhost:9092",
36+
"kafka.inntekt.brukt.topic" to "teamdagpenger.inntektbrukt.v1",
3737
"unleash.url" to "http://localhost/api/",
3838
"pdl.url" to "http://localhost:4321",
3939
"enhetsregisteret.url" to "https://data.brreg.no/enhetsregisteret"
@@ -53,8 +53,7 @@ private val devProperties = ConfigurationMap(
5353
"jwks.issuer" to "https://isso-q.adeo.no:443/isso/oauth2",
5454
"application.profile" to "DEV",
5555
"application.httpPort" to "8099",
56-
"kafka.subsumsjon.brukt.data.topic" to "privat-dagpenger-subsumsjon-brukt-data",
57-
"kafka.bootstrap.servers" to "b27apvl00045.preprod.local:8443,b27apvl00046.preprod.local:8443,b27apvl00047.preprod.local:8443",
56+
"kafka.inntekt.brukt.topic" to "teamdagpenger.inntektbrukt.v1",
5857
"unleash.url" to "https://unleash.nais.io/api/",
5958
"pdl.url" to "http://pdl-api.default.svc.nais.local/graphql",
6059
"enhetsregisteret.url" to "https://data.brreg.no/enhetsregisteret"
@@ -74,8 +73,7 @@ private val prodProperties = ConfigurationMap(
7473
"jwks.issuer" to "https://isso.adeo.no:443/isso/oauth2",
7574
"application.profile" to "PROD",
7675
"application.httpPort" to "8099",
77-
"kafka.subsumsjon.brukt.data.topic" to "privat-dagpenger-subsumsjon-brukt-data",
78-
"kafka.bootstrap.servers" to "a01apvl00145.adeo.no:8443,a01apvl00146.adeo.no:8443,a01apvl00147.adeo.no:8443,a01apvl00148.adeo.no:8443,a01apvl00149.adeo.no:8443,a01apvl00150.adeo.no:8443",
76+
"kafka.inntekt.brukt.topic" to "teamdagpenger.inntektbrukt.v1",
7977
"unleash.url" to "https://unleash.nais.io/api/",
8078
"pdl.url" to "http://pdl-api.default.svc.nais.local/graphql",
8179
"enhetsregisteret.url" to "https://data.brreg.no/enhetsregisteret"
@@ -86,10 +84,9 @@ data class Configuration(
8684
val database: Database = Database(),
8785
val vault: Vault = Vault(),
8886
val application: Application = Application(),
89-
val kafka: Kafka = Kafka(),
9087
val pdl: Pdl = Pdl(),
9188
val enhetsregisteretUrl: Enhetsregister = Enhetsregister(),
92-
val subsumsjonBruktDataTopic: String = config()[Key("kafka.subsumsjon.brukt.data.topic", stringType)]
89+
val inntektBruktDataTopic: String = config()[Key("kafka.inntekt.brukt.topic", stringType)]
9390
) {
9491

9592
data class Database(
@@ -107,21 +104,11 @@ data class Configuration(
107104
val mountPath: String = config()[Key("vault.mountpath", stringType)]
108105
)
109106

110-
data class Kafka(
111-
val brokers: String = config()[Key("kafka.bootstrap.servers", stringType)],
112-
val user: String? = config().getOrNull(Key("srvdp.inntekt.api.username", stringType)),
113-
val password: String? = config().getOrNull(Key("srvdp.inntekt.api.password", stringType))
114-
) {
115-
fun credential(): KafkaCredential? {
116-
return if (user != null && password != null) {
117-
KafkaCredential(user, password)
118-
} else null
119-
}
120-
}
121-
122107
data class Application(
123108
val id: String = config().getOrElse(Key("application.id", stringType), "dp-inntekt-api-consumer"),
109+
val brokers: String = config()[Key("KAFKA_BROKERS", stringType)],
124110
val profile: Profile = config()[Key("application.profile", stringType)].let { Profile.valueOf(it) },
111+
val credential: KafkaAivenCredentials? = if (profile == Profile.LOCAL) null else KafkaAivenCredentials(),
125112
val httpPort: Int = config()[Key("application.httpPort", intType)],
126113
val username: String = config()[Key("srvdp.inntekt.api.username", stringType)],
127114
val password: String = config()[Key("srvdp.inntekt.api.password", stringType)],

dp-inntekt-api/src/main/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumer.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import no.nav.dagpenger.inntekt.HealthStatus
1313
import no.nav.dagpenger.inntekt.db.InntektId
1414
import no.nav.dagpenger.inntekt.db.InntektStore
1515
import no.nav.dagpenger.plain.consumerConfig
16-
import no.nav.dagpenger.streams.KafkaCredential
1716
import no.nav.dagpenger.streams.PacketDeserializer
1817
import org.apache.kafka.clients.consumer.CommitFailedException
1918
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -43,18 +42,14 @@ internal class KafkaSubsumsjonBruktDataConsumer(
4342

4443
fun listen() {
4544
launch(coroutineContext) {
46-
val creds = config.kafka.user?.let { u ->
47-
config.kafka.password?.let { p ->
48-
KafkaCredential(username = u, password = p)
49-
}
50-
}
45+
5146
logger.info { "Starting ${config.application.id}" }
5247

5348
KafkaConsumer<String, Packet>(
5449
consumerConfig(
5550
groupId = config.application.id,
56-
bootstrapServerUrl = config.kafka.brokers,
57-
credential = creds
51+
bootstrapServerUrl = config.application.brokers,
52+
credential = config.application.credential
5853
).also {
5954
it[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = PacketDeserializer::class.java
6055
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
@@ -63,16 +58,21 @@ internal class KafkaSubsumsjonBruktDataConsumer(
6358
}
6459
).use { consumer ->
6560
try {
66-
consumer.subscribe(listOf(config.subsumsjonBruktDataTopic))
61+
consumer.subscribe(listOf(config.inntektBruktDataTopic))
6762
while (job.isActive) {
6863
val records = consumer.poll(Duration.ofMillis(100))
6964
val ids = records.asSequence()
7065
.map { record -> record.value() }
71-
.filter { packet -> packet.hasField("faktum") }
72-
.map { packet -> packet.getMapValue("faktum") }
73-
.onEach { faktum -> if (faktum["inntektsId"] == null) { logger.info { "Subsumsjon do not contain inntekts id. Is it manuelt grunnlag? ${faktum["manueltGrunnlag"] != null}" } } }
74-
.filter { faktum -> faktum["inntektsId"] != null }
75-
.map { faktum -> InntektId(faktum["inntektsId"] as String) }
66+
.filter { packet ->
67+
packet.hasFields(
68+
"@event_name",
69+
"aktorId",
70+
"inntektsId",
71+
"kontekst"
72+
) &&
73+
packet.getStringValue("@event_name") == "brukt_inntekt"
74+
}
75+
.map { packet -> InntektId(packet.getStringValue("inntektsId")) }
7676
.toList()
7777

7878
try {

dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/ConfigurationTest.kt

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,6 @@ import org.junit.jupiter.api.Test
44

55
class ConfigurationTest {
66

7-
@Test
8-
fun `Configuration is loaded based on application profile`() {
9-
10-
withProps(dummyConfigs + mapOf("NAIS_CLUSTER_NAME" to "dev-fss")) {
11-
with(Configuration()) {
12-
kotlin.test.assertEquals(Profile.DEV, this.application.profile)
13-
}
14-
}
15-
16-
withProps(dummyConfigs + mapOf("NAIS_CLUSTER_NAME" to "prod-fss")) {
17-
with(Configuration()) {
18-
kotlin.test.assertEquals(Profile.PROD, this.application.profile)
19-
}
20-
}
21-
}
22-
237
@Test
248
fun `Default configuration is LOCAL `() {
259
with(Configuration()) {

dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/DummyConfigs.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ private val mockedConfigs = listOf(
1010
"api.secret",
1111
"api.keys",
1212
"jwks.url",
13-
"jwks.issuer"
13+
"jwks.issuer",
14+
"KAFKA_BROKERS"
1415
)
1516

1617
val dummyConfigs = mockedConfigs.associateWith { "test" }

dp-inntekt-api/src/test/kotlin/no/nav/dagpenger/inntekt/subsumsjonbrukt/KafkaSubsumsjonBruktDataConsumerTest.kt

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,25 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
3333
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("5.3.1")).apply { this.start() }
3434
}
3535
}
36+
val inntektId = InntektId(ULID().nextULID())
37+
38+
private val bruktInntektMelding = mapOf(
39+
"@event_name" to "brukt_inntekt",
40+
"inntektsId" to inntektId.id,
41+
"aktorId" to "12345678910",
42+
"kontekst" to mapOf(
43+
"id" to "2",
44+
"type" to "vedtak"
45+
)
46+
)
47+
private val bruktInntektMeldingManueltGrunnlag = mapOf(
48+
"@event_name" to "brukt_inntekt",
49+
"aktorId" to "12345678910",
50+
"kontekst" to mapOf(
51+
"id" to "1",
52+
"type" to "vedtak"
53+
)
54+
)
3655

3756
private val producer by lazy {
3857
KafkaProducer<String, String>(
@@ -49,22 +68,19 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
4968
@Test
5069
fun `Should mark inntekt id as used`() {
5170
runBlocking {
52-
val inntektId = InntektId(ULID().nextULID())
5371
val storeMock = mockk<InntektStore>(relaxed = false)
5472
coEvery { storeMock.markerInntektBrukt(inntektId) } returns 1
5573
val config = Configuration().run {
56-
copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null))
74+
copy(application = application.copy(brokers = Kafka.instance.bootstrapServers, credential = null))
5775
}
5876

5977
val consumer = KafkaSubsumsjonBruktDataConsumer(config, storeMock).apply {
6078
listen()
6179
}
6280

63-
val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to inntektId.id))
64-
65-
val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData)))
81+
val metaData = producer.send(ProducerRecord(config.inntektBruktDataTopic, "test", adapter.toJson(bruktInntektMelding)))
6682
.get(5, TimeUnit.SECONDS)
67-
LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData")
83+
LOGGER.info("Producer produced $bruktInntektMelding with meta $metaData")
6884

6985
TimeUnit.MILLISECONDS.sleep(500)
7086

@@ -81,7 +97,7 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
8197
runBlocking {
8298
val storeMock = mockk<InntektStore>(relaxed = false)
8399
val config = Configuration().run {
84-
copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null))
100+
copy(application = application.copy(brokers = Kafka.instance.bootstrapServers, credential = null))
85101
}
86102

87103
val consumer = KafkaSubsumsjonBruktDataConsumer(config, storeMock).apply {
@@ -90,7 +106,7 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
90106

91107
val bruktSubsumsjonData = mapOf("faktum" to mapOf("manueltGrunnlag" to "122212"))
92108

93-
val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData)))
109+
val metaData = producer.send(ProducerRecord(config.inntektBruktDataTopic, "test", adapter.toJson(bruktInntektMeldingManueltGrunnlag)))
94110
.get(5, TimeUnit.SECONDS)
95111
LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData")
96112

@@ -112,18 +128,16 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
112128
val storeMock = mockk<InntektStore>(relaxed = false)
113129
coEvery { storeMock.markerInntektBrukt(inntektId) } throws SQLTransientConnectionException("BLÆ")
114130
val config = Configuration().run {
115-
copy(kafka = kafka.copy(brokers = Kafka.instance.bootstrapServers, user = null, password = null))
131+
copy(application = application.copy(brokers = Kafka.instance.bootstrapServers, credential = null))
116132
}
117133

118134
val consumer = KafkaSubsumsjonBruktDataConsumer(config = config, inntektStore = storeMock, graceDuration = Duration.ofMillis(1)).apply {
119135
listen()
120136
}
121137

122-
val bruktSubsumsjonData = mapOf("faktum" to mapOf("inntektsId" to inntektId.id))
123-
124-
val metaData = producer.send(ProducerRecord(config.subsumsjonBruktDataTopic, "test", adapter.toJson(bruktSubsumsjonData)))
138+
val metaData = producer.send(ProducerRecord(config.inntektBruktDataTopic, "test", adapter.toJson(bruktInntektMelding)))
125139
.get(5, TimeUnit.SECONDS)
126-
LOGGER.info("Producer produced $bruktSubsumsjonData with meta $metaData + should fail")
140+
LOGGER.info("Producer produced $bruktInntektMelding with meta $metaData + should fail")
127141

128142
TimeUnit.MILLISECONDS.sleep(1500)
129143

nais/dev/nais.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ spec:
2727
requests:
2828
cpu: 100m
2929
memory: 256Mi
30+
kafka:
31+
pool: nav-dev
3032
vault:
3133
enabled: true
3234
paths:

nais/prod/nais.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ spec:
2929
memory: 512Mi
3030
secureLogs:
3131
enabled: true
32+
kafka:
33+
pool: nav-prod
3234
vault:
3335
enabled: true
3436
paths:

0 commit comments

Comments
 (0)