2121
2222import com .google .common .collect .ImmutableMap ;
2323import org .apache .druid .indexing .common .stats .TaskRealtimeMetricsMonitor ;
24- import org .apache .druid .java .util .emitter .core .Event ;
25- import org .apache .druid .java .util .emitter .service .ServiceEmitter ;
26- import org .apache .druid .java .util .emitter .service .ServiceEventBuilder ;
2724import org .apache .druid .java .util .emitter .service .ServiceMetricEvent ;
2825import org .apache .druid .java .util .metrics .MonitorUtils ;
26+ import org .apache .druid .java .util .metrics .StubServiceEmitter ;
2927import org .apache .druid .query .DruidMetrics ;
3028import org .apache .druid .segment .incremental .InputRowThrownAwayReason ;
3129import org .apache .druid .segment .incremental .RowIngestionMeters ;
3432import org .junit .Assert ;
3533import org .junit .Before ;
3634import org .junit .Test ;
37- import org .junit .runner .RunWith ;
38- import org .mockito .Answers ;
39- import org .mockito .ArgumentMatchers ;
40- import org .mockito .Mock ;
41- import org .mockito .Mockito ;
42- import org .mockito .junit .MockitoJUnitRunner ;
43-
44- import java .util .ArrayList ;
35+
4536import java .util .HashMap ;
4637import java .util .List ;
4738import java .util .Map ;
4839
49- @ RunWith (MockitoJUnitRunner .class )
5040public class TaskRealtimeMetricsMonitorTest
5141{
5242 private static final Map <String , String []> DIMENSIONS = ImmutableMap .of (
@@ -57,28 +47,18 @@ public class TaskRealtimeMetricsMonitorTest
5747 );
5848
5949 private static final Map <String , Object > TAGS = ImmutableMap .of ("author" , "Author Name" , "version" , 10 );
60- private SegmentGenerationMetrics segmentGenerationMetrics ;
6150
62- @ Mock ( answer = Answers . RETURNS_MOCKS )
51+ private SegmentGenerationMetrics segmentGenerationMetrics ;
6352 private RowIngestionMeters rowIngestionMeters ;
64- @ Mock
65- private ServiceEmitter emitter ;
66- private Map <String , ServiceMetricEvent > emittedEvents ;
53+ private StubServiceEmitter emitter ;
6754 private TaskRealtimeMetricsMonitor target ;
6855
6956 @ Before
7057 public void setUp ()
7158 {
72- emittedEvents = new HashMap <>();
7359 segmentGenerationMetrics = new SegmentGenerationMetrics ();
74- Mockito .doCallRealMethod ().when (emitter ).emit (ArgumentMatchers .any (ServiceEventBuilder .class ));
75- Mockito
76- .doAnswer (invocation -> {
77- ServiceMetricEvent e = invocation .getArgument (0 );
78- emittedEvents .put (e .getMetric (), e );
79- return null ;
80- })
81- .when (emitter ).emit (ArgumentMatchers .any (Event .class ));
60+ rowIngestionMeters = new SimpleRowIngestionMeters ();
61+ emitter = new StubServiceEmitter ();
8262 target = new TaskRealtimeMetricsMonitor (
8363 segmentGenerationMetrics ,
8464 rowIngestionMeters ,
@@ -90,45 +70,49 @@ public void setUp()
9070 public void testdoMonitorShouldEmitUserProvidedTags ()
9171 {
9272 target .doMonitor (emitter );
93- for (ServiceMetricEvent sme : emittedEvents .values ()) {
73+
74+ List <ServiceMetricEvent > events = emitter .getMetricEvents ("ingest/events/unparseable" );
75+ Assert .assertFalse (events .isEmpty ());
76+ for (ServiceMetricEvent sme : events ) {
9477 Assert .assertEquals (TAGS , sme .getUserDims ().get (DruidMetrics .TAGS ));
9578 }
9679 }
9780
9881 @ Test
9982 public void testdoMonitorWithoutTagsShouldNotEmitTags ()
10083 {
84+ ServiceMetricEvent .Builder builderWithoutTags = new ServiceMetricEvent .Builder ();
85+ MonitorUtils .addDimensionsToBuilder (builderWithoutTags , DIMENSIONS );
86+
10187 target = new TaskRealtimeMetricsMonitor (
10288 segmentGenerationMetrics ,
10389 rowIngestionMeters ,
104- createMetricEventBuilder ()
90+ builderWithoutTags
10591 );
106- for (ServiceMetricEvent sme : emittedEvents .values ()) {
92+ target .doMonitor (emitter );
93+
94+ List <ServiceMetricEvent > events = emitter .getMetricEvents ("ingest/events/unparseable" );
95+ Assert .assertFalse (events .isEmpty ());
96+ for (ServiceMetricEvent sme : events ) {
10797 Assert .assertFalse (sme .getUserDims ().containsKey (DruidMetrics .TAGS ));
10898 }
10999 }
110100
111101 @ Test
112102 public void testMessageGapAggStats ()
113103 {
114- target = new TaskRealtimeMetricsMonitor (
115- segmentGenerationMetrics ,
116- rowIngestionMeters ,
117- createMetricEventBuilder ()
118- );
119-
120104 target .doMonitor (emitter );
121- Assert .assertFalse ( emittedEvents . containsKey ("ingest/events/minMessageGap" ));
122- Assert .assertFalse ( emittedEvents . containsKey ("ingest/events/maxMessageGap" ));
123- Assert .assertFalse ( emittedEvents . containsKey ("ingest/events/avgMessageGap" ));
105+ Assert .assertTrue ( emitter . getMetricEvents ("ingest/events/minMessageGap" ). isEmpty ( ));
106+ Assert .assertTrue ( emitter . getMetricEvents ("ingest/events/maxMessageGap" ). isEmpty ( ));
107+ Assert .assertTrue ( emitter . getMetricEvents ("ingest/events/avgMessageGap" ). isEmpty ( ));
124108
125- emittedEvents . clear ();
109+ emitter . flush ();
126110 segmentGenerationMetrics .reportMessageGap (1 );
127111 target .doMonitor (emitter );
128112
129- Assert .assertTrue ( emittedEvents . containsKey ("ingest/events/minMessageGap" ));
130- Assert .assertTrue ( emittedEvents . containsKey ("ingest/events/maxMessageGap" ));
131- Assert .assertTrue ( emittedEvents . containsKey ("ingest/events/avgMessageGap" ));
113+ Assert .assertFalse ( emitter . getMetricEvents ("ingest/events/minMessageGap" ). isEmpty ( ));
114+ Assert .assertFalse ( emitter . getMetricEvents ("ingest/events/maxMessageGap" ). isEmpty ( ));
115+ Assert .assertFalse ( emitter . getMetricEvents ("ingest/events/avgMessageGap" ). isEmpty ( ));
132116 }
133117
134118 @ Test
@@ -146,31 +130,18 @@ public void testThrownAwayEmitsReasonDimension()
146130 realMeters .incrementThrownAway (InputRowThrownAwayReason .FILTERED );
147131 realMeters .incrementThrownAway (InputRowThrownAwayReason .FILTERED );
148132
149- List <ServiceMetricEvent > allEmittedEvents = new ArrayList <>();
150- ServiceEmitter captureEmitter = Mockito .mock (ServiceEmitter .class );
151- Mockito .doCallRealMethod ().when (captureEmitter ).emit (ArgumentMatchers .any (ServiceEventBuilder .class ));
152- Mockito
153- .doAnswer (invocation -> {
154- ServiceMetricEvent e = invocation .getArgument (0 );
155- allEmittedEvents .add (e );
156- return null ;
157- })
158- .when (captureEmitter ).emit (ArgumentMatchers .any (Event .class ));
159-
160133 TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor (
161134 segmentGenerationMetrics ,
162135 realMeters ,
163136 createMetricEventBuilder ()
164137 );
165138
166- monitor .doMonitor (captureEmitter );
139+ monitor .doMonitor (emitter );
167140
168141 Map <String , Long > thrownAwayByReason = new HashMap <>();
169- for (ServiceMetricEvent event : allEmittedEvents ) {
170- if ("ingest/events/thrownAway" .equals (event .getMetric ())) {
171- Object reason = event .getUserDims ().get ("reason" );
172- thrownAwayByReason .put (reason .toString (), event .getValue ().longValue ());
173- }
142+ for (ServiceMetricEvent event : emitter .getMetricEvents ("ingest/events/thrownAway" )) {
143+ Object reason = event .getUserDims ().get ("reason" );
144+ thrownAwayByReason .put (reason .toString (), event .getValue ().longValue ());
174145 }
175146
176147 Assert .assertEquals (Long .valueOf (2 ), thrownAwayByReason .get ("null" ));
@@ -186,57 +157,33 @@ public void testThrownAwayReasonDimensionOnlyEmittedWhenNonZero()
186157 realMeters .incrementThrownAway (InputRowThrownAwayReason .NULL_OR_EMPTY_RECORD );
187158 realMeters .incrementThrownAway (InputRowThrownAwayReason .FILTERED );
188159
189- List <ServiceMetricEvent > allEmittedEvents = new ArrayList <>();
190- ServiceEmitter captureEmitter = Mockito .mock (ServiceEmitter .class );
191- Mockito .doCallRealMethod ().when (captureEmitter ).emit (ArgumentMatchers .any (ServiceEventBuilder .class ));
192- Mockito
193- .doAnswer (invocation -> {
194- ServiceMetricEvent e = invocation .getArgument (0 );
195- allEmittedEvents .add (e );
196- return null ;
197- })
198- .when (captureEmitter ).emit (ArgumentMatchers .any (Event .class ));
199-
200160 TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor (
201161 segmentGenerationMetrics ,
202162 realMeters ,
203163 createMetricEventBuilder ()
204164 );
205165
206- monitor .doMonitor (captureEmitter );
166+ monitor .doMonitor (emitter );
207167
208- List <String > emittedReasons = new ArrayList <>();
209- for (ServiceMetricEvent event : allEmittedEvents ) {
210- if ("ingest/events/thrownAway" .equals (event .getMetric ())) {
211- Object reason = event .getUserDims ().get ("reason" );
212- emittedReasons .add (reason .toString ());
213- }
168+ Map <String , Long > thrownAwayByReason = new HashMap <>();
169+ for (ServiceMetricEvent event : emitter .getMetricEvents ("ingest/events/thrownAway" )) {
170+ Object reason = event .getUserDims ().get ("reason" );
171+ thrownAwayByReason .put (reason .toString (), event .getValue ().longValue ());
214172 }
215173
216174 // Only reasons with non-zero counts should be emitted
217- Assert .assertEquals (2 , emittedReasons .size ());
218- Assert .assertTrue (emittedReasons . contains ("null" ));
219- Assert .assertTrue (emittedReasons . contains ("filtered" ));
220- Assert .assertFalse (emittedReasons . contains ("beforeMinMessageTime" ));
221- Assert .assertFalse (emittedReasons . contains ("afterMaxMessageTime" ));
175+ Assert .assertEquals (2 , thrownAwayByReason .size ());
176+ Assert .assertTrue (thrownAwayByReason . containsKey ("null" ));
177+ Assert .assertTrue (thrownAwayByReason . containsKey ("filtered" ));
178+ Assert .assertFalse (thrownAwayByReason . containsKey ("beforeMinMessageTime" ));
179+ Assert .assertFalse (thrownAwayByReason . containsKey ("afterMaxMessageTime" ));
222180 }
223181
224182 @ Test
225183 public void testThrownAwayReasonDeltaAcrossMonitorCalls ()
226184 {
227185 SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters ();
228186
229- List <ServiceMetricEvent > allEmittedEvents = new ArrayList <>();
230- ServiceEmitter captureEmitter = Mockito .mock (ServiceEmitter .class );
231- Mockito .doCallRealMethod ().when (captureEmitter ).emit (ArgumentMatchers .any (ServiceEventBuilder .class ));
232- Mockito
233- .doAnswer (invocation -> {
234- ServiceMetricEvent e = invocation .getArgument (0 );
235- allEmittedEvents .add (e );
236- return null ;
237- })
238- .when (captureEmitter ).emit (ArgumentMatchers .any (Event .class ));
239-
240187 TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor (
241188 segmentGenerationMetrics ,
242189 realMeters ,
@@ -245,30 +192,27 @@ public void testThrownAwayReasonDeltaAcrossMonitorCalls()
245192
246193 realMeters .incrementThrownAway (InputRowThrownAwayReason .NULL_OR_EMPTY_RECORD );
247194 realMeters .incrementThrownAway (InputRowThrownAwayReason .NULL_OR_EMPTY_RECORD );
248- monitor .doMonitor (captureEmitter );
195+ monitor .doMonitor (emitter );
249196
250197 long firstCallNullCount = 0 ;
251- for (ServiceMetricEvent event : allEmittedEvents ) {
252- if ("ingest/events/thrownAway" .equals (event .getMetric ())
253- && "null" .equals (event .getUserDims ().get ("reason" ))) {
198+ for (ServiceMetricEvent event : emitter .getMetricEvents ("ingest/events/thrownAway" )) {
199+ if ("null" .equals (event .getUserDims ().get ("reason" ))) {
254200 firstCallNullCount = event .getValue ().longValue ();
255201 }
256202 }
257203 Assert .assertEquals (2 , firstCallNullCount );
258204
259- allEmittedEvents . clear ();
205+ emitter . flush ();
260206 realMeters .incrementThrownAway (InputRowThrownAwayReason .NULL_OR_EMPTY_RECORD );
261207 realMeters .incrementThrownAway (InputRowThrownAwayReason .FILTERED );
262208 realMeters .incrementThrownAway (InputRowThrownAwayReason .FILTERED );
263- monitor .doMonitor (captureEmitter );
209+ monitor .doMonitor (emitter );
264210
265211 // Find counts from second call - should be deltas only
266212 Map <String , Long > secondCallCounts = new HashMap <>();
267- for (ServiceMetricEvent event : allEmittedEvents ) {
268- if ("ingest/events/thrownAway" .equals (event .getMetric ())) {
269- Object reason = event .getUserDims ().get ("reason" );
270- secondCallCounts .put (reason .toString (), event .getValue ().longValue ());
271- }
213+ for (ServiceMetricEvent event : emitter .getMetricEvents ("ingest/events/thrownAway" )) {
214+ Object reason = event .getUserDims ().get ("reason" );
215+ secondCallCounts .put (reason .toString (), event .getValue ().longValue ());
272216 }
273217
274218 // Should emit only the delta (1 more NULL, 2 new FILTERED)
0 commit comments