4
4
#include " envoy/config/core/v3/base.pb.h"
5
5
#include " envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h"
6
6
#include " envoy/extensions/filters/http/dynamic_forward_proxy/v3/dynamic_forward_proxy.pb.h"
7
+ #include " envoy/router/string_accessor.h"
8
+ #include " envoy/stream_info/uint32_accessor.h"
7
9
8
10
#include " source/common/http/utility.h"
9
11
#include " source/common/network/filter_state_proxy_info.h"
@@ -24,7 +26,33 @@ void latchTime(Http::StreamDecoderFilterCallbacks* decoder_callbacks, absl::stri
24
26
downstream_timing.setValue (key, decoder_callbacks->dispatcher ().timeSource ().monotonicTime ());
25
27
}
26
28
29
+ // Helper function to apply filter state overrides to host and port.
30
+ // Conditionally checks filter state based on the allow_dynamic_host_from_filter_state flag.
31
+ void applyFilterStateOverrides (absl::string_view& host, uint32_t & port,
32
+ Http::StreamDecoderFilterCallbacks* decoder_callbacks,
33
+ bool allow_dynamic_host_from_filter_state) {
34
+ if (!allow_dynamic_host_from_filter_state) {
35
+ return ;
36
+ }
37
+
38
+ const Router::StringAccessor* dynamic_host_filter_state =
39
+ decoder_callbacks->streamInfo ().filterState ()->getDataReadOnly <Router::StringAccessor>(
40
+ " envoy.upstream.dynamic_host" );
41
+ if (dynamic_host_filter_state) {
42
+ host = dynamic_host_filter_state->asString ();
43
+ }
44
+
45
+ const StreamInfo::UInt32Accessor* dynamic_port_filter_state =
46
+ decoder_callbacks->streamInfo ().filterState ()->getDataReadOnly <StreamInfo::UInt32Accessor>(
47
+ " envoy.upstream.dynamic_port" );
48
+ if (dynamic_port_filter_state != nullptr && dynamic_port_filter_state->value () > 0 &&
49
+ dynamic_port_filter_state->value () <= 65535 ) {
50
+ port = dynamic_port_filter_state->value ();
51
+ }
52
+ }
53
+
27
54
} // namespace
55
+
28
56
struct ResponseStringValues {
29
57
const std::string DnsCacheOverflow = " DNS cache overflow" ;
30
58
const std::string PendingRequestOverflow = " Dynamic forward proxy pending request overflow" ;
@@ -64,7 +92,8 @@ ProxyFilterConfig::ProxyFilterConfig(
64
92
tls_slot_(context.serverFactoryContext().threadLocal()),
65
93
cluster_init_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(proto_config.sub_cluster_config(),
66
94
cluster_init_timeout, 5000)),
67
- save_upstream_address_(proto_config.save_upstream_address()) {
95
+ save_upstream_address_(proto_config.save_upstream_address()),
96
+ allow_dynamic_host_from_filter_state_(proto_config.allow_dynamic_host_from_filter_state()) {
68
97
tls_slot_.set (
69
98
[&](Event::Dispatcher&) { return std::make_shared<ThreadLocalClusterInfo>(*this ); });
70
99
}
@@ -115,16 +144,15 @@ LoadClusterEntryHandlePtr ProxyFilterConfig::addDynamicCluster(
115
144
116
145
ProxyFilterConfig::ThreadLocalClusterInfo::~ThreadLocalClusterInfo () {
117
146
for (const auto & it : pending_clusters_) {
118
- for (auto cluster : it.second ) {
147
+ for (const auto cluster : it.second ) {
119
148
cluster->cancel ();
120
149
}
121
150
}
122
151
}
123
152
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterAddOrUpdate (
124
153
absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand&) {
125
154
ENVOY_LOG (debug, " thread local cluster {} added or updated" , cluster_name);
126
- auto it = pending_clusters_.find (cluster_name);
127
- if (it != pending_clusters_.end ()) {
155
+ if (const auto it = pending_clusters_.find (cluster_name); it != pending_clusters_.end ()) {
128
156
for (auto * cluster : it->second ) {
129
157
auto & callbacks = cluster->callbacks_ ;
130
158
cluster->cancel ();
@@ -170,8 +198,8 @@ bool ProxyFilter::isProxying() {
170
198
171
199
Http::FilterHeadersStatus ProxyFilter::decodeHeaders (Http::RequestHeaderMap& headers, bool ) {
172
200
Router::RouteConstSharedPtr route = decoder_callbacks_->route ();
173
- const Router::RouteEntry* route_entry;
174
- if (!route || !( route_entry = route-> routeEntry ()) ) {
201
+ const Router::RouteEntry* route_entry = route ? route-> routeEntry () : nullptr ;
202
+ if (!route_entry) {
175
203
return Http::FilterHeadersStatus::Continue;
176
204
}
177
205
@@ -288,20 +316,32 @@ Http::FilterHeadersStatus ProxyFilter::decodeHeaders(Http::RequestHeaderMap& hea
288
316
return Http::FilterHeadersStatus::StopIteration;
289
317
}
290
318
}
291
- auto result = config_->cache ().loadDnsCacheEntryWithForceRefresh (
292
- headers.Host ()->value ().getStringView (), default_port, is_proxying, force_cache_refresh,
293
- *this );
294
- cache_load_handle_ = std::move (result.handle_ );
319
+
320
+ // Get host value from the request headers.
321
+ const auto host_attributes =
322
+ Http::Utility::parseAuthority (headers.Host ()->value ().getStringView ());
323
+ absl::string_view host = host_attributes.host_ ;
324
+ uint16_t port = host_attributes.port_ .value_or (default_port);
325
+
326
+ // Apply filter state overrides for host and port.
327
+ uint32_t port_u32 = port;
328
+ applyFilterStateOverrides (host, port_u32, decoder_callbacks_,
329
+ config_->allowDynamicHostFromFilterState ());
330
+ port = port_u32;
331
+
332
+ auto [status_, handle_, host_info_] = config_->cache ().loadDnsCacheEntryWithForceRefresh (
333
+ host, port, is_proxying, force_cache_refresh, *this );
334
+ cache_load_handle_ = std::move (handle_);
295
335
if (cache_load_handle_ == nullptr ) {
296
336
circuit_breaker_.reset ();
297
337
}
298
338
299
- switch (result. status_ ) {
339
+ switch (status_) {
300
340
case LoadDnsCacheEntryStatus::InCache: {
301
341
ASSERT (cache_load_handle_ == nullptr );
302
342
ENVOY_STREAM_LOG (debug, " DNS cache entry already loaded, continuing" , *decoder_callbacks_);
303
343
304
- auto const & host = result. host_info_ ;
344
+ auto const & host = host_info_;
305
345
latchTime (decoder_callbacks_, DNS_END);
306
346
if (is_proxying) {
307
347
ENVOY_BUG (host.has_value (), " Proxying request but no host entry in DNS cache." );
@@ -330,18 +370,28 @@ Http::FilterHeadersStatus ProxyFilter::decodeHeaders(Http::RequestHeaderMap& hea
330
370
PANIC_DUE_TO_CORRUPT_ENUM;
331
371
}
332
372
333
- Http::FilterHeadersStatus ProxyFilter::loadDynamicCluster (
334
- Extensions::Common::DynamicForwardProxy::DfpClusterSharedPtr cluster,
335
- Http::RequestHeaderMap& headers, uint16_t default_port) {
373
+ Http::FilterHeadersStatus
374
+ ProxyFilter::loadDynamicCluster (const Common::DynamicForwardProxy::DfpClusterSharedPtr& cluster,
375
+ const Http::RequestHeaderMap& headers, uint16_t default_port) {
376
+
377
+ // Parse host and port from headers.
336
378
const auto host_attributes = Http::Utility::parseAuthority (headers.getHostValue ());
337
379
auto host = std::string (host_attributes.host_ );
338
380
auto port = host_attributes.port_ .value_or (default_port);
339
381
382
+ // Apply filter state overrides using the helper function.
383
+ absl::string_view host_view = host; // Create string_view for the helper.
384
+ uint32_t port_u32 = port;
385
+ applyFilterStateOverrides (host_view, port_u32, decoder_callbacks_,
386
+ config_->allowDynamicHostFromFilterState ());
387
+ host = std::string (host_view); // Convert back to string.
388
+ port = port_u32;
389
+
340
390
latchTime (decoder_callbacks_, DNS_START);
341
391
342
392
// cluster name is prefix + host + port
343
- auto cluster_name = " DFPCluster:" + host + " :" + std::to_string (port);
344
- Upstream::ThreadLocalCluster* local_cluster =
393
+ const auto cluster_name = " DFPCluster:" + host + " :" + std::to_string (port);
394
+ const Upstream::ThreadLocalCluster* local_cluster =
345
395
config_->clusterManager ().getThreadLocalCluster (cluster_name);
346
396
if (local_cluster && cluster->touch (cluster_name)) {
347
397
ENVOY_STREAM_LOG (debug, " using the thread local cluster after touch success" ,
@@ -352,7 +402,7 @@ Http::FilterHeadersStatus ProxyFilter::loadDynamicCluster(
352
402
353
403
// Still need to add dynamic cluster again even the thread local cluster exists while touch
354
404
// failed, that means the cluster is removed in main thread due to ttl reached.
355
- // Otherwise, we may not be able to get the thread local cluster in router.
405
+ // Otherwise, we may not be able to get the thread local cluster in the router.
356
406
357
407
// Create a new cluster & register a callback to tls
358
408
cluster_load_handle_ = config_->addDynamicCluster (cluster, cluster_name, host, port, *this );
0 commit comments