@@ -6,20 +6,23 @@ import kotlinx.coroutines.Job
6
6
import kotlinx.coroutines.SupervisorJob
7
7
import kotlinx.coroutines.launch
8
8
import mu.KotlinLogging
9
- import no.nav.dagpenger.events.Packet
9
+ import no.nav.dagpenger.inntekt.Config
10
10
import no.nav.dagpenger.inntekt.HealthCheck
11
11
import no.nav.dagpenger.inntekt.HealthStatus
12
12
import no.nav.dagpenger.inntekt.InntektApiConfig
13
13
import no.nav.dagpenger.inntekt.db.InntektId
14
14
import no.nav.dagpenger.inntekt.db.InntektStore
15
- import no.nav.dagpenger.plain.consumerConfig
16
- import no.nav.dagpenger.streams.PacketDeserializer
15
+ import no.nav.dagpenger.inntekt.serder.jacksonObjectMapper
16
+ import org.apache.kafka.clients.CommonClientConfigs
17
17
import org.apache.kafka.clients.consumer.CommitFailedException
18
18
import org.apache.kafka.clients.consumer.ConsumerConfig
19
19
import org.apache.kafka.clients.consumer.KafkaConsumer
20
+ import org.apache.kafka.common.config.SslConfigs
21
+ import org.apache.kafka.common.serialization.StringDeserializer
20
22
import java.time.Duration
21
23
import java.time.ZoneOffset
22
24
import java.time.ZonedDateTime
25
+ import java.util.Properties
23
26
import kotlin.coroutines.CoroutineContext
24
27
25
28
internal class KafkaSubsumsjonBruktDataConsumer (
@@ -43,13 +46,12 @@ internal class KafkaSubsumsjonBruktDataConsumer(
43
46
launch(coroutineContext) {
44
47
logger.info { " Starting ${config.application.id} " }
45
48
46
- KafkaConsumer <String , Packet >(
49
+ KafkaConsumer <String , String >(
47
50
consumerConfig(
48
51
groupId = config.application.id,
49
52
bootstrapServerUrl = config.application.brokers,
50
53
credential = config.application.credential,
51
54
).also {
52
- it[ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ] = PacketDeserializer ::class .java
53
55
it[ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ] = " earliest"
54
56
it[ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG ] = " false"
55
57
it[ConsumerConfig .MAX_POLL_RECORDS_CONFIG ] = 10
@@ -62,16 +64,15 @@ internal class KafkaSubsumsjonBruktDataConsumer(
62
64
val ids =
63
65
records.asSequence()
64
66
.map { record -> record.value() }
67
+ .map { jacksonObjectMapper.readTree(it) }
65
68
.filter { packet ->
66
- packet.hasFields(
67
- " @event_name" ,
68
- " aktorId" ,
69
- " inntektsId" ,
70
- " kontekst" ,
71
- ) &&
72
- packet.getStringValue(" @event_name" ) == " brukt_inntekt"
69
+ packet.has(" @event_name" ) &&
70
+ packet.has(" aktorId" ) &&
71
+ packet.has(" inntektsId" ) &&
72
+ packet.has(" kontekst" ) &&
73
+ packet.get(" @event_name" ).asText() == " brukt_inntekt"
73
74
}
74
- .map { packet -> InntektId (packet.getStringValue (" inntektsId" )) }
75
+ .map { packet -> InntektId (packet.get (" inntektsId" ).asText( )) }
75
76
.toList()
76
77
77
78
try {
@@ -126,4 +127,51 @@ internal class KafkaSubsumsjonBruktDataConsumer(
126
127
127
128
fun expired () = ZonedDateTime .now(ZoneOffset .UTC ).isAfter(expires)
128
129
}
130
+
131
+ companion object {
132
+ private val defaultConsumerConfig =
133
+ Properties ().apply {
134
+ put(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer ::class .java.name)
135
+ put(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer ::class .java.name)
136
+ }
137
+
138
+ internal fun commonConfig (
139
+ bootstrapServers : String ,
140
+ credential : Config .KafkaAivenCredentials ? = null,
141
+ ): Properties {
142
+ return Properties ().apply {
143
+ put(CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers)
144
+ credential?.let { creds ->
145
+ putAll(
146
+ Properties ().apply {
147
+ put(CommonClientConfigs .SECURITY_PROTOCOL_CONFIG , creds.securityProtocolConfig)
148
+ put(
149
+ SslConfigs .SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG ,
150
+ creds.sslEndpointIdentificationAlgorithmConfig,
151
+ )
152
+ put(SslConfigs .SSL_TRUSTSTORE_TYPE_CONFIG , creds.sslTruststoreTypeConfig)
153
+ put(SslConfigs .SSL_KEYSTORE_TYPE_CONFIG , creds.sslKeystoreTypeConfig)
154
+ put(SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG , creds.sslTruststoreLocationConfig)
155
+ put(SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , creds.sslTruststorePasswordConfig)
156
+ put(SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , creds.sslKeystoreLocationConfig)
157
+ put(SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , creds.sslKeystorePasswordConfig)
158
+ },
159
+ )
160
+ }
161
+ }
162
+ }
163
+ }
164
+
165
+ private fun consumerConfig (
166
+ groupId : String ,
167
+ bootstrapServerUrl : String ,
168
+ credential : Config .KafkaAivenCredentials ? = null,
169
+ properties : Properties = defaultConsumerConfig,
170
+ ): Properties {
171
+ return Properties ().apply {
172
+ putAll(properties)
173
+ putAll(commonConfig(bootstrapServerUrl, credential))
174
+ put(ConsumerConfig .GROUP_ID_CONFIG , groupId)
175
+ }
176
+ }
129
177
}
0 commit comments