Skip to content

Commit a7a49f6

Browse files
authored
Web console: Fix inactive worker counting (#18806)
* inactive if zero across all counters, not just input
* add test
1 parent c3c66de commit a7a49f6

File tree

2 files changed

+359
-4
lines changed

2 files changed

+359
-4
lines changed

web-console/src/druid-models/stages/stages.spec.ts

Lines changed: 356 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
import { aggregateSortProgressCounters } from './stages';
19+
import { aggregateSortProgressCounters, Stages } from './stages';
2020
import { STAGES } from './stages.mock';
2121

2222
describe('aggregateSortProgressCounters', () => {
@@ -69,6 +69,361 @@ describe('Stages', () => {
6969
});
7070
});
7171

72+
describe('#getInactiveWorkerCount', () => {
73+
it('returns undefined when no counters exist for stage', () => {
74+
// Create a custom Stages instance where stage has no counters
75+
const customStages = new Stages(
76+
[
77+
{
78+
stageNumber: 5,
79+
definition: {
80+
id: 'test-stage-no-counters',
81+
input: [
82+
{
83+
type: 'external',
84+
inputSource: { type: 'http', uris: [] },
85+
inputFormat: { type: 'json' },
86+
signature: [],
87+
},
88+
],
89+
processor: { type: 'scan' },
90+
signature: [],
91+
maxWorkerCount: 1,
92+
},
93+
phase: 'NEW',
94+
workerCount: 1,
95+
partitionCount: 1,
96+
},
97+
],
98+
{},
99+
);
100+
101+
expect(customStages.getInactiveWorkerCount(customStages.stages[0])).toBeUndefined();
102+
});
103+
104+
it('counts workers with zero rows across all channels', () => {
105+
// Stage 2 has counters data in the mock
106+
const inactiveCount = STAGES.getInactiveWorkerCount(STAGES.stages[2]);
107+
expect(inactiveCount).toBe(0);
108+
});
109+
110+
it('identifies inactive workers correctly', () => {
111+
// Create a custom Stages instance with workers that have zero rows
112+
const customStages = new Stages(
113+
[
114+
{
115+
stageNumber: 0,
116+
definition: {
117+
id: 'test-stage',
118+
input: [
119+
{
120+
type: 'external',
121+
inputSource: { type: 'http', uris: [] },
122+
inputFormat: { type: 'json' },
123+
signature: [],
124+
},
125+
],
126+
processor: { type: 'scan' },
127+
signature: [],
128+
maxWorkerCount: 3,
129+
},
130+
phase: 'READING_INPUT',
131+
workerCount: 3,
132+
partitionCount: 1,
133+
},
134+
],
135+
{
136+
'0': {
137+
'0': {
138+
input0: {
139+
type: 'channel',
140+
rows: [100],
141+
bytes: [1000],
142+
},
143+
output: {
144+
type: 'channel',
145+
rows: [100],
146+
bytes: [1000],
147+
},
148+
},
149+
'1': {
150+
input0: {
151+
type: 'channel',
152+
rows: [0],
153+
bytes: [0],
154+
},
155+
output: {
156+
type: 'channel',
157+
rows: [0],
158+
bytes: [0],
159+
},
160+
},
161+
'2': {
162+
input0: {
163+
type: 'channel',
164+
rows: [0],
165+
bytes: [0],
166+
},
167+
output: {
168+
type: 'channel',
169+
rows: [0],
170+
bytes: [0],
171+
},
172+
},
173+
},
174+
},
175+
);
176+
177+
const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
178+
expect(inactiveCount).toBe(2);
179+
});
180+
181+
it('handles missing channel data correctly', () => {
182+
// Create a custom Stages instance where some workers have missing channels
183+
const customStages = new Stages(
184+
[
185+
{
186+
stageNumber: 0,
187+
definition: {
188+
id: 'test-stage',
189+
input: [
190+
{
191+
type: 'external',
192+
inputSource: { type: 'http', uris: [] },
193+
inputFormat: { type: 'json' },
194+
signature: [],
195+
},
196+
],
197+
processor: { type: 'scan' },
198+
signature: [],
199+
maxWorkerCount: 2,
200+
},
201+
phase: 'READING_INPUT',
202+
workerCount: 2,
203+
partitionCount: 1,
204+
},
205+
],
206+
{
207+
'0': {
208+
'0': {
209+
input0: {
210+
type: 'channel',
211+
rows: [100],
212+
bytes: [1000],
213+
},
214+
},
215+
'1': {
216+
// Missing input0 channel - should be counted as inactive
217+
},
218+
},
219+
},
220+
);
221+
222+
const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
223+
expect(inactiveCount).toBe(1);
224+
});
225+
226+
it('counts all workers as inactive when all have zero rows', () => {
227+
const customStages = new Stages(
228+
[
229+
{
230+
stageNumber: 0,
231+
definition: {
232+
id: 'test-stage',
233+
input: [
234+
{
235+
type: 'external',
236+
inputSource: { type: 'http', uris: [] },
237+
inputFormat: { type: 'json' },
238+
signature: [],
239+
},
240+
],
241+
processor: { type: 'scan' },
242+
signature: [],
243+
maxWorkerCount: 2,
244+
},
245+
phase: 'READING_INPUT',
246+
workerCount: 2,
247+
partitionCount: 1,
248+
},
249+
],
250+
{
251+
'0': {
252+
'0': {
253+
input0: {
254+
type: 'channel',
255+
rows: [],
256+
bytes: [],
257+
},
258+
},
259+
'1': {
260+
input0: {
261+
type: 'channel',
262+
rows: [0],
263+
bytes: [0],
264+
},
265+
},
266+
},
267+
},
268+
);
269+
270+
const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
271+
expect(inactiveCount).toBe(2);
272+
});
273+
274+
it('counts no inactive workers when all have non-zero rows', () => {
275+
const customStages = new Stages(
276+
[
277+
{
278+
stageNumber: 0,
279+
definition: {
280+
id: 'test-stage',
281+
input: [
282+
{
283+
type: 'external',
284+
inputSource: { type: 'http', uris: [] },
285+
inputFormat: { type: 'json' },
286+
signature: [],
287+
},
288+
],
289+
processor: { type: 'scan' },
290+
signature: [],
291+
maxWorkerCount: 3,
292+
},
293+
phase: 'READING_INPUT',
294+
workerCount: 3,
295+
partitionCount: 1,
296+
},
297+
],
298+
{
299+
'0': {
300+
'0': {
301+
input0: {
302+
type: 'channel',
303+
rows: [100],
304+
bytes: [1000],
305+
},
306+
},
307+
'1': {
308+
input0: {
309+
type: 'channel',
310+
rows: [50],
311+
bytes: [500],
312+
},
313+
},
314+
'2': {
315+
input0: {
316+
type: 'channel',
317+
rows: [75],
318+
bytes: [750],
319+
},
320+
},
321+
},
322+
},
323+
);
324+
325+
const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
326+
expect(inactiveCount).toBe(0);
327+
});
328+
329+
it('counts worker as active if it has output but no input yet', () => {
330+
// Tests the fix: input is reported in batches, so a worker might have output
331+
// before input counters are updated. Such workers should be considered active.
332+
const customStages = new Stages(
333+
[
334+
{
335+
stageNumber: 0,
336+
definition: {
337+
id: 'test-stage',
338+
input: [
339+
{
340+
type: 'external',
341+
inputSource: { type: 'http', uris: [] },
342+
inputFormat: { type: 'json' },
343+
signature: [],
344+
},
345+
],
346+
processor: { type: 'scan' },
347+
signature: [],
348+
shuffleSpec: {
349+
type: 'targetSize',
350+
clusterBy: { columns: [] },
351+
targetSize: 3000000,
352+
},
353+
maxWorkerCount: 3,
354+
},
355+
phase: 'READING_INPUT',
356+
workerCount: 3,
357+
partitionCount: 1,
358+
},
359+
],
360+
{
361+
'0': {
362+
'0': {
363+
input0: {
364+
type: 'channel',
365+
rows: [100],
366+
bytes: [1000],
367+
},
368+
output: {
369+
type: 'channel',
370+
rows: [100],
371+
bytes: [1000],
372+
},
373+
shuffle: {
374+
type: 'channel',
375+
rows: [100],
376+
bytes: [1000],
377+
},
378+
},
379+
'1': {
380+
// Worker 1 has output and shuffle but input is not reported yet (still zero)
381+
// This can happen because input is reported in batches
382+
input0: {
383+
type: 'channel',
384+
rows: [0],
385+
bytes: [0],
386+
},
387+
output: {
388+
type: 'channel',
389+
rows: [50],
390+
bytes: [500],
391+
},
392+
shuffle: {
393+
type: 'channel',
394+
rows: [50],
395+
bytes: [500],
396+
},
397+
},
398+
'2': {
399+
// Worker 2 is truly inactive - zero across all channels
400+
input0: {
401+
type: 'channel',
402+
rows: [0],
403+
bytes: [0],
404+
},
405+
output: {
406+
type: 'channel',
407+
rows: [0],
408+
bytes: [0],
409+
},
410+
shuffle: {
411+
type: 'channel',
412+
rows: [0],
413+
bytes: [0],
414+
},
415+
},
416+
},
417+
},
418+
);
419+
420+
const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
421+
// Only worker 2 should be counted as inactive
422+
// Worker 1 has output/shuffle data, so it's active even though input is zero
423+
expect(inactiveCount).toBe(1);
424+
});
425+
});
426+
72427
describe('#getByPartitionCountersForStage', () => {
73428
it('works for input', () => {
74429
expect(STAGES.getByPartitionCountersForStage(STAGES.stages[2], 'in')).toMatchInlineSnapshot(`

web-console/src/druid-models/stages/stages.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -619,17 +619,17 @@ export class Stages {
619619

620620
getInactiveWorkerCount(stage: StageDefinition): number | undefined {
621621
const { counters } = this;
622-
const { stageNumber, definition } = stage;
622+
const { stageNumber } = stage;
623623
const forStageCounters = counters?.[stageNumber];
624624
if (!forStageCounters) return;
625625

626-
const inputChannelCounters = definition.input.map((_, i) => `input${i}` as ChannelCounterName);
626+
const channelCounters = this.getChannelCounterNamesForStage(stage);
627627

628628
// Calculate and return the number of workers that have zero count across all channelCounters
629629
return sum(
630630
Object.values(forStageCounters).map(stageCounters =>
631631
Number(
632-
inputChannelCounters.every(channel => {
632+
channelCounters.every(channel => {
633633
const c = stageCounters[channel];
634634
if (!c) return true;
635635
const totalRows = sum(c.rows || []);

0 commit comments

Comments
 (0)