Skip to content

Commit d51c186

Browse files
authored
dynamic_modules: added an E2E integration test for clusters (#43890)
## Description This PR adds an E2E integration test for clusters Dynamic Modules. --- **Commit Message:** dynamic_modules: added an E2E integration test for clusters **Additional Description:** Added an E2E integration test for clusters Dynamic Modules. **Risk Level:** Low **Testing:** CI **Docs Changes:** N/A **Release Notes:** N/A Signed-off-by: Rohit Agrawal <rohit.agrawal@databricks.com>
1 parent 2bb4614 commit d51c186

File tree

8 files changed

+618
-45
lines changed

8 files changed

+618
-45
lines changed

source/extensions/clusters/dynamic_modules/abi_impl.cc

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,19 +1078,42 @@ void envoy_dynamic_module_callback_cluster_lb_async_host_selection_complete(
10781078
envoy_dynamic_module_type_cluster_host_envoy_ptr host,
10791079
envoy_dynamic_module_type_module_buffer details) {
10801080
auto* lb = getLb(lb_envoy_ptr);
1081-
auto* context = getContext(context_envoy_ptr);
1082-
1083-
Envoy::Upstream::HostConstSharedPtr host_shared;
1084-
if (host != nullptr) {
1085-
host_shared = lb->handle()->cluster()->findHost(host);
1086-
}
10871081

1082+
// Copy the details string on the calling thread. The pointer is not valid after we return.
10881083
std::string details_str;
10891084
if (details.ptr != nullptr && details.length > 0) {
10901085
details_str.assign(details.ptr, details.length);
10911086
}
10921087

1093-
context->onAsyncHostSelection(std::move(host_shared), std::move(details_str));
1088+
auto cancelled = lb->activeAsyncCancelled();
1089+
auto* dispatcher = lb->activeAsyncDispatcher();
1090+
1091+
if (dispatcher != nullptr) {
1092+
// Post all work to the worker thread. The host lookup and context access must happen
1093+
// on the worker thread because the module may call this from a background thread.
1094+
// Keep the cluster alive via the handle's shared_ptr until the callback fires.
1095+
auto handle = lb->handle();
1096+
dispatcher->post([context_envoy_ptr, host, details_str = std::move(details_str),
1097+
cancelled = std::move(cancelled), handle = std::move(handle)]() {
1098+
if (cancelled != nullptr && cancelled->load(std::memory_order_acquire)) {
1099+
return;
1100+
}
1101+
auto* context = getContext(context_envoy_ptr);
1102+
Envoy::Upstream::HostConstSharedPtr host_shared;
1103+
if (host != nullptr) {
1104+
host_shared = handle->cluster()->findHost(host);
1105+
}
1106+
context->onAsyncHostSelection(std::move(host_shared), std::string(details_str));
1107+
});
1108+
} else {
1109+
// No worker dispatcher. Complete inline on the calling thread.
1110+
auto* context = getContext(context_envoy_ptr);
1111+
Envoy::Upstream::HostConstSharedPtr host_shared;
1112+
if (host != nullptr) {
1113+
host_shared = lb->handle()->cluster()->findHost(host);
1114+
}
1115+
context->onAsyncHostSelection(std::move(host_shared), std::move(details_str));
1116+
}
10941117
}
10951118

10961119
// =============================================================================

source/extensions/clusters/dynamic_modules/cluster.cc

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "envoy/config/core/v3/base.pb.h"
44
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
5+
#include "envoy/network/connection.h"
56
#include "envoy/network/drain_decision.h"
67
#include "envoy/upstream/locality.h"
78

@@ -10,12 +11,9 @@
1011
#include "source/common/network/utility.h"
1112
#include "source/common/protobuf/protobuf.h"
1213
#include "source/common/protobuf/utility.h"
13-
#include "source/common/runtime/runtime_features.h"
1414
#include "source/common/upstream/upstream_impl.h"
1515
#include "source/extensions/dynamic_modules/dynamic_modules.h"
1616

17-
#include "absl/strings/str_cat.h"
18-
1917
namespace Envoy {
2018
namespace Extensions {
2119
namespace Clusters {
@@ -57,10 +55,9 @@ struct DynamicModuleThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBa
5755

5856
absl::StatusOr<std::shared_ptr<DynamicModuleClusterConfig>> DynamicModuleClusterConfig::create(
5957
const std::string& cluster_name, const std::string& cluster_config,
60-
const std::string& metrics_namespace,
6158
Envoy::Extensions::DynamicModules::DynamicModulePtr module, Stats::Scope& stats_scope) {
62-
auto config = std::shared_ptr<DynamicModuleClusterConfig>(new DynamicModuleClusterConfig(
63-
cluster_name, cluster_config, metrics_namespace, std::move(module), stats_scope));
59+
auto config = std::shared_ptr<DynamicModuleClusterConfig>(
60+
new DynamicModuleClusterConfig(cluster_name, cluster_config, std::move(module), stats_scope));
6461

6562
// Resolve all required function pointers from the dynamic module.
6663
#define RESOLVE_SYMBOL(name, type, member) \
@@ -142,9 +139,8 @@ absl::StatusOr<std::shared_ptr<DynamicModuleClusterConfig>> DynamicModuleCluster
142139

143140
DynamicModuleClusterConfig::DynamicModuleClusterConfig(
144141
const std::string& cluster_name, const std::string& cluster_config,
145-
const std::string& metrics_namespace,
146142
Envoy::Extensions::DynamicModules::DynamicModulePtr module, Stats::Scope& stats_scope)
147-
: stats_scope_(stats_scope.createScope(absl::StrCat(metrics_namespace, "."))),
143+
: stats_scope_(stats_scope.createScope("dynamicmodulescustom.")),
148144
stat_name_pool_(stats_scope_->symbolTable()), cluster_name_(cluster_name),
149145
cluster_config_(cluster_config), dynamic_module_(std::move(module)) {}
150146

@@ -161,6 +157,12 @@ DynamicModuleClusterConfig::~DynamicModuleClusterConfig() {
161157
DynamicModuleClusterHandle::~DynamicModuleClusterHandle() {
162158
std::shared_ptr<DynamicModuleCluster> cluster = std::move(cluster_);
163159
cluster_.reset();
160+
// Release lifecycle handles eagerly while the lifecycle notifier is still valid. When the
161+
// dispatcher destructor clears pending callbacks, the cluster destructor would otherwise try to
162+
// unregister from already-destroyed lifecycle notifier lists.
163+
cluster->server_initialized_handle_.reset();
164+
cluster->shutdown_handle_.reset();
165+
cluster->drain_handle_.reset();
164166
Event::Dispatcher& dispatcher = cluster->dispatcher_;
165167
dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); });
166168
}
@@ -620,6 +622,14 @@ DynamicModuleLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
620622
return {nullptr};
621623
}
622624

625+
// Pre-capture the worker dispatcher and prepare the cancellation flag before calling into the
626+
// module. The module's choose_host may spawn a background thread that calls
627+
// async_host_selection_complete, which reads these fields. Setting them beforehand establishes
628+
// a happens-before relationship via the thread::spawn synchronization in the module.
629+
const auto* connection = context != nullptr ? context->downstreamConnection() : nullptr;
630+
active_async_dispatcher_ = connection != nullptr ? &connection->dispatcher() : nullptr;
631+
active_async_cancelled_ = std::make_shared<std::atomic<bool>>(false);
632+
623633
envoy_dynamic_module_type_cluster_host_envoy_ptr host_ptr = nullptr;
624634
envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr async_handle = nullptr;
625635
handle_->cluster_->config()->on_cluster_lb_choose_host_(in_module_lb_, context, &host_ptr,
@@ -629,10 +639,14 @@ DynamicModuleLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
629639
// Async pending: the module will call the completion callback later.
630640
auto cancelable = std::make_unique<DynamicModuleAsyncHostSelectionHandle>(
631641
async_handle, in_module_lb_,
632-
handle_->cluster_->config()->on_cluster_lb_cancel_host_selection_);
642+
handle_->cluster_->config()->on_cluster_lb_cancel_host_selection_, active_async_cancelled_);
633643
return Upstream::HostSelectionResponse{nullptr, std::move(cancelable)};
634644
}
635645

646+
// Synchronous result or no host. Clear the async state.
647+
active_async_dispatcher_ = nullptr;
648+
active_async_cancelled_ = nullptr;
649+
636650
if (host_ptr == nullptr) {
637651
return {nullptr};
638652
}
@@ -642,12 +656,19 @@ DynamicModuleLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
642656
return {host};
643657
}
644658

