Skip to content

Commit a637279

Browse files
committed
Address comment, set no default value and add UT
1 parent 063378c commit a637279

File tree

5 files changed

+107
-4
lines changed

5 files changed

+107
-4
lines changed

client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ private boolean tryAccessCluster() {
136136
List<String> includeProperties =
137137
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
138138
rssConf.getAll().stream()
139-
.filter(entry -> includeProperties.isEmpty() || includeProperties.contains(entry.getKey()))
139+
.filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey()))
140140
.filter(entry -> !excludeProperties.contains(entry.getKey()))
141141
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
142142

client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private boolean tryAccessCluster() {
140140
List<String> includeProperties =
141141
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
142142
rssConf.getAll().stream()
143-
.filter(entry -> includeProperties.isEmpty() || includeProperties.contains(entry.getKey()))
143+
.filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey()))
144144
.filter(entry -> !excludeProperties.contains(entry.getKey()))
145145
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
146146

client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,20 @@
2222
import org.apache.spark.SparkConf;
2323
import org.apache.spark.shuffle.sort.SortShuffleManager;
2424
import org.junit.jupiter.api.Test;
25+
import org.mockito.ArgumentCaptor;
2526

27+
import org.apache.uniffle.client.api.CoordinatorClient;
28+
import org.apache.uniffle.client.request.RssAccessClusterRequest;
29+
import org.apache.uniffle.common.config.RssClientConf;
2630
import org.apache.uniffle.storage.util.StorageType;
2731

2832
import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED;
2933
import static org.apache.uniffle.common.rpc.StatusCode.SUCCESS;
3034
import static org.junit.jupiter.api.Assertions.assertEquals;
3135
import static org.junit.jupiter.api.Assertions.assertFalse;
3236
import static org.junit.jupiter.api.Assertions.assertTrue;
37+
import static org.mockito.Mockito.reset;
38+
import static org.mockito.Mockito.verify;
3339

3440
public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase {
3541

@@ -131,6 +137,102 @@ public void testTryAccessCluster() throws Exception {
131137
assertCreateSortShuffleManager(secondConf);
132138
}
133139

140+
@Test
141+
public void testDefaultIncludeExcludeProperties() throws Exception {
142+
CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
143+
SparkConf conf = new SparkConf();
144+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
145+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
146+
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
147+
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
148+
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
149+
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
150+
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
151+
int confInitKeyCount = conf.getAll().length;
152+
assertCreateRssShuffleManager(conf);
153+
154+
// default case: access cluster should include all properties in conf and an extra one.
155+
ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
156+
ArgumentCaptor.forClass(RssAccessClusterRequest.class);
157+
verify(mockClient).accessCluster(argumentCaptor.capture());
158+
RssAccessClusterRequest request = argumentCaptor.getValue();
159+
assertEquals(confInitKeyCount + 1, request.getExtraProperties().size());
160+
reset(mockClient);
161+
}
162+
163+
@Test
164+
public void testIncludeProperties() throws Exception {
165+
CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
166+
SparkConf conf = new SparkConf();
167+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
168+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
169+
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
170+
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
171+
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
172+
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
173+
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
174+
// test include properties
175+
conf.set(
176+
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
177+
+ RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES.key(),
178+
RssSparkConfig.RSS_ACCESS_ID
179+
.key()
180+
.substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
181+
int confInitKeyCount = conf.getAll().length;
182+
assertCreateRssShuffleManager(conf);
183+
184+
ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
185+
ArgumentCaptor.forClass(RssAccessClusterRequest.class);
186+
187+
verify(mockClient).accessCluster(argumentCaptor.capture());
188+
RssAccessClusterRequest request = argumentCaptor.getValue();
189+
// only accessId and extra one
190+
assertEquals(1 + 1, request.getExtraProperties().size());
191+
reset(mockClient);
192+
193+
// test exclude properties
194+
conf.set(
195+
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
196+
+ RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(),
197+
RssSparkConfig.RSS_ACCESS_ID.key());
198+
assertCreateRssShuffleManager(conf);
199+
argumentCaptor = ArgumentCaptor.forClass(RssAccessClusterRequest.class);
200+
verify(mockClient).accessCluster(argumentCaptor.capture());
201+
request = argumentCaptor.getValue();
202+
assertEquals(confInitKeyCount + 1 - 1, request.getExtraProperties().size());
203+
reset(mockClient);
204+
}
205+
206+
@Test
207+
public void testExcludeProperties() throws Exception {
208+
CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
209+
SparkConf conf = new SparkConf();
210+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
211+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
212+
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
213+
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
214+
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
215+
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
216+
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
217+
// test exclude properties
218+
conf.set(
219+
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
220+
+ RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(),
221+
RssSparkConfig.RSS_ACCESS_ID
222+
.key()
223+
.substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
224+
int confInitKeyCount = conf.getAll().length;
225+
assertCreateRssShuffleManager(conf);
226+
227+
ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
228+
ArgumentCaptor.forClass(RssAccessClusterRequest.class);
229+
230+
verify(mockClient).accessCluster(argumentCaptor.capture());
231+
RssAccessClusterRequest request = argumentCaptor.getValue();
232+
// all accessId and extra one except the excluded one
233+
assertEquals(confInitKeyCount + 1 - 1, request.getExtraProperties().size());
234+
}
235+
134236
private void assertCreateSortShuffleManager(SparkConf conf) throws Exception {
135237
DelegationRssShuffleManager delegationRssShuffleManager =
136238
new DelegationRssShuffleManager(conf, true);

client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected CoordinatorClient createCoordinatorClient(StatusCode status) {
5656
return mockedCoordinatorClient;
5757
}
5858

59-
void setupMockedRssShuffleUtils(StatusCode status) {
59+
CoordinatorClient setupMockedRssShuffleUtils(StatusCode status) {
6060
CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status);
6161
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
6262
coordinatorClients.add(mockCoordinatorClient);
@@ -65,5 +65,6 @@ void setupMockedRssShuffleUtils(StatusCode status) {
6565
mockedStaticRssShuffleUtils
6666
.when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
6767
.thenReturn(client);
68+
return mockCoordinatorClient;
6869
}
6970
}

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,6 @@ public class RssClientConf {
315315
ConfigOptions.key("rss.client.reportIncludeProperties")
316316
.stringType()
317317
.asList()
318-
.defaultValues()
318+
.noDefaultValue()
319319
.withDescription("the report include properties could be configured by this option");
320320
}

0 commit comments

Comments
 (0)