Skip to content

Commit cdc5929

Browse files
committed
Merge branch 'feat/improve' of https://github.com/Aias00/shenyu into feat/improve
2 parents edc66cc + 6095a4a commit cdc5929

File tree

19 files changed

+355
-18
lines changed

19 files changed

+355
-18
lines changed

shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ private List<CommonUpstream> submitJust(final String selectorId, final CommonUps
222222
if (!exists.isPresent()) {
223223
upstreams.add(commonUpstream);
224224
} else {
225+
CommonUpstream existUpstream = exists.get();
226+
existUpstream.setHealthCheckEnabled(commonUpstream.isHealthCheckEnabled());
225227
LOG.info("upstream host {} is exists.", commonUpstream.getUpstreamHost());
226228
}
227229
PENDING_SYNC.add(commonUpstream.hashCode());
@@ -251,6 +253,12 @@ public boolean checkAndSubmit(final String selectorId, final CommonUpstream comm
251253
if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) {
252254
return false;
253255
}
256+
if (!commonUpstream.isHealthCheckEnabled()) {
257+
commonUpstream.setStatus(true);
258+
commonUpstream.setTimestamp(System.currentTimeMillis());
259+
this.submit(selectorId, commonUpstream);
260+
return false;
261+
}
254262
final boolean pass = UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
255263
if (pass) {
256264
this.submit(selectorId, commonUpstream);
@@ -310,6 +318,14 @@ private void checkZombie0(final ZombieUpstream zombieUpstream) {
310318
ZOMBIE_SET.remove(zombieUpstream);
311319
String selectorId = zombieUpstream.getSelectorId();
312320
CommonUpstream commonUpstream = zombieUpstream.getCommonUpstream();
321+
if (!commonUpstream.isHealthCheckEnabled()) {
322+
commonUpstream.setTimestamp(System.currentTimeMillis());
323+
commonUpstream.setStatus(true);
324+
List<CommonUpstream> old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList()));
325+
this.submitJust(selectorId, commonUpstream);
326+
updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId));
327+
return;
328+
}
313329
final boolean pass = UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
314330
if (pass) {
315331
commonUpstream.setTimestamp(System.currentTimeMillis());
@@ -332,6 +348,14 @@ private void check(final String selectorId, final List<CommonUpstream> upstreamL
332348
final List<CompletableFuture<CommonUpstream>> checkFutures = new ArrayList<>(upstreamList.size());
333349
for (CommonUpstream commonUpstream : upstreamList) {
334350
checkFutures.add(CompletableFuture.supplyAsync(() -> {
351+
if (!commonUpstream.isHealthCheckEnabled()) {
352+
if (!commonUpstream.isStatus()) {
353+
commonUpstream.setTimestamp(System.currentTimeMillis());
354+
commonUpstream.setStatus(true);
355+
PENDING_SYNC.add(commonUpstream.hashCode());
356+
}
357+
return commonUpstream;
358+
}
335359
final boolean pass = UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
336360
if (pass) {
337361
if (!commonUpstream.isStatus()) {

shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,14 @@ public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData discoveryUpstreamData)
7878
public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData discoveryUpstreamData) {
7979
return Optional.ofNullable(discoveryUpstreamData).map(data -> {
8080
String url = data.getUrl();
81-
return new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false, data.getDateCreated().getTime());
81+
CommonUpstream commonUpstream = new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false,
82+
data.getDateCreated().getTime());
83+
Properties properties = Optional.ofNullable(data.getProps())
84+
.map(props -> GsonUtils.getInstance().fromJson(props, Properties.class))
85+
.orElse(new Properties());
86+
commonUpstream
87+
.setHealthCheckEnabled(Boolean.parseBoolean(properties.getProperty("healthCheckEnabled", "true")));
88+
return commonUpstream;
8289
}).orElse(null);
8390
}
8491

@@ -103,7 +110,6 @@ public DiscoveryUpstreamVO mapToVo(DiscoveryUpstreamDO discoveryUpstreamDO) {
103110
}).orElse(null);
104111
}
105112

106-
107113
public DiscoveryRelVO mapToVo(DiscoveryRelDO discoveryRelDO) {
108114
return Optional.ofNullable(discoveryRelDO).map(data -> {
109115
DiscoveryRelVO discoveryRelVO = new DiscoveryRelVO();
@@ -116,7 +122,6 @@ public DiscoveryRelVO mapToVo(DiscoveryRelDO discoveryRelDO) {
116122
}).orElse(null);
117123
}
118124

119-
120125
public DiscoveryRelDO mapToDO(DiscoveryRelDTO discoveryRelDTO) {
121126
return Optional.ofNullable(discoveryRelDTO).map(data -> {
122127
DiscoveryRelDO discoveryRelDO = new DiscoveryRelDO();
@@ -335,13 +340,38 @@ public DiscoveryUpstreamDTO mapToDTO(DiscoveryUpstreamDO discoveryUpstreamDO) {
335340
return discoveryUpstreamDTO;
336341
}).orElse(null);
337342
}
338-
343+
339344
/**
340345
* mapToDiscoveryUpstreamData.
346+
*
341347
* @param commonUpstream commonUpstream
342348
* @return DiscoveryUpstreamData
343349
*/
344350
public DiscoveryUpstreamData mapToDiscoveryUpstreamData(CommonUpstream commonUpstream) {
345-
return mapToData(CommonUpstreamUtils.buildDefaultDiscoveryUpstreamDTO(commonUpstream.getUpstreamUrl().split(":")[0], Integer.valueOf(commonUpstream.getUpstreamUrl().split(":")[1]), commonUpstream.getProtocol(),commonUpstream.getNamespaceId()));
351+
String upstreamUrl = commonUpstream.getUpstreamUrl();
352+
String[] parts = Optional.ofNullable(upstreamUrl)
353+
.map(url -> url.split(":", 2))
354+
.orElseThrow(() -> new IllegalArgumentException("Upstream URL must not be null"));
355+
if (parts.length < 2) {
356+
throw new IllegalArgumentException("Invalid upstream URL, expected 'host:port' format but was: " + upstreamUrl);
357+
}
358+
String host = parts[0];
359+
int port;
360+
try {
361+
port = Integer.parseInt(parts[1]);
362+
} catch (NumberFormatException ex) {
363+
throw new IllegalArgumentException("Invalid port in upstream URL: " + upstreamUrl, ex);
364+
}
365+
DiscoveryUpstreamDTO discoveryUpstreamDTO = CommonUpstreamUtils.buildDefaultDiscoveryUpstreamDTO(
366+
host,
367+
port,
368+
commonUpstream.getProtocol(),
369+
commonUpstream.getNamespaceId());
370+
Properties properties = Optional.ofNullable(discoveryUpstreamDTO.getProps())
371+
.map(props -> GsonUtils.getInstance().fromJson(props, Properties.class))
372+
.orElse(new Properties());
373+
properties.setProperty("healthCheckEnabled", String.valueOf(commonUpstream.isHealthCheckEnabled()));
374+
discoveryUpstreamDTO.setProps(GsonUtils.getInstance().toJson(properties));
375+
return mapToData(discoveryUpstreamDTO);
346376
}
347377
}

shenyu-admin/src/main/java/org/apache/shenyu/admin/utils/CommonUpstreamUtils.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,13 @@ public static List<CommonUpstream> convertCommonUpstreamList(final List<? extend
246246
return Optional.ofNullable(upstreamList)
247247
.orElse(Collections.emptyList())
248248
.stream()
249-
.map(upstream -> new CommonUpstream(upstream.getProtocol(),
250-
upstream.getUpstreamHost(), upstream.getUpstreamUrl(),
251-
upstream.isStatus(), upstream.getTimestamp()))
249+
.map(upstream -> {
250+
CommonUpstream commonUpstream = new CommonUpstream(upstream.getProtocol(),
251+
upstream.getUpstreamHost(), upstream.getUpstreamUrl(),
252+
upstream.isStatus(), upstream.getTimestamp());
253+
commonUpstream.setHealthCheckEnabled(upstream.isHealthCheckEnabled());
254+
return commonUpstream;
255+
})
252256
.collect(Collectors.toList());
253257
}
254258

shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,4 +327,86 @@ private void setupUpstreamMap() {
327327
upstreamMap.put("UrlReachableAnother", Collections.singletonList(divideUpstream1));
328328
upstreamMap.put("UrlErrorAnother", Collections.singletonList(divideUpstream2));
329329
}
330+
331+
@Test
332+
public void testCheckAndSubmitWithHealthCheckDisabled() {
333+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
334+
ShenyuThreadFactory.create("scheduled-upstream-task", false));
335+
ReflectionTestUtils.setField(upstreamCheckService, "executor", executor);
336+
337+
final DivideUpstream divideUpstream = DivideUpstream.builder()
338+
.upstreamUrl("unreachable-url:8080")
339+
.upstreamHost("unreachable-host")
340+
.healthCheckEnabled(false)
341+
.status(false)
342+
.build();
343+
344+
boolean result = upstreamCheckService.checkAndSubmit("testSelector", divideUpstream);
345+
346+
assertFalse(result);
347+
assertTrue(divideUpstream.isStatus());
348+
assertTrue(upstreamMap.containsKey("testSelector"));
349+
}
350+
351+
@Test
352+
public void testSubmitWithHealthCheckDisabled() {
353+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
354+
ShenyuThreadFactory.create("scheduled-upstream-task", false));
355+
ReflectionTestUtils.setField(upstreamCheckService, "executor", executor);
356+
357+
final DivideUpstream divideUpstream = DivideUpstream.builder()
358+
.upstreamUrl("any-url:8080")
359+
.upstreamHost("any-host")
360+
.healthCheckEnabled(false)
361+
.status(true)
362+
.build();
363+
364+
upstreamCheckService.submit("testSelectorDisabled", divideUpstream);
365+
366+
assertTrue(upstreamMap.containsKey("testSelectorDisabled"));
367+
assertEquals(1, upstreamMap.get("testSelectorDisabled").size());
368+
}
369+
370+
@Test
371+
public void testHealthCheckEnabledDefaultsToTrue() {
372+
final DivideUpstream divideUpstream = DivideUpstream.builder()
373+
.upstreamUrl("test-url:8080")
374+
.upstreamHost("test-host")
375+
.build();
376+
377+
assertTrue(divideUpstream.isHealthCheckEnabled());
378+
}
379+
380+
@Test
381+
public void testSubmitSyncsHealthCheckEnabledForExistingUpstream() {
382+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
383+
ShenyuThreadFactory.create("scheduled-upstream-task", false));
384+
ReflectionTestUtils.setField(upstreamCheckService, "executor", executor);
385+
386+
final String selectorId = "testSyncSelector";
387+
388+
final DivideUpstream upstream1 = DivideUpstream.builder()
389+
.upstreamUrl("sync-url:8080")
390+
.upstreamHost("sync-host")
391+
.healthCheckEnabled(true)
392+
.status(true)
393+
.build();
394+
upstreamCheckService.submit(selectorId, upstream1);
395+
396+
assertTrue(upstreamMap.containsKey(selectorId));
397+
assertEquals(1, upstreamMap.get(selectorId).size());
398+
assertTrue(upstreamMap.get(selectorId).get(0).isHealthCheckEnabled());
399+
400+
final DivideUpstream upstream2 = DivideUpstream.builder()
401+
.upstreamUrl("sync-url:8080")
402+
.upstreamHost("sync-host")
403+
.healthCheckEnabled(false)
404+
.status(true)
405+
.build();
406+
upstreamCheckService.submit(selectorId, upstream2);
407+
408+
assertEquals(1, upstreamMap.get(selectorId).size());
409+
assertFalse(upstreamMap.get(selectorId).get(0).isHealthCheckEnabled());
410+
assertTrue(upstreamMap.get(selectorId).get(0).isStatus());
411+
}
330412
}

shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/CommonUpstream.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public class CommonUpstream {
5959
*/
6060
private boolean gray;
6161

62+
/**
63+
* health check enabled.
64+
*/
65+
private boolean healthCheckEnabled = true;
66+
6267
/**
6368
* Instantiates a new Common upstream.
6469
*/
@@ -209,6 +214,24 @@ public void setGray(final boolean gray) {
209214
this.gray = gray;
210215
}
211216

217+
/**
218+
* get healthCheckEnabled.
219+
*
220+
* @return healthCheckEnabled
221+
*/
222+
public boolean isHealthCheckEnabled() {
223+
return healthCheckEnabled;
224+
}
225+
226+
/**
227+
* set healthCheckEnabled.
228+
*
229+
* @param healthCheckEnabled healthCheckEnabled
230+
*/
231+
public void setHealthCheckEnabled(final boolean healthCheckEnabled) {
232+
this.healthCheckEnabled = healthCheckEnabled;
233+
}
234+
212235
/**
213236
* set namespaceId.
214237
*
@@ -231,12 +254,13 @@ public boolean equals(final Object o) {
231254
&& Objects.equals(protocol, that.protocol)
232255
&& Objects.equals(gray, that.gray)
233256
&& Objects.equals(upstreamUrl, that.upstreamUrl)
234-
&& Objects.equals(namespaceId, that.namespaceId);
257+
&& Objects.equals(namespaceId, that.namespaceId)
258+
&& Objects.equals(healthCheckEnabled, that.healthCheckEnabled);
235259
}
236260

237261
@Override
238262
public int hashCode() {
239-
return Objects.hash(upstreamHost, protocol, upstreamUrl, namespaceId, gray);
263+
return Objects.hash(upstreamHost, protocol, upstreamUrl, namespaceId, gray, healthCheckEnabled);
240264
}
241265

242266
@Override
@@ -259,6 +283,8 @@ public String toString() {
259283
+ namespaceId
260284
+ ", gray="
261285
+ gray
286+
+ ", healthCheckEnabled="
287+
+ healthCheckEnabled
262288
+ '}';
263289
}
264290
}

shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DivideUpstream.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private DivideUpstream(final Builder builder) {
5858
setTimestamp(builder.timestamp);
5959
this.warmup = builder.warmup;
6060
setNamespaceId(builder.namespaceId);
61+
setHealthCheckEnabled(builder.healthCheckEnabled);
6162
}
6263

6364
/**
@@ -200,6 +201,11 @@ public static final class Builder {
200201
*/
201202
private String namespaceId;
202203

204+
/**
205+
* healthCheckEnabled.
206+
*/
207+
private boolean healthCheckEnabled = true;
208+
203209
/**
204210
* no args constructor.
205211
*/
@@ -303,5 +309,16 @@ public Builder namespaceId(final String namespaceId) {
303309
this.namespaceId = namespaceId;
304310
return this;
305311
}
312+
313+
/**
314+
* build healthCheckEnabled.
315+
*
316+
* @param healthCheckEnabled healthCheckEnabled
317+
* @return this
318+
*/
319+
public Builder healthCheckEnabled(final boolean healthCheckEnabled) {
320+
this.healthCheckEnabled = healthCheckEnabled;
321+
return this;
322+
}
306323
}
307324
}

shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/GrpcUpstream.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ private GrpcUpstream(final Builder builder) {
4646
setStatus(statusValue);
4747
setTimestamp(builder.timestamp);
4848
setNamespaceId(builder.namespaceId);
49+
setHealthCheckEnabled(builder.healthCheckEnabled);
4950
}
5051

5152
/**
@@ -163,6 +164,11 @@ public static final class Builder {
163164
*/
164165
private String namespaceId;
165166

167+
/**
168+
* health check enabled.
169+
*/
170+
private boolean healthCheckEnabled = true;
171+
166172
/**
167173
* no args constructor.
168174
*/
@@ -255,5 +261,16 @@ public Builder namespaceId(final String namespaceId) {
255261
this.namespaceId = namespaceId;
256262
return this;
257263
}
264+
265+
/**
266+
* build healthCheckEnabled.
267+
*
268+
* @param healthCheckEnabled healthCheckEnabled
269+
* @return this
270+
*/
271+
public Builder healthCheckEnabled(final boolean healthCheckEnabled) {
272+
this.healthCheckEnabled = healthCheckEnabled;
273+
return this;
274+
}
258275
}
259276
}

shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ public void removeByKey(final String key) {
146146
*/
147147
public void submit(final String selectorId, final List<Upstream> upstreamList) {
148148
List<Upstream> actualUpstreamList = Objects.isNull(upstreamList) ? Lists.newArrayList() : upstreamList;
149+
actualUpstreamList.forEach(upstream -> {
150+
if (!upstream.isHealthCheckEnabled()) {
151+
upstream.setStatus(true);
152+
upstream.setHealthy(true);
153+
}
154+
});
149155
Map<Boolean, List<Upstream>> partitionedUpstreams = actualUpstreamList.stream()
150156
.collect(Collectors.partitioningBy(Upstream::isStatus));
151157
List<Upstream> validUpstreamList = partitionedUpstreams.get(true);
@@ -173,6 +179,10 @@ public void submit(final String selectorId, final List<Upstream> upstreamList) {
173179
Upstream matchedExistUp = existUpstreamMap.get(key);
174180
if (Objects.nonNull(matchedExistUp)) {
175181
matchedExistUp.setWeight(validUp.getWeight());
182+
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
183+
if (!matchedExistUp.isHealthCheckEnabled()) {
184+
matchedExistUp.setHealthy(true);
185+
}
176186
}
177187
});
178188

0 commit comments

Comments
 (0)