Skip to content

Commit b64afb5

Browse files
authored
Introduce the SnapshotStream interface (#6610)
## Motivation This PR refactors the xDS client from a resource-node hierarchy to a stream-based reactive design. The goal is to make snapshot composition and lifecycle management more explicit and composable, while simplifying error propagation and load balancer orchestration. ## Modifications There are two main changes: * **Introduce a reactive snapshot stream core** * Add `SnapshotStream` and `RefCountedStream` with map/switchMap/combine-style operators, along with a static stream implementation. * Add stream-based implementations for cluster/endpoint/listener/route snapshot composition. * Remove legacy resource-node classes and migrate their responsibilities into streams. * Note: Metrics are now recorded only for resources fetched from a config source. This also aligns better with upstream behavior. * **`Snapshot`-based `XdsLoadBalancer` lifecycle refactoring** * Introduce `LoadBalancerFactoryPool` and a factory API for creating/updating load balancers with delayed-close semantics. * This preserves `XdsLoadBalancer` state across cluster updates (e.g., health / ramp-up status). * Treat `XdsLoadBalancer` as an immutable snapshot value that is propagated on endpoint/cluster changes. * For example, an endpoint health update produces a new `XdsLoadBalancer` snapshot and propagates it to the `SnapshotWatcher`. * The `XdsLoadBalancer` API is now exposed directly, so asynchronous selection is no longer necessary. * **Misc** * Change `SnapshotWatcher#onUpdate` to accept a `Throwable` to support error propagation from `SnapshotStream` implementations (e.g., `MapStream`). ## Result * The xDS client is now based on a stream-oriented snapshot pipeline, improving composability and making lifecycle/resource handling more explicit. * Easier stream pipeline composition, paving the way for further implementations (such as SDS) with minimal complexity.
1 parent 9899ba7 commit b64afb5

File tree

84 files changed

+3161
-2007
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+3161
-2007
lines changed

it/xds-client/src/test/java/com/linecorp/armeria/xds/it/BootstrapTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.linecorp.armeria.xds.ClusterSnapshot;
3636
import com.linecorp.armeria.xds.SnapshotWatcher;
3737
import com.linecorp.armeria.xds.XdsBootstrap;
38-
import com.linecorp.armeria.xds.XdsResourceException;
3938

4039
import io.envoyproxy.controlplane.cache.v3.SimpleCache;
4140
import io.envoyproxy.controlplane.cache.v3.Snapshot;
@@ -122,7 +121,7 @@ void secondaryInitTest() {
122121
final AtomicReference<Object> objRef = new AtomicReference<>();
123122
final SnapshotWatcher<Object> watcher = new SnapshotWatcher<>() {
124123
@Override
125-
public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t) {
124+
public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) {
126125
if (snapshot != null) {
127126
objRef.set(snapshot);
128127
}

it/xds-client/src/test/java/com/linecorp/armeria/xds/it/ErrorHandlingTest.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import io.envoyproxy.envoy.config.listener.v3.Listener;
5353
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
5454
import io.envoyproxy.pgv.ValidationException;
55-
import io.grpc.StatusException;
5655

5756
class ErrorHandlingTest {
5857

@@ -168,7 +167,7 @@ void rootFailure(Consumer<XdsBootstrap> rootGenFn, XdsType type, String expected
168167
final var watcher = new SnapshotWatcher<>() {
169168

170169
@Override
171-
public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t) {
170+
public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) {
172171
if (t != null) {
173172
errorRef.set(t);
174173
}
@@ -186,7 +185,7 @@ public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t
186185
assertThat(xdsResourceException)
187186
.cause()
188187
.isInstanceOf(IllegalArgumentException.class)
189-
.hasMessageContaining("Unsupported");
188+
.hasMessage("config source not found");
190189
}
191190
}
192191

@@ -258,8 +257,8 @@ public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t
258257
static Stream<Arguments> discoveryFailure_args() {
259258
final Consumer<XdsBootstrap> clusterGen = xdsBootstrap -> xdsBootstrap.clusterRoot("my-cluster");
260259
final Consumer<XdsBootstrap> listenerGen = xdsBootstrap -> xdsBootstrap.listenerRoot("my-listener");
261-
return Stream.of(Arguments.of(clusterGen, XdsType.CLUSTER, "my-cluster"),
262-
Arguments.of(listenerGen, XdsType.LISTENER, "my-listener"));
260+
return Stream.of(Arguments.of(clusterGen, XdsType.ENDPOINT, "unknown-cluster"),
261+
Arguments.of(listenerGen, XdsType.ROUTE, "unknown-route"));
263262
}
264263

265264
@ParameterizedTest
@@ -280,7 +279,7 @@ void discoveryFailure(Consumer<XdsBootstrap> rootGenFn, XdsType type,
280279
final var watcher = new SnapshotWatcher<>() {
281280

282281
@Override
283-
public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t) {
282+
public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) {
284283
if (t != null) {
285284
errorRef.set(t);
286285
}
@@ -534,7 +533,7 @@ void listenerRootDynamicResourceValidationFailure(Snapshot snapshot, XdsType typ
534533
final var watcher = new SnapshotWatcher<>() {
535534

536535
@Override
537-
public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t) {
536+
public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) {
538537
if (t != null) {
539538
errorRef.set(t);
540539
}
@@ -550,8 +549,6 @@ public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t
550549
assertThat(xdsResourceException.type()).isEqualTo(type);
551550
assertThat(xdsResourceException.name()).isEqualTo(name);
552551
assertThat(xdsResourceException).cause()
553-
.isInstanceOf(StatusException.class)
554-
.cause()
555552
.isInstanceOf(IllegalArgumentException.class)
556553
.cause()
557554
.isInstanceOf(ValidationException.class)
@@ -594,7 +591,7 @@ void clusterRootDynamicResourceValidationFailure(Snapshot snapshot, XdsType type
594591
final var watcher = new SnapshotWatcher<>() {
595592

596593
@Override
597-
public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t) {
594+
public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) {
598595
if (t != null) {
599596
errorRef.set(t);
600597
}
@@ -610,8 +607,6 @@ public void onUpdate(@Nullable Object snapshot, @Nullable XdsResourceException t
610607
assertThat(xdsResourceException.type()).isEqualTo(type);
611608
assertThat(xdsResourceException.name()).isEqualTo(name);
612609
assertThat(xdsResourceException).cause()
613-
.isInstanceOf(StatusException.class)
614-
.cause()
615610
.isInstanceOf(IllegalArgumentException.class)
616611
.cause()
617612
.isInstanceOf(ValidationException.class)
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2024 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.xds.it;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.awaitility.Awaitility.await;
21+
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.stream.Collectors;
25+
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.RegisterExtension;
28+
29+
import com.linecorp.armeria.client.ClientRequestContext;
30+
import com.linecorp.armeria.client.Endpoint;
31+
import com.linecorp.armeria.common.HttpMethod;
32+
import com.linecorp.armeria.common.HttpRequest;
33+
import com.linecorp.armeria.common.HttpResponse;
34+
import com.linecorp.armeria.server.ServerBuilder;
35+
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
36+
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
37+
import com.linecorp.armeria.xds.ClusterSnapshot;
38+
import com.linecorp.armeria.xds.ListenerRoot;
39+
import com.linecorp.armeria.xds.ListenerSnapshot;
40+
import com.linecorp.armeria.xds.SnapshotWatcher;
41+
import com.linecorp.armeria.xds.XdsBootstrap;
42+
import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer;
43+
44+
import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap;
45+
46+
class HealthCheckedTest {
47+
48+
@RegisterExtension
49+
static ServerExtension server = new ServerExtension() {
50+
@Override
51+
protected void configure(ServerBuilder sb) throws Exception {
52+
sb.http(0);
53+
sb.http(0);
54+
sb.http(0);
55+
sb.service("/", (ctx, req) -> HttpResponse.of(200));
56+
sb.service("/monitor/healthcheck", HealthCheckService.builder().build());
57+
}
58+
};
59+
60+
@RegisterExtension
61+
static ServerExtension noHealthCheck = new ServerExtension() {
62+
@Override
63+
protected void configure(ServerBuilder sb) throws Exception {
64+
sb.http(0);
65+
sb.http(0);
66+
sb.http(0);
67+
sb.service("/", (ctx, req) -> HttpResponse.of(200));
68+
}
69+
};
70+
71+
private static XdsLoadBalancer pollLoadBalancer(ListenerRoot root, String clusterName) {
72+
final AtomicReference<XdsLoadBalancer> lbRef = new AtomicReference<>();
73+
final SnapshotWatcher<ListenerSnapshot> watcher = (newSnapshot, t) -> {
74+
final ClusterSnapshot clusterSnapshot =
75+
newSnapshot.routeSnapshot().virtualHostSnapshots().get(0).routeEntries().get(0)
76+
.clusterSnapshot();
77+
if (clusterSnapshot != null && clusterName.equals(clusterSnapshot.xdsResource().name())) {
78+
lbRef.set(clusterSnapshot.loadBalancer());
79+
}
80+
};
81+
root.addSnapshotWatcher(watcher);
82+
await().untilAsserted(() -> assertThat(lbRef.get()).isNotNull());
83+
return lbRef.get();
84+
}
85+
86+
private static List<Integer> ports(ServerExtension server) {
87+
return server.server().activePorts().keySet().stream()
88+
.map(addr -> addr.getPort()).sorted().collect(Collectors.toList());
89+
}
90+
91+
@Test
92+
void basicCase() {
93+
final List<Integer> healthyPorts = ports(server);
94+
final List<Integer> noHcPorts = ports(noHealthCheck);
95+
final int port1 = healthyPorts.get(0);
96+
final int port2 = healthyPorts.get(1);
97+
final int port3 = healthyPorts.get(2);
98+
final int noHcPort1 = noHcPorts.get(0);
99+
final int noHcPort2 = noHcPorts.get(1);
100+
final int noHcPort3 = noHcPorts.get(2);
101+
102+
//language=YAML
103+
final String bootstrapYaml =
104+
"""
105+
static_resources:
106+
listeners:
107+
- name: listener
108+
api_listener:
109+
api_listener:
110+
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager\
111+
.v3.HttpConnectionManager
112+
stat_prefix: http
113+
route_config:
114+
name: local_route
115+
virtual_hosts:
116+
- name: local_service
117+
domains: ["*"]
118+
routes:
119+
- match:
120+
prefix: /
121+
route:
122+
cluster: cluster
123+
http_filters:
124+
- name: envoy.filters.http.router
125+
typed_config:
126+
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
127+
clusters:
128+
- name: cluster
129+
type: STATIC
130+
load_assignment:
131+
cluster_name: cluster
132+
endpoints:
133+
- lb_endpoints:
134+
- endpoint:
135+
address:
136+
socket_address:
137+
address: 127.0.0.1
138+
port_value: %d
139+
- endpoint:
140+
address:
141+
socket_address:
142+
address: 127.0.0.1
143+
port_value: %d
144+
- endpoint:
145+
address:
146+
socket_address:
147+
address: 127.0.0.1
148+
port_value: %d
149+
- endpoint:
150+
address:
151+
socket_address:
152+
address: 127.0.0.1
153+
port_value: %d
154+
- endpoint:
155+
address:
156+
socket_address:
157+
address: 127.0.0.1
158+
port_value: %d
159+
- endpoint:
160+
address:
161+
socket_address:
162+
address: 127.0.0.1
163+
port_value: %d
164+
health_checks:
165+
- http_health_check:
166+
path: /monitor/healthcheck
167+
timeout: 5s
168+
interval: 10s
169+
unhealthy_threshold: 3
170+
healthy_threshold: 2
171+
""".formatted(port1, port2, port3, noHcPort1, noHcPort2, noHcPort3);
172+
173+
final Bootstrap bootstrap = XdsResourceReader.fromYaml(bootstrapYaml);
174+
try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap);
175+
ListenerRoot root = xdsBootstrap.listenerRoot("listener")) {
176+
final XdsLoadBalancer loadBalancer = pollLoadBalancer(root, "cluster");
177+
assertThat(loadBalancer.hostSets().get(0).healthyHostsEndpointGroup().endpoints()
178+
.stream().map(Endpoint::port).collect(Collectors.toSet()))
179+
.containsExactlyInAnyOrder(port1, port2, port3);
180+
181+
final ClientRequestContext ctx =
182+
ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
183+
final Endpoint endpoint = loadBalancer.selectNow(ctx);
184+
assertThat(endpoint).isNotNull();
185+
assertThat(endpoint.port()).isIn(port1, port2, port3);
186+
}
187+
}
188+
189+
@Test
190+
void allUnhealthy() {
191+
final List<Integer> noHcPorts = ports(noHealthCheck);
192+
final int noHcPort1 = noHcPorts.get(0);
193+
final int noHcPort2 = noHcPorts.get(1);
194+
final int noHcPort3 = noHcPorts.get(2);
195+
196+
//language=YAML
197+
final String bootstrapYaml =
198+
"""
199+
static_resources:
200+
listeners:
201+
- name: listener
202+
api_listener:
203+
api_listener:
204+
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager\
205+
.v3.HttpConnectionManager
206+
stat_prefix: http
207+
route_config:
208+
name: local_route
209+
virtual_hosts:
210+
- name: local_service
211+
domains: ["*"]
212+
routes:
213+
- match:
214+
prefix: /
215+
route:
216+
cluster: cluster
217+
http_filters:
218+
- name: envoy.filters.http.router
219+
typed_config:
220+
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
221+
clusters:
222+
- name: cluster
223+
type: STATIC
224+
load_assignment:
225+
cluster_name: cluster
226+
endpoints:
227+
- lb_endpoints:
228+
- endpoint:
229+
address:
230+
socket_address:
231+
address: 127.0.0.1
232+
port_value: %d
233+
load_balancing_weight: 1
234+
- endpoint:
235+
address:
236+
socket_address:
237+
address: 127.0.0.1
238+
port_value: %d
239+
load_balancing_weight: 1
240+
- endpoint:
241+
address:
242+
socket_address:
243+
address: 127.0.0.1
244+
port_value: %d
245+
load_balancing_weight: 1
246+
health_checks:
247+
- http_health_check:
248+
path: /monitor/healthcheck
249+
timeout: 5s
250+
interval: 10s
251+
unhealthy_threshold: 1
252+
healthy_threshold: 1
253+
""".formatted(noHcPort1, noHcPort2, noHcPort3);
254+
255+
final Bootstrap bootstrap = XdsResourceReader.fromYaml(bootstrapYaml);
256+
try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap);
257+
ListenerRoot root = xdsBootstrap.listenerRoot("listener")) {
258+
final XdsLoadBalancer loadBalancer = pollLoadBalancer(root, "cluster");
259+
assertThat(loadBalancer.hostSets().get(0).healthyHostsEndpointGroup().endpoints()).isEmpty();
260+
261+
final ClientRequestContext ctx =
262+
ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
263+
final Endpoint endpoint = loadBalancer.selectNow(ctx);
264+
// although all unhealthy, an endpoint is still selected
265+
assertThat(endpoint).isNotNull();
266+
assertThat(endpoint.port()).isIn(noHcPort1, noHcPort2, noHcPort3);
267+
}
268+
}
269+
}

0 commit comments

Comments
 (0)