Skip to content

Commit fab96c5

Browse files
authored
fix(native-pos): Enable shuffle stats for sapphire java (#26427)
# Release Note ``` == NO RELEASE NOTE == ``` Summary: Fix shuffle statistics reporting for PrestoSparkRemoteSourceOperator Shuffle statistics (shuffledDataSize and shuffledPositions) were not being reported for Sapphire Java queries, making it impossible to diagnose shuffle performance issues in the query performance tab. The issue was that PrestoSparkRemoteSourceOperator did not record shuffle read statistics when processing pages, unlike other shuffle operators (ExchangeOperator and MergeOperator) which call operatorContext.recordRawInput() to track shuffle reads. This change adds the missing recordRawInput() call in PrestoSparkRemoteSourceOperator.getOutput() to record shuffle read bytes and position counts. The QueryStats.create() method already checks for PrestoSparkRemoteSourceOperator by name and aggregates these statistics, but was receiving zeros because the operator never populated them. Testing: - E2E testing see test plan in phebricator diff details - Unit tests Differential Revision: D85469234
1 parent 6baedcf commit fab96c5

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkRemoteSourceOperator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public Page getOutput()
8383
finished = true;
8484
return null;
8585
}
86+
87+
// Record shuffle read statistics
88+
operatorContext.recordRawInput(page.getSizeInBytes(), page.getPositionCount());
89+
8690
return page;
8791
}
8892

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spark.execution;
15+
16+
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.Page;
18+
import com.facebook.presto.common.block.BlockBuilder;
19+
import com.facebook.presto.common.type.BigintType;
20+
import com.facebook.presto.operator.DriverContext;
21+
import com.facebook.presto.operator.OperatorContext;
22+
import com.facebook.presto.operator.OperatorStats;
23+
import com.facebook.presto.operator.TaskContext;
24+
import com.facebook.presto.operator.UpdateMemory;
25+
import com.facebook.presto.spi.plan.PlanNodeId;
26+
import com.facebook.presto.testing.TestingSession;
27+
import org.testng.annotations.AfterMethod;
28+
import org.testng.annotations.BeforeMethod;
29+
import org.testng.annotations.Test;
30+
31+
import java.util.concurrent.Executor;
32+
import java.util.concurrent.ScheduledExecutorService;
33+
34+
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
35+
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
36+
import static java.util.concurrent.Executors.newCachedThreadPool;
37+
import static java.util.concurrent.Executors.newScheduledThreadPool;
38+
import static org.testng.Assert.assertEquals;
39+
import static org.testng.Assert.assertNotNull;
40+
import static org.testng.Assert.assertNull;
41+
42+
@Test(singleThreaded = true)
43+
public class TestPrestoSparkRemoteSourceOperator
44+
{
45+
private Executor executor;
46+
private ScheduledExecutorService scheduledExecutor;
47+
private TaskContext taskContext;
48+
49+
@BeforeMethod
50+
public void setUp()
51+
{
52+
executor = newCachedThreadPool(daemonThreadsNamed("test-executor-%s"));
53+
scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("test-scheduledExecutor-%s"));
54+
Session session = TestingSession.testSessionBuilder().build();
55+
taskContext = createTaskContext(executor, scheduledExecutor, session);
56+
}
57+
58+
@AfterMethod(alwaysRun = true)
59+
public void tearDown()
60+
{
61+
if (taskContext != null) {
62+
taskContext.failed(new Exception("Cleaning up"));
63+
}
64+
if (executor != null) {
65+
((java.util.concurrent.ExecutorService) executor).shutdownNow();
66+
}
67+
if (scheduledExecutor != null) {
68+
scheduledExecutor.shutdownNow();
69+
}
70+
}
71+
72+
@Test
73+
public void testRecordsShuffleReadStatistics()
74+
{
75+
// Create a test page with known size and position count
76+
Page testPage1 = createTestPage(100, 10); // 10 rows
77+
Page testPage2 = createTestPage(200, 20); // 20 rows
78+
79+
// Create a mock page input that provides test pages
80+
TestPrestoSparkPageInput pageInput = new TestPrestoSparkPageInput(testPage1, testPage2);
81+
82+
// Create operator context
83+
DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
84+
OperatorContext operatorContext = driverContext.addOperatorContext(
85+
0,
86+
new PlanNodeId("test"),
87+
PrestoSparkRemoteSourceOperator.class.getSimpleName());
88+
89+
// Create the operator
90+
PrestoSparkRemoteSourceOperator operator = new PrestoSparkRemoteSourceOperator(
91+
new PlanNodeId("test"),
92+
operatorContext,
93+
pageInput,
94+
true);
95+
96+
// Initially, no raw input should be recorded
97+
OperatorStats initialStats = operatorContext.getOperatorStats();
98+
assertEquals(initialStats.getRawInputPositions(), 0);
99+
assertEquals(initialStats.getRawInputDataSizeInBytes(), 0);
100+
101+
// Get first page
102+
Page page1 = operator.getOutput();
103+
assertNotNull(page1);
104+
assertEquals(page1.getPositionCount(), 10);
105+
106+
// Verify that raw input statistics are recorded for first page
107+
OperatorStats statsAfterPage1 = operatorContext.getOperatorStats();
108+
assertEquals(statsAfterPage1.getRawInputPositions(), 10);
109+
assertEquals(statsAfterPage1.getRawInputDataSizeInBytes(), testPage1.getSizeInBytes());
110+
111+
// Get second page
112+
Page page2 = operator.getOutput();
113+
assertNotNull(page2);
114+
assertEquals(page2.getPositionCount(), 20);
115+
116+
// Verify that raw input statistics are accumulated for both pages
117+
OperatorStats statsAfterPage2 = operatorContext.getOperatorStats();
118+
assertEquals(statsAfterPage2.getRawInputPositions(), 30); // 10 + 20
119+
assertEquals(statsAfterPage2.getRawInputDataSizeInBytes(), testPage1.getSizeInBytes() + testPage2.getSizeInBytes());
120+
121+
// Get third page (should be null - no more pages)
122+
Page page3 = operator.getOutput();
123+
assertNull(page3);
124+
125+
// Statistics should remain the same after getting null
126+
OperatorStats finalStats = operatorContext.getOperatorStats();
127+
assertEquals(finalStats.getRawInputPositions(), 30);
128+
assertEquals(finalStats.getRawInputDataSizeInBytes(), testPage1.getSizeInBytes() + testPage2.getSizeInBytes());
129+
130+
operator.close();
131+
}
132+
133+
@Test
134+
public void testNoStatisticsWhenNoPages()
135+
{
136+
// Create a page input with no pages
137+
TestPrestoSparkPageInput pageInput = new TestPrestoSparkPageInput();
138+
139+
// Create operator context
140+
DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
141+
OperatorContext operatorContext = driverContext.addOperatorContext(
142+
0,
143+
new PlanNodeId("test"),
144+
PrestoSparkRemoteSourceOperator.class.getSimpleName());
145+
146+
// Create the operator
147+
PrestoSparkRemoteSourceOperator operator = new PrestoSparkRemoteSourceOperator(
148+
new PlanNodeId("test"),
149+
operatorContext,
150+
pageInput,
151+
true);
152+
153+
// Get page (should be null immediately)
154+
Page page = operator.getOutput();
155+
assertNull(page);
156+
157+
// No statistics should be recorded
158+
OperatorStats stats = operatorContext.getOperatorStats();
159+
assertEquals(stats.getRawInputPositions(), 0);
160+
assertEquals(stats.getRawInputDataSizeInBytes(), 0);
161+
162+
operator.close();
163+
}
164+
165+
/**
166+
* Helper method to create a test page with specified size and position count
167+
*/
168+
private Page createTestPage(long targetSize, int positionCount)
169+
{
170+
BlockBuilder blockBuilder = BigintType.BIGINT.createBlockBuilder(null, positionCount);
171+
for (int i = 0; i < positionCount; i++) {
172+
BigintType.BIGINT.writeLong(blockBuilder, i);
173+
}
174+
return new Page(blockBuilder.build());
175+
}
176+
177+
/**
178+
* Test implementation of PrestoSparkPageInput for testing purposes
179+
*/
180+
private static class TestPrestoSparkPageInput
181+
implements PrestoSparkPageInput
182+
{
183+
private final Page[] pages;
184+
private int currentIndex;
185+
186+
public TestPrestoSparkPageInput(Page... pages)
187+
{
188+
this.pages = pages;
189+
this.currentIndex = 0;
190+
}
191+
192+
@Override
193+
public Page getNextPage(UpdateMemory updateMemory)
194+
{
195+
if (currentIndex >= pages.length) {
196+
return null;
197+
}
198+
return pages[currentIndex++];
199+
}
200+
}
201+
}

0 commit comments

Comments
 (0)