@@ -47,58 +47,63 @@ internal class KafkaSubsumsjonBruktDataConsumer(
         launch(coroutineContext) {
             logger.info { " Starting ${config.application.id} " }
 
-            KafkaConsumer<String, String>(
-                consumerConfig(
-                    groupId = config.application.id,
-                    bootstrapServerUrl = config.application.brokers,
-                    credential = config.application.credential,
-                ).also {
-                    it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
-                    it[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
-                    it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10
-                },
-            ).use { consumer ->
-                try {
-                    consumer.subscribe(listOf(config.inntektBruktDataTopic))
-                    while (job.isActive) {
-                        val records = consumer.poll(Duration.ofMillis(100))
-                        val ids =
-                            records
-                                .asSequence()
-                                .map { record -> record.value() }
-                                .map { jacksonObjectMapper.readTree(it) }
-                                .filter { packet ->
-                                    packet.has("@event_name") &&
-                                        packet.has("aktorId") &&
-                                        packet.has("inntektsId") &&
-                                        packet.has("kontekst") &&
-                                        packet.get("@event_name").asText() == "brukt_inntekt"
-                                }.map { packet -> InntektId(packet.get("inntektsId").asText()) }
-                                .toList()
+            try {
+                KafkaConsumer<String, String>(
+                    consumerConfig(
+                        groupId = config.application.id,
+                        bootstrapServerUrl = config.application.brokers,
+                        credential = config.application.credential,
+                    ).also {
+                        it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
+                        it[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
+                        it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 10
+                    },
+                ).use { consumer ->
+                    try {
+                        consumer.subscribe(listOf(config.inntektBruktDataTopic))
+                        while (job.isActive) {
+                            val records = consumer.poll(Duration.ofMillis(100))
+                            val ids =
+                                records
+                                    .asSequence()
+                                    .map { record -> record.value() }
+                                    .map { jacksonObjectMapper.readTree(it) }
+                                    .filter { packet ->
+                                        packet.has("@event_name") &&
+                                            packet.has("aktorId") &&
+                                            packet.has("inntektsId") &&
+                                            packet.has("kontekst") &&
+                                            packet.get("@event_name").asText() == "brukt_inntekt"
+                                    }.map { packet -> InntektId(packet.get("inntektsId").asText()) }
+                                    .toList()
 
-                        try {
-                            ids.forEach { id ->
-                                if (inntektStore.markerInntektBrukt(id) == 1) {
-                                    logger.info("Marked inntekt with id $id as used")
+                            try {
+                                ids.forEach { id ->
+                                    if (inntektStore.markerInntektBrukt(id) == 1) {
+                                        logger.info("Marked inntekt with id $id as used")
+                                    }
                                 }
+                                if (ids.isNotEmpty()) {
+                                    consumer.commitSync()
+                                }
+                            } catch (e: CommitFailedException) {
+                                logger.warn("Kafka threw a commit fail exception, looping back", e)
                             }
-                            if (ids.isNotEmpty()) {
-                                consumer.commitSync()
-                            }
-                        } catch (e: CommitFailedException) {
-                            logger.warn("Kafka threw a commit fail exception, looping back", e)
                         }
-                    }
-                } catch (e: Exception) {
-                    logger.error(
-                        """
+                    } catch (e: Exception) {
+                        logger.error(
+                            """
                             "Unexpected exception while consuming messages.
                             Stopping consumer, grace period ${grace.duration.seconds / 60} minutes"
                             """.trimIndent(),
-                        e,
-                    )
-                    stop()
+                            e,
+                        )
+                        stop()
+                    }
                 }
+            } catch (e: Exception) {
+                logger.error("Failed to set up the KafkaConsumer", e)
+                stop()
             }
         }
     }
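The net effect of the change: constructing the `KafkaConsumer` (which can throw on bad config or missing credentials) now sits inside its own try/catch, so a setup failure takes the same graceful `stop()` path as a consumption failure instead of leaving a dead coroutine. For reference, here is a minimal, self-contained sketch of the same manual-commit pattern outside this repo's helpers; the broker address, group id, and topic name are placeholders, and `println` stands in for the real logger:

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.CommitFailedException
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
        put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")              // placeholder group id
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        // Same three settings as in the diff: replay from the beginning for a
        // new group, never auto-commit, and keep batches small.
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10)
    }

    try {
        // .use closes the consumer even when the poll loop throws.
        KafkaConsumer<String, String>(props).use { consumer ->
            consumer.subscribe(listOf("demo-topic")) // placeholder topic
            while (true) {
                val records = consumer.poll(Duration.ofMillis(100))
                if (records.isEmpty) continue
                records.forEach { record -> println("processing ${record.value()}") }
                try {
                    // Commit only after the whole batch was processed; a crash
                    // before this line means the batch is polled again.
                    consumer.commitSync()
                } catch (e: CommitFailedException) {
                    // Typically thrown after a rebalance; the records will be
                    // redelivered, so just loop back, as the diff does.
                    println("commit failed, will re-poll: ${e.message}")
                }
            }
        }
    } catch (e: Exception) {
        // Mirrors the outer catch added in the diff: construction or
        // subscription failures end in a controlled shutdown rather than
        // an unhandled coroutine exception.
        println("consumer setup or consumption failed: ${e.message}")
    }
}
```

Because offsets are committed only after processing, a batch can be redelivered after a crash or a failed commit, so the processing step (here `markerInntektBrukt`) needs to be idempotent; the `== 1` row-count check in the diff suggests it already is.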