Skip to content

Commit fd27a08

Browse files
authored
[#2254][FOLLOWUP] improvement(spark): Add include key filter before reporting extraProperties (#2265)
### What changes were proposed in this pull request?

- Add an include-key filter that is applied before extraProperties are reported.
- Update the related docs.

### Why are the changes needed?

- Add the include-key filter.
- Add documentation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested on our test cluster.

- Client conf

```
--conf spark.shuffle.manager=org.apache.spark.shuffle.DelegationRssShuffleManager
--conf spark.rss.access.id="access.id"
--conf spark.rss.client.reportIncludeProperties="spark.test.key,test2.key,app.id,yarn.queue"
--conf spark.rss.client.reportExcludeProperties="app.id"
```

- Coordinator_rpc_audit.log

```
[2024-11-26 21:14:45.632] cmd=accessCluster statusCode=ACCESS_DENIED from=/XXX:34733 executionTimeUs=276(<1s) appId=N/A args{accessInfo=AccessInfo{accessId='rss-test-mbl-003-bannedid', user= tdwadmin, tags=[ss_v5], extraProperties={access_info_required_shuffle_nodes_num=-1, yarn.queue=g_teg_tdw_operlog-offline, spark.test.key=123_456}}}
```
1 parent d7aad66 commit fd27a08

File tree

6 files changed

+103
-1
lines changed

6 files changed

+103
-1
lines changed

client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,10 @@ private boolean tryAccessCluster() {
133133
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
134134
List<String> excludeProperties =
135135
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
136+
List<String> includeProperties =
137+
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
136138
rssConf.getAll().stream()
139+
.filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey()))
137140
.filter(entry -> !excludeProperties.contains(entry.getKey()))
138141
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
139142

client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ private boolean tryAccessCluster() {
137137
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
138138
List<String> excludeProperties =
139139
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
140+
List<String> includeProperties =
141+
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
140142
rssConf.getAll().stream()
143+
.filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey()))
141144
.filter(entry -> !excludeProperties.contains(entry.getKey()))
142145
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
143146

client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,19 @@
2222
import org.apache.spark.SparkConf;
2323
import org.apache.spark.shuffle.sort.SortShuffleManager;
2424
import org.junit.jupiter.api.Test;
25+
import org.mockito.ArgumentCaptor;
2526

27+
import org.apache.uniffle.client.api.CoordinatorClient;
28+
import org.apache.uniffle.client.request.RssAccessClusterRequest;
29+
import org.apache.uniffle.common.config.RssClientConf;
2630
import org.apache.uniffle.storage.util.StorageType;
2731

2832
import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED;
2933
import static org.apache.uniffle.common.rpc.StatusCode.SUCCESS;
3034
import static org.junit.jupiter.api.Assertions.assertEquals;
3135
import static org.junit.jupiter.api.Assertions.assertFalse;
3236
import static org.junit.jupiter.api.Assertions.assertTrue;
37+
import static org.mockito.Mockito.verify;
3338