645-
void DynamicModuleAsyncHostSelectionHandle::cancel() {
646-
if (cancel_fn_ != nullptr) {
659+
DynamicModuleAsyncHostSelectionHandle::~DynamicModuleAsyncHostSelectionHandle() {
660+
// Free the module-side async handle. The cancel function takes ownership of the handle and
661+
// drops it, so this works for both cancellation and normal completion paths.
662+
if (async_handle_ != nullptr && cancel_fn_ != nullptr) {
647663
cancel_fn_(in_module_lb_, async_handle_);
664+
async_handle_ = nullptr;
648665
}
649666
}
650667

668+
void DynamicModuleAsyncHostSelectionHandle::cancel() {
669+
cancelled_->store(true, std::memory_order_release);
670+
}
671+
651672
const Upstream::PrioritySet& DynamicModuleLoadBalancer::prioritySet() const {
652673
return handle_->cluster_->prioritySet();
653674
}
@@ -724,28 +745,14 @@ DynamicModuleClusterFactory::createClusterWithConfig(
724745
module_or_error.status().message()));
725746
}
726747

727-
// Use configured metrics namespace or fall back to the default.
728-
const std::string metrics_namespace = module_config.metrics_namespace().empty()
729-
? std::string(DefaultMetricsNamespace)
730-
: module_config.metrics_namespace();
731-
732748
// Create the cluster configuration.
733749
auto config_or_error = DynamicModuleClusterConfig::create(
734-
proto_config.cluster_name(), cluster_config_bytes, metrics_namespace,
735-
std::move(module_or_error.value()), context.serverFactoryContext().serverScope());
750+
proto_config.cluster_name(), cluster_config_bytes, std::move(module_or_error.value()),
751+
context.serverFactoryContext().serverScope());
736752
if (!config_or_error.ok()) {
737753
return config_or_error.status();
738754
}
739755

