@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
20
20
import org.apache.kafka.common.serialization.StringSerializer
21
21
import org.junit.jupiter.api.Test
22
22
import org.testcontainers.containers.KafkaContainer
23
+ import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy
23
24
import org.testcontainers.utility.DockerImageName
24
25
import java.sql.SQLTransientConnectionException
25
26
import java.time.Duration
@@ -28,14 +29,18 @@ import java.time.ZonedDateTime
28
29
import java.util.concurrent.TimeUnit
29
30
30
31
private val LOGGER = KotlinLogging .logger { }
32
+
31
33
internal class KafkaSubsumsjonBruktDataConsumerTest {
32
34
private object Kafka {
33
35
val instance by lazy {
34
- KafkaContainer (DockerImageName .parse(" confluentinc/cp-kafka" ).withTag(" 5.3.1" )).apply { this .start() }
36
+ KafkaContainer (DockerImageName .parse(" confluentinc/cp-kafka" ).withTag(" 5.3.1" )).apply {
37
+ this .waitingFor(HostPortWaitStrategy ())
38
+ this .start()
39
+ }
35
40
}
36
41
}
37
- val inntektId = InntektId (ULID ().nextULID())
38
42
43
+ val inntektId = InntektId (ULID ().nextULID())
39
44
private val bruktInntektMelding = mapOf (
40
45
" @event_name" to " brukt_inntekt" ,
41
46
" inntektsId" to inntektId.id,
@@ -53,7 +58,6 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
53
58
" type" to " vedtak"
54
59
)
55
60
)
56
-
57
61
private val producer by lazy {
58
62
KafkaProducer <String , String >(
59
63
producerConfig(
@@ -66,6 +70,7 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
66
70
)
67
71
}
68
72
private val adapter = moshiInstance.adapter<Map <String , Any ?>>(Map ::class .java).lenient()
73
+
69
74
@Test
70
75
fun `Should mark inntekt id as used` () {
71
76
runBlocking {
@@ -74,16 +79,15 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
74
79
val config = Config .config.inntektApiConfig.run {
75
80
copy(application = application.copy(brokers = Kafka .instance.bootstrapServers, credential = null ))
76
81
}
77
-
78
82
val consumer = KafkaSubsumsjonBruktDataConsumer (config, storeMock).apply {
79
83
listen()
80
84
}
81
-
82
- val metaData = producer.send(ProducerRecord (config.inntektBruktDataTopic, " test" , adapter.toJson(bruktInntektMelding)))
83
- .get(5 , TimeUnit .SECONDS )
85
+ val metaData =
86
+ producer.send(ProducerRecord (config.inntektBruktDataTopic, " test" , adapter.toJson(bruktInntektMelding)))
87
+ .get(5 , TimeUnit .SECONDS )
84
88
LOGGER .info(" Producer produced $bruktInntektMelding with meta $metaData " )
85
89
86
- TimeUnit .MILLISECONDS .sleep(500 )
90
+ TimeUnit .MILLISECONDS .sleep(800 )
87
91
88
92
verify(exactly = 1 ) {
89
93
storeMock.markerInntektBrukt(inntektId)
@@ -100,14 +104,17 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
100
104
val config = Config .config.inntektApiConfig.run {
101
105
copy(application = application.copy(brokers = Kafka .instance.bootstrapServers, credential = null ))
102
106
}
103
-
104
107
val consumer = KafkaSubsumsjonBruktDataConsumer (config, storeMock).apply {
105
108
listen()
106
109
}
107
-
108
110
val bruktSubsumsjonData = mapOf (" faktum" to mapOf (" manueltGrunnlag" to " 122212" ))
109
-
110
- val metaData = producer.send(ProducerRecord (config.inntektBruktDataTopic, " test" , adapter.toJson(bruktInntektMeldingManueltGrunnlag)))
111
+ val metaData = producer.send(
112
+ ProducerRecord (
113
+ config.inntektBruktDataTopic,
114
+ " test" ,
115
+ adapter.toJson(bruktInntektMeldingManueltGrunnlag)
116
+ )
117
+ )
111
118
.get(5 , TimeUnit .SECONDS )
112
119
LOGGER .info(" Producer produced $bruktSubsumsjonData with meta $metaData " )
113
120
@@ -131,13 +138,16 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
131
138
val config = Config .config.inntektApiConfig.run {
132
139
copy(application = application.copy(brokers = Kafka .instance.bootstrapServers, credential = null ))
133
140
}
134
-
135
- val consumer = KafkaSubsumsjonBruktDataConsumer (config = config, inntektStore = storeMock, graceDuration = Duration .ofMillis(1 )).apply {
141
+ val consumer = KafkaSubsumsjonBruktDataConsumer (
142
+ config = config,
143
+ inntektStore = storeMock,
144
+ graceDuration = Duration .ofMillis(1 )
145
+ ).apply {
136
146
listen()
137
147
}
138
-
139
- val metaData = producer.send(ProducerRecord (config.inntektBruktDataTopic, " test" , adapter.toJson(bruktInntektMelding)))
140
- .get(5 , TimeUnit .SECONDS )
148
+ val metaData =
149
+ producer.send(ProducerRecord (config.inntektBruktDataTopic, " test" , adapter.toJson(bruktInntektMelding)))
150
+ .get(5 , TimeUnit .SECONDS )
141
151
LOGGER .info(" Producer produced $bruktInntektMelding with meta $metaData + should fail" )
142
152
143
153
TimeUnit .MILLISECONDS .sleep(1500 )
@@ -148,9 +158,11 @@ internal class KafkaSubsumsjonBruktDataConsumerTest {
148
158
149
159
@Test
150
160
fun `Grace period is over` () {
151
- val graceperiod1 = KafkaSubsumsjonBruktDataConsumer .Grace (from = ZonedDateTime .now(ZoneOffset .UTC ).minusHours(1 ))
161
+ val graceperiod1 =
162
+ KafkaSubsumsjonBruktDataConsumer .Grace (from = ZonedDateTime .now(ZoneOffset .UTC ).minusHours(1 ))
152
163
graceperiod1.expired() shouldBe false
153
- val graceperiod2 = KafkaSubsumsjonBruktDataConsumer .Grace (from = ZonedDateTime .now(ZoneOffset .UTC ).minusHours(4 ))
164
+ val graceperiod2 =
165
+ KafkaSubsumsjonBruktDataConsumer .Grace (from = ZonedDateTime .now(ZoneOffset .UTC ).minusHours(4 ))
154
166
graceperiod2.expired() shouldBe true
155
167
}
156
168
}
0 commit comments