15
15
import net .jodah .failsafe .RetryPolicy ;
16
16
import org .slf4j .Logger ;
17
17
import org .slf4j .LoggerFactory ;
18
+ import org .springframework .scheduling .annotation .EnableScheduling ;
18
19
import org .springframework .scheduling .annotation .Scheduled ;
19
20
import org .springframework .stereotype .Component ;
20
21
28
29
import java .util .concurrent .ExecutorService ;
29
30
30
31
@ Component
32
+ @ EnableScheduling
31
33
public class EventHubSender {
32
34
33
35
private static final Logger LOG = LoggerFactory .getLogger (MethodHandles .lookup ().lookupClass ());
@@ -60,20 +62,20 @@ public void send(List<LoMessage> messages) {
60
62
.withBackoff (eventHubProperties .getThrottlingDelay ().toMillis (), Duration .ofMinutes (1 ).toMillis (), ChronoUnit .MILLIS )
61
63
.onRetry (r -> {
62
64
LOG .debug ("Problem while sending message to Event Hub because of: {}. Retrying..." , r .getLastFailure ().getMessage ());
63
- counters .getMesasageSentAttemptFailedCounter ().increment ();
65
+ counters .getMesasageSentAttemptFailedCounter ().increment (messages . size () );
64
66
})
65
67
.onSuccess (r -> {
66
- LOG .debug ("Message was sent to Event Hub" );
67
- counters .getMesasageSentCounter ().increment ();
68
+ LOG .debug ("Batch of messages of the following size were sent: {}" , messages . size () );
69
+ counters .getMesasageSentCounter ().increment (messages . size () );
68
70
messages .forEach (m -> loService .sendAck (m .messageId ()));
69
71
})
70
72
.onFailure (r -> {
71
73
LOG .error ("Cannot send messages to Event Hub because of {}" , r .getFailure ());
72
- counters .getMesasageSentFailedCounter ().increment ();
74
+ counters .getMesasageSentFailedCounter ().increment (messages . size () );
73
75
messages .forEach (m -> loService .sendAck (m .messageId ()));
74
76
})
75
77
).with (executorService ).run (execution -> {
76
- counters .getMesasageSentAttemptCounter ().increment ();
78
+ counters .getMesasageSentAttemptCounter ().increment (messages . size () );
77
79
try {
78
80
List <String > messageContentList = messages .stream ().map (LoMessage ::message ).toList ();
79
81
eventHubClientFacade .sendSync (messageContentList );
@@ -98,6 +100,7 @@ private void checkConnection() {
98
100
99
101
@ Scheduled (fixedRateString = "${azure.evt-hub.synchronization-interval}" )
100
102
public void send () {
103
+ LOG .debug ("Number of messages waiting to be sent: {}" , messageQueue .size ());
101
104
if (!messageQueue .isEmpty ()) {
102
105
LOG .info ("Start sending messages..." );
103
106
0 commit comments