740-
// When the runtime guard is enabled, register the metrics namespace as a custom stat namespace.
741-
// This causes the namespace prefix to be stripped from prometheus output and no envoy_ prefix
742-
// is added. This is the legacy behavior for backward compatibility.
743-
if (Runtime::runtimeFeatureEnabled(
744-
"envoy.reloadable_features.dynamic_modules_strip_custom_stat_prefix")) {
745-
context.serverFactoryContext().api().customStatNamespaces().registerStatNamespace(
746-
metrics_namespace);
747-
}
748-
749756
// Create the cluster.
750757
absl::Status creation_status = absl::OkStatus();
751758
auto new_cluster = std::shared_ptr<DynamicModuleCluster>(new DynamicModuleCluster(

source/extensions/clusters/dynamic_modules/cluster.h

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <atomic>
34
#include <memory>
45
#include <string>
56
#include <vector>
@@ -31,10 +32,6 @@ namespace Extensions {
3132
namespace Clusters {
3233
namespace DynamicModules {
3334

34-
// The default custom stat namespace which prepends all user-defined metrics.
35-
// This can be overridden via the ``metrics_namespace`` field in ``DynamicModuleConfig``.
36-
constexpr absl::string_view DefaultMetricsNamespace = "dynamicmodulescustom";
37-
3835
class DynamicModuleCluster;
3936
class DynamicModuleClusterScheduler;
4037
class DynamicModuleClusterTestPeer;
@@ -70,14 +67,12 @@ class DynamicModuleClusterConfig {
7067
*
7168
* @param cluster_name the name identifying the cluster implementation in the module.
7269
* @param cluster_config the configuration bytes to pass to the module.
73-
* @param metrics_namespace the namespace prefix for metrics emitted by this module.
7470
* @param module the loaded dynamic module.
7571
* @param stats_scope the stats scope for creating custom metrics.
7672
* @return a shared pointer to the config, or an error status.
7773
*/
7874
static absl::StatusOr<std::shared_ptr<DynamicModuleClusterConfig>>
7975
create(const std::string& cluster_name, const std::string& cluster_config,
80-
const std::string& metrics_namespace,
8176
Envoy::Extensions::DynamicModules::DynamicModulePtr module, Stats::Scope& stats_scope);
8277

8378
~DynamicModuleClusterConfig();
@@ -268,7 +263,6 @@ class DynamicModuleClusterConfig {
268263

269264
private:
270265
DynamicModuleClusterConfig(const std::string& cluster_name, const std::string& cluster_config,
271-
const std::string& metrics_namespace,
272266
Envoy::Extensions::DynamicModules::DynamicModulePtr module,
273267
Stats::Scope& stats_scope);
274268

@@ -471,15 +465,19 @@ class DynamicModuleAsyncHostSelectionHandle : public Upstream::AsyncHostSelectio
471465
DynamicModuleAsyncHostSelectionHandle(
472466
envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr async_handle,
473467
envoy_dynamic_module_type_cluster_lb_module_ptr in_module_lb,
474-
OnClusterLbCancelHostSelectionType cancel_fn)
475-
: async_handle_(async_handle), in_module_lb_(in_module_lb), cancel_fn_(cancel_fn) {}
468+
OnClusterLbCancelHostSelectionType cancel_fn, std::shared_ptr<std::atomic<bool>> cancelled)
469+
: async_handle_(async_handle), in_module_lb_(in_module_lb), cancel_fn_(cancel_fn),
470+
cancelled_(std::move(cancelled)) {}
471+
472+
~DynamicModuleAsyncHostSelectionHandle() override;
476473

477474
void cancel() override;
478475

479476
private:
480477
envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr async_handle_;
481478
envoy_dynamic_module_type_cluster_lb_module_ptr in_module_lb_;
482479
OnClusterLbCancelHostSelectionType cancel_fn_;
480+
std::shared_ptr<std::atomic<bool>> cancelled_;
483481
};
484482

485483
/**
@@ -510,6 +508,22 @@ class DynamicModuleLoadBalancer : public Upstream::LoadBalancer {
510508
// Access the handle for async host selection completion.
511509
const DynamicModuleClusterHandleSharedPtr& handle() const { return handle_; }
512510

511+
/**
512+
* Returns the shared cancellation flag for the current async host selection. When the router
513+
* cancels the selection (e.g., stream timeout), the flag is set so the posted completion
514+
* callback becomes a no-op. Returns nullptr when there is no active async selection.
515+
*/
516+
std::shared_ptr<std::atomic<bool>> activeAsyncCancelled() const {
517+
return active_async_cancelled_;
518+
}
519+
520+
/**
521+
* Returns the worker thread's dispatcher captured during chooseHost. Used by the async
522+
* completion callback in abi_impl.cc to post to the correct worker thread without accessing
523+
* the LoadBalancerContext from a background thread.
524+
*/
525+
Event::Dispatcher* activeAsyncDispatcher() const { return active_async_dispatcher_; }
526+
513527
// Per-host custom data storage.
514528
bool setHostData(uint32_t priority, size_t index, uintptr_t data);
515529
bool getHostData(uint32_t priority, size_t index, uintptr_t* data) const;
@@ -522,6 +536,13 @@ class DynamicModuleLoadBalancer : public Upstream::LoadBalancer {
522536
const DynamicModuleClusterHandleSharedPtr handle_;
523537
envoy_dynamic_module_type_cluster_lb_module_ptr in_module_lb_;
524538

539+
// Shared cancellation flag for the active async host selection. Set in chooseHost when the
540+
// module returns AsyncPending, and read by the posted completion callback in abi_impl.cc.
541+
std::shared_ptr<std::atomic<bool>> active_async_cancelled_;
542+
543+
// Worker thread dispatcher captured during chooseHost for async completion posting.
544+
Event::Dispatcher* active_async_dispatcher_{nullptr};
545+
525546
// Per-host data storage keyed by (priority, index). This is per-LB-instance (per-worker).
526547
absl::flat_hash_map<std::pair<uint32_t, size_t>, uintptr_t> per_host_data_;
527548

test/extensions/clusters/dynamic_modules/BUILD

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,22 @@ envoy_cc_test(
4141
"@envoy_api//envoy/extensions/clusters/dynamic_modules/v3:pkg_cc_proto",
4242
],
4343
)
44+
45+
envoy_cc_test(
46+
name = "integration_test",
47+
srcs = ["integration_test.cc"],
48+
data = [
49+
"//test/extensions/dynamic_modules/test_data/rust:cluster_integration_test",
50+
],
51+
rbe_pool = "6gig",
52+
deps = [
53+
"//source/extensions/clusters/dynamic_modules:cluster",
54+
"//source/extensions/dynamic_modules:abi_impl",
55+
"//source/extensions/load_balancing_policies/cluster_provided:config",
56+
"//test/extensions/dynamic_modules:util",
57+
"//test/integration:http_integration_lib",
58+
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
59+
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
60+
"@envoy_api//envoy/extensions/clusters/dynamic_modules/v3:pkg_cc_proto",
61+
],
62+
)

test/extensions/clusters/dynamic_modules/cluster_test.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2059,8 +2059,10 @@ TEST_F(DynamicModuleClusterTest, AsyncHostSelectionHandleCancel) {
20592059
reinterpret_cast<envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr>(0xCAFE);
20602060
auto* dummy_lb = reinterpret_cast<envoy_dynamic_module_type_cluster_lb_module_ptr>(0xBEEF);
20612061

2062-
DynamicModuleAsyncHostSelectionHandle handle(dummy_async_handle, dummy_lb, nullptr);
2062+
auto cancelled = std::make_shared<std::atomic<bool>>(false);
2063+
DynamicModuleAsyncHostSelectionHandle handle(dummy_async_handle, dummy_lb, nullptr, cancelled);
20632064
handle.cancel();
2065+
EXPECT_TRUE(cancelled->load());
20642066
}
20652067

20662068
// Test DynamicModuleAsyncHostSelectionHandle cancel with null cancel_fn.
@@ -2069,7 +2071,8 @@ TEST_F(DynamicModuleClusterTest, AsyncHostSelectionHandleCancelNullFn) {
20692071
reinterpret_cast<envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr>(0xCAFE);
20702072
auto* dummy_lb = reinterpret_cast<envoy_dynamic_module_type_cluster_lb_module_ptr>(0xBEEF);
20712073

2072-
DynamicModuleAsyncHostSelectionHandle handle(dummy_async_handle, dummy_lb, nullptr);
2074+
auto cancelled = std::make_shared<std::atomic<bool>>(false);
2075+
DynamicModuleAsyncHostSelectionHandle handle(dummy_async_handle, dummy_lb, nullptr, cancelled);
20732076
// Should not crash with nullptr cancel function.
20742077
handle.cancel();
20752078
}

0 commit comments

Comments
 (0)