1
1
package no .nav .foreldrepenger .abakus .vedtak .kafka ;
2
2
3
- import static org . apache . kafka . streams . errors . StreamsUncaughtExceptionHandler . StreamThreadExceptionResponse . SHUTDOWN_CLIENT ;
3
+ import java . util . List ;
4
4
5
- import java .time .Duration ;
6
- import java .time .temporal .ChronoUnit ;
7
-
8
- import jakarta .enterprise .context .ApplicationScoped ;
9
- import jakarta .inject .Inject ;
10
-
11
- import org .apache .kafka .streams .KafkaStreams ;
12
- import org .apache .kafka .streams .StreamsBuilder ;
13
- import org .apache .kafka .streams .Topology ;
14
- import org .apache .kafka .streams .kstream .Consumed ;
15
5
import org .slf4j .Logger ;
16
6
import org .slf4j .LoggerFactory ;
17
7
18
- import no .nav .foreldrepenger .konfig .KonfigVerdi ;
19
- import no .nav .vedtak .felles .integrasjon .kafka .KafkaProperties ;
8
+ import jakarta .enterprise .context .ApplicationScoped ;
9
+ import jakarta .inject .Inject ;
10
+ import no .nav .vedtak .felles .integrasjon .kafka .KafkaConsumerManager ;
20
11
import no .nav .vedtak .log .metrics .Controllable ;
21
12
import no .nav .vedtak .log .metrics .LiveAndReadinessAware ;
22
13
@@ -25,47 +16,20 @@ public class VedtakConsumer implements LiveAndReadinessAware, Controllable {
25
16
26
17
private static final Logger LOG = LoggerFactory .getLogger (VedtakConsumer .class );
27
18
28
- private static final String APPLICATION_ID = "fpabakus" ; // Hold konstant pga offset commit
29
-
30
- private KafkaStreams stream ;
19
+ private KafkaConsumerManager <String , String > kcm ;
31
20
private String topic ;
32
21
33
22
VedtakConsumer () {
34
23
}
35
24
36
25
@ Inject
37
- public VedtakConsumer (@ KonfigVerdi (value = "kafka.fattevedtak.topic" , defaultVerdi = "teamforeldrepenger.familie-vedtakfattet-v1" ) String topicName ,
38
- VedtaksHendelseHåndterer vedtaksHendelseHåndterer ) {
39
- this .topic = topicName ;
40
-
41
- final Consumed <String , String > consumed = Consumed .with (Topology .AutoOffsetReset .EARLIEST );
42
-
43
- final var builder = new StreamsBuilder ();
44
- builder .stream (topic , consumed ).foreach (vedtaksHendelseHåndterer ::handleMessage );
45
-
46
- this .stream = new KafkaStreams (builder .build (), KafkaProperties .forStreamsStringValue (APPLICATION_ID ));
26
+ public VedtakConsumer (VedtaksHendelseHåndterer vedtaksHendelseHåndterer ) {
27
+ this .kcm = new KafkaConsumerManager <>(List .of (vedtaksHendelseHåndterer ));
47
28
}
48
29
49
- private void addShutdownHooks () {
50
- stream .setStateListener ((newState , oldState ) -> {
51
- LOG .info ("{} :: From state={} to state={}" , topic , oldState , newState );
52
-
53
- if (newState == KafkaStreams .State .ERROR ) {
54
- // if the stream has died there is no reason to keep spinning
55
- LOG .warn ("{} :: No reason to keep living, closing stream" , topic );
56
- stop ();
57
- }
58
- });
59
- stream .setUncaughtExceptionHandler (ex -> {
60
- LOG .error (topic + " :: Caught exception in stream, exiting" , ex );
61
- return SHUTDOWN_CLIENT ;
62
- });
63
- }
64
-
65
-
66
30
@ Override
67
31
public boolean isAlive () {
68
- return stream != null && stream . state (). isRunningOrRebalancing ();
32
+ return kcm . allRunning ();
69
33
}
70
34
71
35
@ Override
@@ -75,15 +39,13 @@ public boolean isReady() {
75
39
76
40
@ Override
77
41
public void start () {
78
- addShutdownHooks ();
79
- stream .start ();
80
- LOG .info ("Starter konsumering av topic={}, tilstand={}" , topic , stream .state ());
42
+ LOG .info ("Starter konsumering av topics={}" , kcm .topicNames ());
43
+ kcm .start ((t , e ) -> LOG .error ("{} :: Caught exception in stream, exiting" , t , e ));
81
44
}
82
45
83
46
@ Override
84
47
public void stop () {
85
- LOG .info ("Starter shutdown av topic={}, tilstand={} med 10 sekunder timeout" , topic , stream .state ());
86
- stream .close (Duration .of (30 , ChronoUnit .SECONDS ));
87
- LOG .info ("Shutdown av topic={}, tilstand={} med 10 sekunder timeout" , topic , stream .state ());
48
+ LOG .info ("Starter shutdown av topics={} med 10 sekunder timeout" , kcm .topicNames ());
49
+ kcm .stop ();
88
50
}
89
51
}
0 commit comments