Skip to content

Commit 9794876

Browse files
[multicast] put_upsert test helper, pool selection tests, and ASM source TODO wrt Dendrite
Includes: - Add shared `put_upsert` helper for idempotent PUT+CREATED requests, for 201 responses - Add pool_selection.rs tests for SSM/ASM fallback behavior - ASM sources TODO/workaround: - Only send sources to DPD for SSM groups (232/8 IPv4, ff3x:: IPv6) - ASM groups get `None` for sources, meaning "any source allowed" - Temporary fix until dendrite accepts ASM source filtering (upcoming PR) - Schema - Bump version 213.0. 214.0.0 (post-merge_
1 parent 52a3d65 commit 9794876

File tree

9 files changed

+369
-337
lines changed

9 files changed

+369
-337
lines changed

nexus/src/app/multicast/dataplane.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,22 @@ impl MulticastDataplaneClient {
415415
vni: Vni::from(u32::from(external_group.vni.0)),
416416
};
417417

418-
let sources_dpd =
419-
sources.iter().map(|ip| IpSrc::Exact(ip.ip())).collect::<Vec<_>>();
420-
421418
let external_group_ip = external_group.multicast_ip.ip();
422419

420+
// TODO: ASM source filtering will be accepted in dendrite in a follow-up
421+
// PR stacked on this one. For now, we only send sources to DPD for SSM
422+
// groups. ASM groups get `None`, meaning "any source allowed".
423+
let sources_dpd = if is_ssm_address(external_group_ip) {
424+
Some(
425+
sources
426+
.iter()
427+
.map(|ip| IpSrc::Exact(ip.ip()))
428+
.collect::<Vec<_>>(),
429+
)
430+
} else {
431+
None
432+
};
433+
423434
let create_operations =
424435
dpd_clients.into_iter().map(|(switch_location, client)| {
425436
let tag = tag.clone();
@@ -444,7 +455,7 @@ impl MulticastDataplaneClient {
444455
nat_target: Some(nat_target),
445456
},
446457
tag: Some(tag.clone()),
447-
sources: Some(sources),
458+
sources,
448459
};
449460

450461
let external_response = self
@@ -555,11 +566,20 @@ impl MulticastDataplaneClient {
555566
let new_name_str = params.new_name.to_string();
556567
let external_group_ip = params.external_group.multicast_ip.ip();
557568

558-
let sources_dpd = params
559-
.new_sources
560-
.iter()
561-
.map(|ip| IpSrc::Exact(ip.ip()))
562-
.collect::<Vec<_>>();
569+
// TODO: ASM source filtering will be accepted in dendrite in a follow-up
570+
// PR stacked on this one. For now, we only send sources to DPD for SSM
571+
// groups. ASM groups get `None`, meaning "any source allowed".
572+
let sources_dpd = if is_ssm_address(external_group_ip) {
573+
Some(
574+
params
575+
.new_sources
576+
.iter()
577+
.map(|ip| IpSrc::Exact(ip.ip()))
578+
.collect::<Vec<_>>(),
579+
)
580+
} else {
581+
None
582+
};
563583

564584
let update_operations =
565585
dpd_clients.into_iter().map(|(switch_location, client)| {
@@ -634,14 +654,14 @@ impl MulticastDataplaneClient {
634654
external_forwarding: external_forwarding.clone(),
635655
internal_forwarding: internal_forwarding.clone(),
636656
tag: Some(new_name.clone()),
637-
sources: Some(sources.clone()),
657+
sources: sources.clone(),
638658
};
639659
let create_entry = MulticastGroupCreateExternalEntry {
640660
group_ip: external_group_ip,
641661
external_forwarding,
642662
internal_forwarding,
643663
tag: Some(new_name.clone()),
644-
sources: Some(sources),
664+
sources,
645665
};
646666

647667
let external_response = self

nexus/tests/integration_tests/multicast/api.rs

Lines changed: 43 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,12 @@ async fn test_multicast_api_behavior(cptestctx: &ControlPlaneTestContext) {
157157
let duplicate_join_params = InstanceMulticastGroupJoin { source_ips: None };
158158

159159
// This should succeed idempotently
160-
NexusRequest::new(
161-
RequestBuilder::new(client, Method::PUT, &duplicate_join_url)
162-
.body(Some(&duplicate_join_params))
163-
.expect_status(Some(StatusCode::CREATED)),
160+
put_upsert::<_, MulticastGroupMember>(
161+
client,
162+
&duplicate_join_url,
163+
&duplicate_join_params,
164164
)
165-
.authn_as(AuthnMode::PrivilegedUser)
166-
.execute()
167-
.await
168-
.expect("Idempotent instance join should succeed");
165+
.await;
169166

170167
// Final verification: member count should still be 2 (no duplicates)
171168
let final_members = list_multicast_group_members(client, group_name).await;
@@ -211,19 +208,12 @@ async fn test_multicast_api_behavior(cptestctx: &ControlPlaneTestContext) {
211208
// Join using UUIDs (no project parameter)
212209
let join_url_uuid =
213210
format!("/v1/instances/{instance_uuid}/multicast-groups/{group_uuid}");
214-
let member_uuid: MulticastGroupMember = NexusRequest::new(
215-
RequestBuilder::new(client, Method::PUT, &join_url_uuid)
216-
.body(Some(&InstanceMulticastGroupJoin::default()))
217-
.expect_status(Some(StatusCode::CREATED)),
211+
let member_uuid: MulticastGroupMember = put_upsert(
212+
client,
213+
&join_url_uuid,
214+
&InstanceMulticastGroupJoin::default(),
218215
)
219-
.authn_as(AuthnMode::PrivilegedUser)
220-
.execute()
221-
.await
222-
.expect("UUID-based join should succeed")
223-
.parsed_body()
224-
.expect(
225-
"Failed to parse MulticastGroupMember from UUID-based join response",
226-
);
216+
.await;
227217

228218
assert_eq!(member_uuid.instance_id, instance_uuid);
229219
// Instance is stopped (start: false), so reconciler transitions "Joining"→"Left"
@@ -436,17 +426,8 @@ async fn test_join_by_ip_ssm_with_sources(cptestctx: &ControlPlaneTestContext) {
436426
let join_body =
437427
InstanceMulticastGroupJoin { source_ips: Some(vec![source_ip]) };
438428

439-
let member: MulticastGroupMember = NexusRequest::new(
440-
RequestBuilder::new(client, Method::PUT, &join_url)
441-
.body(Some(&join_body))
442-
.expect_status(Some(StatusCode::CREATED)),
443-
)
444-
.authn_as(AuthnMode::PrivilegedUser)
445-
.execute()
446-
.await
447-
.expect("SSM join-by-IP should succeed")
448-
.parsed_body()
449-
.expect("Should parse member");
429+
let member: MulticastGroupMember =
430+
put_upsert(client, &join_url, &join_body).await;
450431

451432
assert_eq!(
452433
member.multicast_ip.to_string(),
@@ -570,19 +551,10 @@ async fn test_join_existing_ssm_group_by_id_without_sources_fails(
570551
"/v1/instances/ssm-id-inst-1/multicast-groups/{ssm_ip}?project={project_name}"
571552
);
572553

573-
let member_1: MulticastGroupMember = NexusRequest::new(
574-
RequestBuilder::new(client, Method::PUT, &join_url_1)
575-
.body(Some(&InstanceMulticastGroupJoin {
576-
source_ips: Some(vec![source_ip]),
577-
}))
578-
.expect_status(Some(StatusCode::CREATED)),
579-
)
580-
.authn_as(AuthnMode::PrivilegedUser)
581-
.execute()
582-
.await
583-
.expect("First instance should join SSM group with sources")
584-
.parsed_body()
585-
.unwrap();
554+
let join_body_1 =
555+
InstanceMulticastGroupJoin { source_ips: Some(vec![source_ip]) };
556+
let member_1: MulticastGroupMember =
557+
put_upsert(client, &join_url_1, &join_body_1).await;
586558

587559
let group_id = member_1.multicast_group_id;
588560

@@ -656,17 +628,7 @@ async fn test_join_existing_ssm_group_by_name_without_sources_fails(
656628
source_ips: Some(vec!["10.0.0.1".parse().unwrap()]),
657629
};
658630

659-
NexusRequest::new(
660-
RequestBuilder::new(client, Method::PUT, &join_url)
661-
.body(Some(&join_body))
662-
.expect_status(Some(StatusCode::CREATED)),
663-
)
664-
.authn_as(AuthnMode::PrivilegedUser)
665-
.execute()
666-
.await
667-
.expect("First instance should join SSM group with sources")
668-
.parsed_body::<MulticastGroupMember>()
669-
.unwrap();
631+
put_upsert::<_, MulticastGroupMember>(client, &join_url, &join_body).await;
670632

671633
// Get the group's auto-generated name
672634
let expected_group_name = format!("mcast-{}", ssm_ip.replace('.', "-"));
@@ -800,17 +762,7 @@ async fn test_join_existing_ssm_group_by_ip_without_sources_fails(
800762
source_ips: Some(vec!["10.0.0.1".parse().unwrap()]),
801763
};
802764

803-
NexusRequest::new(
804-
RequestBuilder::new(client, Method::PUT, &join_url)
805-
.body(Some(&join_body))
806-
.expect_status(Some(StatusCode::CREATED)),
807-
)
808-
.authn_as(AuthnMode::PrivilegedUser)
809-
.execute()
810-
.await
811-
.expect("First instance should join SSM group with sources")
812-
.parsed_body::<MulticastGroupMember>()
813-
.unwrap();
765+
put_upsert::<_, MulticastGroupMember>(client, &join_url, &join_body).await;
814766

815767
let expected_group_name = format!("mcast-{}", ssm_ip.replace('.', "-"));
816768

@@ -934,35 +886,19 @@ async fn test_join_by_ip_existing_group(cptestctx: &ControlPlaneTestContext) {
934886
let join_url_1 = format!(
935887
"/v1/instances/existing-inst-1/multicast-groups/{explicit_ip}?project={project_name}"
936888
);
937-
let member1: MulticastGroupMember = NexusRequest::new(
938-
RequestBuilder::new(client, Method::PUT, &join_url_1)
939-
.body(Some(&InstanceMulticastGroupJoin::default()))
940-
.expect_status(Some(StatusCode::CREATED)),
941-
)
942-
.authn_as(AuthnMode::PrivilegedUser)
943-
.execute()
944-
.await
945-
.expect("First join-by-IP should succeed")
946-
.parsed_body()
947-
.expect("Should parse member");
889+
let member1: MulticastGroupMember =
890+
put_upsert(client, &join_url_1, &InstanceMulticastGroupJoin::default())
891+
.await;
948892

949893
wait_for_group_active(client, &expected_group_name).await;
950894

951895
// Second instance joins the same IP; should attach to existing group
952896
let join_url_2 = format!(
953897
"/v1/instances/existing-inst-2/multicast-groups/{explicit_ip}?project={project_name}"
954898
);
955-
let member2: MulticastGroupMember = NexusRequest::new(
956-
RequestBuilder::new(client, Method::PUT, &join_url_2)
957-
.body(Some(&InstanceMulticastGroupJoin::default()))
958-
.expect_status(Some(StatusCode::CREATED)),
959-
)
960-
.authn_as(AuthnMode::PrivilegedUser)
961-
.execute()
962-
.await
963-
.expect("Second join-by-IP should succeed")
964-
.parsed_body()
965-
.expect("Should parse member");
899+
let member2: MulticastGroupMember =
900+
put_upsert(client, &join_url_2, &InstanceMulticastGroupJoin::default())
901+
.await;
966902

967903
// Both members should have the same group and IP
968904
assert_eq!(member1.multicast_group_id, member2.multicast_group_id);
@@ -1021,37 +957,21 @@ async fn test_join_by_ip_different_sources_succeeds(
1021957
let join_url_1 = format!(
1022958
"/v1/instances/diff-sources-inst-1/multicast-groups/{explicit_ssm_ip}?project={project_name}"
1023959
);
1024-
NexusRequest::new(
1025-
RequestBuilder::new(client, Method::PUT, &join_url_1)
1026-
.body(Some(&InstanceMulticastGroupJoin {
1027-
source_ips: Some(vec![source1]),
1028-
}))
1029-
.expect_status(Some(StatusCode::CREATED)),
1030-
)
1031-
.authn_as(AuthnMode::PrivilegedUser)
1032-
.execute()
1033-
.await
1034-
.expect("First SSM join should succeed")
1035-
.parsed_body::<MulticastGroupMember>()
1036-
.expect("Should parse member");
960+
let join_body_1 =
961+
InstanceMulticastGroupJoin { source_ips: Some(vec![source1]) };
962+
put_upsert::<_, MulticastGroupMember>(client, &join_url_1, &join_body_1)
963+
.await;
1037964

1038965
wait_for_group_active(client, &expected_group_name).await;
1039966

1040967
// Second instance joins with different source (sources are per-member)
1041968
let join_url_2 = format!(
1042969
"/v1/instances/diff-sources-inst-2/multicast-groups/{explicit_ssm_ip}?project={project_name}"
1043970
);
1044-
NexusRequest::new(
1045-
RequestBuilder::new(client, Method::PUT, &join_url_2)
1046-
.body(Some(&InstanceMulticastGroupJoin {
1047-
source_ips: Some(vec![source2]),
1048-
}))
1049-
.expect_status(Some(StatusCode::CREATED)),
1050-
)
1051-
.authn_as(AuthnMode::PrivilegedUser)
1052-
.execute()
1053-
.await
1054-
.expect("Second instance with different sources should succeed");
971+
let join_body_2 =
972+
InstanceMulticastGroupJoin { source_ips: Some(vec![source2]) };
973+
put_upsert::<_, MulticastGroupMember>(client, &join_url_2, &join_body_2)
974+
.await;
1055975

1056976
// Verify group source_ips is union of both members' sources
1057977
let group: MulticastGroup = object_get(
@@ -1118,15 +1038,12 @@ async fn test_join_by_ip_asm_with_sources_succeeds(
11181038
"/v1/instances/{}/multicast-groups/{explicit_ip}?project={project_name}",
11191039
instance1.identity.name
11201040
);
1121-
NexusRequest::new(
1122-
RequestBuilder::new(client, Method::PUT, &join_url1)
1123-
.body(Some(&InstanceMulticastGroupJoin::default()))
1124-
.expect_status(Some(StatusCode::CREATED)),
1041+
put_upsert::<_, MulticastGroupMember>(
1042+
client,
1043+
&join_url1,
1044+
&InstanceMulticastGroupJoin::default(),
11251045
)
1126-
.authn_as(AuthnMode::PrivilegedUser)
1127-
.execute()
1128-
.await
1129-
.expect("First instance ASM join without sources should succeed");
1046+
.await;
11301047

11311048
wait_for_group_active(client, &expected_group_name).await;
11321049

@@ -1148,18 +1065,10 @@ async fn test_join_by_ip_asm_with_sources_succeeds(
11481065
);
11491066
let source1: IpAddr = "10.99.99.1".parse().unwrap();
11501067
let source2: IpAddr = "10.99.99.2".parse().unwrap();
1151-
1152-
NexusRequest::new(
1153-
RequestBuilder::new(client, Method::PUT, &join_url2)
1154-
.body(Some(&InstanceMulticastGroupJoin {
1155-
source_ips: Some(vec![source1, source2]),
1156-
}))
1157-
.expect_status(Some(StatusCode::CREATED)),
1158-
)
1159-
.authn_as(AuthnMode::PrivilegedUser)
1160-
.execute()
1161-
.await
1162-
.expect("Second instance ASM join with sources should succeed");
1068+
let join_body_2 =
1069+
InstanceMulticastGroupJoin { source_ips: Some(vec![source1, source2]) };
1070+
put_upsert::<_, MulticastGroupMember>(client, &join_url2, &join_body_2)
1071+
.await;
11631072

11641073
// Verify group source_ips is union of all member sources
11651074
let group: MulticastGroup = object_get(
@@ -1257,19 +1166,8 @@ async fn test_explicit_ip_bypasses_ssm_asm_selection(
12571166
let join_body =
12581167
InstanceMulticastGroupJoin { source_ips: Some(vec![source_ip]) };
12591168

1260-
let member: MulticastGroupMember = NexusRequest::new(
1261-
RequestBuilder::new(client, Method::PUT, &join_url)
1262-
.body(Some(&join_body))
1263-
.expect_status(Some(StatusCode::CREATED)),
1264-
)
1265-
.authn_as(AuthnMode::PrivilegedUser)
1266-
.execute()
1267-
.await
1268-
.expect(
1269-
"ASM IP with sources should succeed (IP determines pool, not sources)",
1270-
)
1271-
.parsed_body()
1272-
.unwrap();
1169+
let member: MulticastGroupMember =
1170+
put_upsert(client, &join_url, &join_body).await;
12731171

12741172
// Verify member has the source IP
12751173
assert_eq!(member.source_ips, vec![source_ip]);

nexus/tests/integration_tests/multicast/authorization.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -387,15 +387,8 @@ async fn test_cross_project_instance_attachment_allowed(
387387
// First instance join implicitly creates the group
388388
let join_url1 = "/v1/instances/instance1/multicast-groups/cross-project-group?project=project1";
389389
let join_params = InstanceMulticastGroupJoin { source_ips: None };
390-
NexusRequest::new(
391-
RequestBuilder::new(client, http::Method::PUT, join_url1)
392-
.body(Some(&join_params))
393-
.expect_status(Some(StatusCode::CREATED)),
394-
)
395-
.authn_as(AuthnMode::PrivilegedUser)
396-
.execute()
397-
.await
398-
.expect("Should join instance1 to group");
390+
put_upsert::<_, MulticastGroupMember>(client, join_url1, &join_params)
391+
.await;
399392

400393
// Fetch the implicitly created group
401394
let group: MulticastGroup =
@@ -406,15 +399,8 @@ async fn test_cross_project_instance_attachment_allowed(
406399
"/v1/instances/instance2/multicast-groups/{}?project=project2",
407400
group.identity.name
408401
);
409-
NexusRequest::new(
410-
RequestBuilder::new(client, http::Method::PUT, &join_url2)
411-
.body(Some(&join_params))
412-
.expect_status(Some(StatusCode::CREATED)),
413-
)
414-
.authn_as(AuthnMode::PrivilegedUser)
415-
.execute()
416-
.await
417-
.expect("Should join instance2 to group");
402+
put_upsert::<_, MulticastGroupMember>(client, &join_url2, &join_params)
403+
.await;
418404

419405
// Verify both instances are members of the same group
420406
let members =

0 commit comments

Comments
 (0)