Skip to content

Commit c72a5c9

Browse files
committed
Introduce banned id manager and checker
1 parent 3cf82d7 commit c72a5c9

File tree

9 files changed

+403
-0
lines changed

9 files changed

+403
-0
lines changed

coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class AccessManager {
4040
private final CoordinatorConf coordinatorConf;
4141
private final ClusterManager clusterManager;
4242
private final QuotaManager quotaManager;
43+
private final BannedManager bannedManager;
4344
private final Configuration hadoopConf;
4445
private List<AccessChecker> accessCheckers = Lists.newArrayList();
4546

@@ -53,6 +54,7 @@ public AccessManager(
5354
this.clusterManager = clusterManager;
5455
this.hadoopConf = hadoopConf;
5556
this.quotaManager = quotaManager;
57+
this.bannedManager = new BannedManager(coordinatorConf);
5658
init();
5759
}
5860

@@ -103,6 +105,10 @@ public QuotaManager getQuotaManager() {
103105
return quotaManager;
104106
}
105107

108+
public BannedManager getBannedManager() {
109+
return bannedManager;
110+
}
111+
106112
public void close() throws IOException {
107113
for (AccessChecker checker : accessCheckers) {
108114
checker.close();
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator;
19+
20+
import java.util.Collections;
21+
import java.util.Set;
22+
23+
import org.apache.commons.lang3.tuple.Pair;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
/** BannedManager is a manager for ban the abnormal app. */
28+
public class BannedManager {
29+
private static final Logger LOG = LoggerFactory.getLogger(BannedManager.class);
30+
// versionId -> bannedIds
31+
private volatile Pair<String, Set<String>> bannedIdsFromRest =
32+
Pair.of("0", Collections.emptySet());
33+
34+
public BannedManager(CoordinatorConf conf) {
35+
LOG.info("BannedManager initialized successfully.");
36+
}
37+
38+
public boolean checkBanned(String id) {
39+
return bannedIdsFromRest.getValue().contains(id);
40+
}
41+
42+
public void reloadBannedIdsFromRest(Pair<String, Set<String>> newBannedIds) {
43+
if (newBannedIds.getKey().equals(bannedIdsFromRest.getKey())) {
44+
LOG.warn("receive bannedIds from rest with the same version: {}", newBannedIds.getKey());
45+
}
46+
bannedIdsFromRest = newBannedIds;
47+
}
48+
49+
public String getBannedIdsFromRestVersion() {
50+
return bannedIdsFromRest.getKey();
51+
}
52+
}

coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,16 @@ public class CoordinatorConf extends RssBaseConf {
256256
.asList()
257257
.defaultValues("appHeartbeat", "heartbeat")
258258
.withDescription("Exclude record rpc audit operation list, separated by ','");
259+
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER =
260+
ConfigOptions.key("rss.coordinator.access.bannedIdProvider")
261+
.stringType()
262+
.noDefaultValue()
263+
.withDescription("Get the banned id from Access banned id provider ");
264+
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN =
265+
ConfigOptions.key("rss.coordinator.access.bannedIdProviderPattern")
266+
.stringType()
267+
.defaultValue("(.*)")
268+
.withDescription("The regular banned id pattern to extract");
259269

260270
public CoordinatorConf() {}
261271

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.access.checker;
19+
20+
import java.util.regex.Matcher;
21+
import java.util.regex.Pattern;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import org.apache.uniffle.common.util.Constants;
27+
import org.apache.uniffle.coordinator.AccessManager;
28+
import org.apache.uniffle.coordinator.CoordinatorConf;
29+
import org.apache.uniffle.coordinator.access.AccessCheckResult;
30+
import org.apache.uniffle.coordinator.access.AccessInfo;
31+
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
32+
33+
/**
34+
* AccessBannedChecker maintain a list of banned id and update it periodically, it checks the banned
35+
* id in the access request and reject if the id is in the banned list.
36+
*/
37+
public class AccessBannedChecker extends AbstractAccessChecker {
38+
private static final Logger LOG = LoggerFactory.getLogger(AccessBannedChecker.class);
39+
private final AccessManager accessManager;
40+
private final String bannedIdProviderKey;
41+
private final Pattern bannedIdProviderPattern;
42+
43+
public AccessBannedChecker(AccessManager accessManager) throws Exception {
44+
super(accessManager);
45+
this.accessManager = accessManager;
46+
CoordinatorConf conf = accessManager.getCoordinatorConf();
47+
bannedIdProviderKey = conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER);
48+
String bannedIdProviderRegex =
49+
conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN);
50+
bannedIdProviderPattern = Pattern.compile(bannedIdProviderRegex);
51+
52+
LOG.info(
53+
"Construct BannedChecker. BannedIdProviderKey is {}, pattern is {}",
54+
bannedIdProviderKey,
55+
bannedIdProviderRegex);
56+
}
57+
58+
@Override
59+
public AccessCheckResult check(AccessInfo accessInfo) {
60+
if (accessInfo.getExtraProperties() != null
61+
&& accessInfo.getExtraProperties().containsKey(bannedIdProviderKey)) {
62+
String bannedIdPropertyValue = accessInfo.getExtraProperties().get(bannedIdProviderKey);
63+
Matcher matcher = bannedIdProviderPattern.matcher(bannedIdPropertyValue);
64+
if (matcher.find()) {
65+
String bannedId = matcher.group(1);
66+
if (accessManager.getBannedManager() != null
67+
&& accessManager.getBannedManager().checkBanned(bannedId)) {
68+
String msg = String.format("Denied by BannedChecker, accessInfo[%s].", accessInfo);
69+
if (LOG.isDebugEnabled()) {
70+
LOG.debug("BannedIdPropertyValue is {}, {}", bannedIdPropertyValue, msg);
71+
}
72+
CoordinatorMetrics.counterTotalBannedDeniedRequest.inc();
73+
return new AccessCheckResult(false, msg);
74+
}
75+
}
76+
}
77+
78+
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
79+
}
80+
81+
@Override
82+
public void close() {}
83+
}

coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class CoordinatorMetrics {
4343
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
4444
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
4545
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
46+
private static final String TOTAL_BANNED_DENIED_REQUEST = "total_banned_denied_request";
4647
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
4748
public static final String APP_NUM_TO_USER = "app_num";
4849
public static final String USER_LABEL = "user_name";
@@ -57,6 +58,7 @@ public class CoordinatorMetrics {
5758
public static Counter counterTotalCandidatesDeniedRequest;
5859
public static Counter counterTotalQuotaDeniedRequest;
5960
public static Counter counterTotalLoadDeniedRequest;
61+
public static Counter counterTotalBannedDeniedRequest;
6062
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();
6163

6264
private static MetricsManager metricsManager;
@@ -118,5 +120,6 @@ private static void setUpMetrics() {
118120
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
119121
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
120122
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
123+
counterTotalBannedDeniedRequest = metricsManager.addCounter(TOTAL_BANNED_DENIED_REQUEST);
121124
}
122125
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.web.resource;
19+
20+
import javax.servlet.ServletContext;
21+
22+
import org.apache.commons.lang3.tuple.Pair;
23+
import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
24+
import org.apache.hbase.thirdparty.javax.ws.rs.GET;
25+
import org.apache.hbase.thirdparty.javax.ws.rs.POST;
26+
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
27+
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
28+
import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.uniffle.common.web.resource.BaseResource;
33+
import org.apache.uniffle.common.web.resource.Response;
34+
import org.apache.uniffle.coordinator.AccessManager;
35+
import org.apache.uniffle.coordinator.BannedManager;
36+
import org.apache.uniffle.coordinator.web.vo.BannedReloadVO;
37+
38+
@Path("/banned")
39+
public class BannedResource extends BaseResource {
40+
private static final Logger LOG = LoggerFactory.getLogger(BannedResource.class);
41+
@Context protected ServletContext servletContext;
42+
43+
@Consumes(MediaType.APPLICATION_JSON)
44+
@POST
45+
@Path("/reload")
46+
public Response<String> reload(BannedReloadVO bannedReloadVo) {
47+
BannedManager bannedManager = getAccessManager().getBannedManager();
48+
if (bannedManager != null && bannedReloadVo != null) {
49+
bannedManager.reloadBannedIdsFromRest(
50+
Pair.of(bannedReloadVo.getVersion(), bannedReloadVo.getIds()));
51+
LOG.info("reload {} banned ids.", bannedReloadVo.getIds().size());
52+
return Response.success("success");
53+
} else {
54+
return Response.fail("bannedManager is not initialized or bannedIds is null.");
55+
}
56+
}
57+
58+
@GET
59+
@Path("version")
60+
public Response<String> version() {
61+
BannedManager bannedManager = getAccessManager().getBannedManager();
62+
if (bannedManager != null) {
63+
String version = bannedManager.getBannedIdsFromRestVersion();
64+
LOG.info("Get version of banned ids is {}.", version);
65+
return Response.success(version);
66+
} else {
67+
return Response.fail("bannedManager is not initialized.");
68+
}
69+
}
70+
71+
private AccessManager getAccessManager() {
72+
return (AccessManager) servletContext.getAttribute(AccessManager.class.getCanonicalName());
73+
}
74+
}

coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/CoordinatorServerResource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,9 @@ public String getCoordinatorStacks() {
116116
public Class<ConfOpsResource> getConfOps() {
117117
return ConfOpsResource.class;
118118
}
119+
120+
@Path("/banned")
121+
public Class<BannedResource> getBannedResource() {
122+
return BannedResource.class;
123+
}
119124
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.web.vo;
19+
20+
import java.util.Collections;
21+
import java.util.Set;
22+
23+
public class BannedReloadVO {
24+
private String version;
25+
private Set<String> ids = Collections.emptySet();
26+
27+
public String getVersion() {
28+
return version;
29+
}
30+
31+
public Set<String> getIds() {
32+
return ids;
33+
}
34+
35+
public void setIds(Set<String> ids) {
36+
if (ids == null) {
37+
ids = Collections.emptySet();
38+
}
39+
this.ids = ids;
40+
}
41+
42+
public void setVersion(String version) {
43+
this.version = version;
44+
}
45+
46+
@Override
47+
public String toString() {
48+
return "BannedIdsVO{"
49+
+ "versionId='"
50+
+ version
51+
+ '\''
52+
+ ", size of bannedIds="
53+
+ ids.size()
54+
+ '}';
55+
}
56+
}

0 commit comments

Comments
 (0)