3439
public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase {
3540

@@ -131,6 +136,87 @@ public void testTryAccessCluster() throws Exception {
131136
assertCreateSortShuffleManager(secondConf);
132137
}
133138

139+
@Test
140+
public void testDefaultIncludeExcludeProperties() throws Exception {
141+
final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
142+
SparkConf conf = new SparkConf();
143+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
144+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
145+
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
146+
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
147+
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
148+
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
149+
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
150+
final int confInitKeyCount = conf.getAll().length;
151+
assertCreateRssShuffleManager(conf);
152+
153+
// default case: access cluster should include all properties in conf and an extra one.
154+
ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
155+
ArgumentCaptor.forClass(RssAccessClusterRequest.class);
156+
verify(mockClient).accessCluster(argumentCaptor.capture());
157+
RssAccessClusterRequest request = argumentCaptor.getValue();
158+
assertEquals(confInitKeyCount + 1, request.getExtraProperties().size());
159+
}
160+
161+
@Test
162+
public void testIncludeProperties() throws Exception {
163+
final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
164+
SparkConf conf = new SparkConf();
165+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
166+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
167+
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
168+
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
169+
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
170+
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
171+
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
172+
// test include properties
173+
conf.set(
174+
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
175+
+ RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES.key(),
176+
RssSparkConfig.RSS_ACCESS_ID
177+
.key()
178+
.substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
179+
assertCreateRssShuffleManager(conf);
180+
181+
ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
182+
ArgumentCaptor.forClass(RssAccessClusterRequest.class);
183+
184+
verify(mockClient).accessCluster(argumentCaptor.capture());
185+
RssAccessClusterRequest request = argumentCaptor.getValue();
186+
// only accessId and extra one
187+
assertEquals(1 + 1, request.getExtraProperties().size());
188+
}
189+
190+
@Test
191+
public void testExcludeProperties() throws Exception {
192+
final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
193+
SparkConf conf = new SparkConf();
194+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
195+
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
196+
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
197+
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
198+
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
199+
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
200+
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
201+
// test exclude properties
202+
conf.set(
203+
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
204+
+ RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(),
205+
RssSparkConfig.RSS_ACCESS_ID
206+
.key()
207+
.substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
208+
final int confInitKeyCount = conf.getAll().length;
209+
assertCreateRssShuffleManager(conf);
210+
211+
ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
212+
ArgumentCaptor.forClass(RssAccessClusterRequest.class);
213+
214+
verify(mockClient).accessCluster(argumentCaptor.capture());
215+
RssAccessClusterRequest request = argumentCaptor.getValue();
216+
// all accessId and extra one except the excluded one
217+
assertEquals(confInitKeyCount + 1 - 1, request.getExtraProperties().size());
218+
}
219+
134220
private void assertCreateSortShuffleManager(SparkConf conf) throws Exception {
135221
DelegationRssShuffleManager delegationRssShuffleManager =
136222
new DelegationRssShuffleManager(conf, true);

client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected CoordinatorClient createCoordinatorClient(StatusCode status) {
5656
return mockedCoordinatorClient;
5757
}
5858

59-
void setupMockedRssShuffleUtils(StatusCode status) {
59+
CoordinatorClient setupMockedRssShuffleUtils(StatusCode status) {
6060
CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status);
6161
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
6262
coordinatorClients.add(mockCoordinatorClient);
@@ -65,5 +65,6 @@ void setupMockedRssShuffleUtils(StatusCode status) {
6565
mockedStaticRssShuffleUtils
6666
.when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
6767
.thenReturn(client);
68+
return mockCoordinatorClient;
6869
}
6970
}

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,4 +310,11 @@ public class RssClientConf {
310310
.asList()
311311
.defaultValues()
312312
.withDescription("the report exclude properties could be configured by this option");
313+
314+
public static final ConfigOption<List<String>> RSS_CLIENT_REPORT_INCLUDE_PROPERTIES =
315+
ConfigOptions.key("rss.client.reportIncludeProperties")
316+
.stringType()
317+
.asList()
318+
.noDefaultValue()
319+
.withDescription("the report include properties could be configured by this option");
313320
}

docs/client_guide/client_guide.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ The important configuration of client is listed as following. These configuratio
6060
| <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
6161
| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
6262
| <client_type>.rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids |
63+
| <client_type>.rss.client.reportExcludeProperties | - | A list of client configuration properties that should not be reported to the coordinator by the DelegationRssShuffleManager. |
64+
| <client_type>.rss.client.reportIncludeProperties | - | A list of client configuration properties to report to the coordinator by the DelegationRssShuffleManager. When set, only the listed properties are reported (the exclude list is still applied afterwards); when unset, all properties are candidates for reporting. |
6365

6466
Notice:
6567

0 commit comments

Comments
 (0)