Skip to content

Commit 9882d82

Browse files
authored
fix(coprocessor): context keys/values not deleted across stages (#8679)
1 parent a96d8bb commit 9882d82

File tree

8 files changed

+325
-64
lines changed

8 files changed

+325
-64
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
### Ensure that coprocessor context keys/values removed in a stage do not re-appear in later stages ([PR #8679](https://github.com/apollographql/router/pull/8679))
2+
3+
Ensure that coprocessor keys that are deleted in a previous stage do not re-appear in later stages
4+
5+
By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8679

apollo-router/src/context/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,14 @@ impl Context {
255255
}
256256
}
257257

258+
pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) {
259+
self.entries.retain(|k, v| f(k, v));
260+
}
261+
262+
pub(crate) fn len(&self) -> usize {
263+
self.entries.len()
264+
}
265+
258266
/// Read only access to the executable document for internal router plugins.
259267
pub(crate) fn executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
260268
self.extensions()

apollo-router/src/plugins/coprocessor/execution.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -409,16 +409,7 @@ where
409409
}
410410

411411
if let Some(context) = co_processor_output.context {
412-
for (mut key, value) in context.try_into_iter()? {
413-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
414-
&response_config.context
415-
{
416-
key = context_key_from_deprecated(key);
417-
}
418-
response
419-
.context
420-
.upsert_json_value(key, move |_current| value);
421-
}
412+
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
422413
}
423414

424415
if let Some(headers) = co_processor_output.headers {
@@ -487,14 +478,11 @@ where
487478
)?;
488479

489480
if let Some(context) = co_processor_output.context {
490-
for (mut key, value) in context.try_into_iter()? {
491-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
492-
&response_config_context
493-
{
494-
key = context_key_from_deprecated(key);
495-
}
496-
generator_map_context.upsert_json_value(key, move |_current| value);
497-
}
481+
update_context_from_coprocessor(
482+
&generator_map_context,
483+
context,
484+
&response_config_context,
485+
)?;
498486
}
499487

500488
// We return the deferred_response into our stream of response chunks

apollo-router/src/plugins/coprocessor/mod.rs

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,47 @@ fn default_response_validation() -> bool {
495495
true
496496
}
497497

498+
/// Update the target context based on the context returned from the coprocessor.
499+
/// This function handles both updates/inserts and deletions:
500+
/// - Keys present in the returned context (with non-null values) are updated/inserted
501+
/// - Keys that were sent to the coprocessor but are missing from the returned context are deleted
502+
pub(crate) fn update_context_from_coprocessor(
503+
target_context: &Context,
504+
context_returned: Context,
505+
context_config: &ContextConf,
506+
) -> Result<(), BoxError> {
507+
// Collect keys that are in the returned context
508+
let mut keys_returned = HashSet::with_capacity(context_returned.len());
509+
510+
for (mut key, value) in context_returned.try_into_iter()? {
511+
// Handle deprecated key names - convert back to actual key names
512+
if let ContextConf::NewContextConf(NewContextConf::Deprecated) = context_config {
513+
key = context_key_from_deprecated(key);
514+
}
515+
516+
keys_returned.insert(key.clone());
517+
target_context.insert_json_value(key, value);
518+
}
519+
520+
// Delete keys that were sent but are missing from the returned context
521+
// If the context config is selective, only delete keys that are in the selective list
522+
match context_config {
523+
ContextConf::NewContextConf(NewContextConf::Selective(context_keys)) => {
524+
target_context.retain(|key, _v| {
525+
if keys_returned.contains(key) {
526+
return true;
527+
} else if context_keys.contains(key) {
528+
return false;
529+
}
530+
true
531+
});
532+
}
533+
_ => target_context.retain(|key, _v| keys_returned.contains(key)),
534+
}
535+
536+
Ok(())
537+
}
538+
498539
fn record_coprocessor_duration(stage: PipelineStep, duration: Duration) {
499540
f64_histogram!(
500541
"apollo.router.operations.coprocessor.duration",
@@ -1025,16 +1066,7 @@ where
10251066
}
10261067

10271068
if let Some(context) = co_processor_output.context {
1028-
for (mut key, value) in context.try_into_iter()? {
1029-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
1030-
&response_config.context
1031-
{
1032-
key = context_key_from_deprecated(key);
1033-
}
1034-
response
1035-
.context
1036-
.upsert_json_value(key, move |_current| value);
1037-
}
1069+
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
10381070
}
10391071

10401072
if let Some(headers) = co_processor_output.headers {
@@ -1099,14 +1131,11 @@ where
10991131
};
11001132

11011133
if let Some(context) = co_processor_output.context {
1102-
for (mut key, value) in context.try_into_iter()? {
1103-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
1104-
&context_conf
1105-
{
1106-
key = context_key_from_deprecated(key);
1107-
}
1108-
generator_map_context.upsert_json_value(key, move |_current| value);
1109-
}
1134+
update_context_from_coprocessor(
1135+
&generator_map_context,
1136+
context,
1137+
&context_conf,
1138+
)?;
11101139
}
11111140

11121141
// We return the final_bytes into our stream of response chunks
@@ -1368,16 +1397,7 @@ where
13681397
}
13691398

13701399
if let Some(context) = co_processor_output.context {
1371-
for (mut key, value) in context.try_into_iter()? {
1372-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
1373-
&response_config.context
1374-
{
1375-
key = context_key_from_deprecated(key);
1376-
}
1377-
response
1378-
.context
1379-
.upsert_json_value(key, move |_current| value);
1380-
}
1400+
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
13811401
}
13821402

13831403
if let Some(headers) = co_processor_output.headers {

apollo-router/src/plugins/coprocessor/supergraph.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -413,16 +413,7 @@ where
413413
}
414414

415415
if let Some(context) = co_processor_output.context {
416-
for (mut key, value) in context.try_into_iter()? {
417-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
418-
&response_config.context
419-
{
420-
key = context_key_from_deprecated(key);
421-
}
422-
response
423-
.context
424-
.upsert_json_value(key, move |_current| value);
425-
}
416+
update_context_from_coprocessor(&response.context, context, &response_config.context)?;
426417
}
427418

428419
if let Some(headers) = co_processor_output.headers {
@@ -499,14 +490,11 @@ where
499490
)?;
500491

501492
if let Some(context) = co_processor_output.context {
502-
for (mut key, value) in context.try_into_iter()? {
503-
if let ContextConf::NewContextConf(NewContextConf::Deprecated) =
504-
&response_config_context
505-
{
506-
key = context_key_from_deprecated(key);
507-
}
508-
generator_map_context.upsert_json_value(key, move |_current| value);
509-
}
493+
update_context_from_coprocessor(
494+
&generator_map_context,
495+
context,
496+
&response_config_context,
497+
)?;
510498
}
511499

512500
// We return the deferred_response into our stream of response chunks

apollo-router/src/plugins/coprocessor/test.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3999,4 +3999,119 @@ mod tests {
39993999
.is_ok()
40004000
);
40014001
}
4002+
4003+
// Tests for context key deletion functionality
4004+
4005+
#[test]
4006+
fn test_update_context_from_coprocessor_deletes_missing_keys() {
4007+
use crate::Context;
4008+
use crate::plugins::coprocessor::update_context_from_coprocessor;
4009+
4010+
// Create a context with some keys
4011+
let target_context = Context::new();
4012+
target_context.insert("k1", "v1".to_string()).unwrap();
4013+
target_context.insert("k2", "v2".to_string()).unwrap();
4014+
target_context.insert("k3", "v3".to_string()).unwrap();
4015+
4016+
// Coprocessor returns context without k2 (deleted)
4017+
let returned_context = Context::new();
4018+
returned_context
4019+
.insert("k1", "v1_updated".to_string())
4020+
.unwrap();
4021+
// k2 is missing (deleted)
4022+
returned_context.insert("k3", "v3".to_string()).unwrap();
4023+
4024+
// Update context
4025+
update_context_from_coprocessor(
4026+
&target_context,
4027+
returned_context,
4028+
&ContextConf::NewContextConf(NewContextConf::All),
4029+
)
4030+
.unwrap();
4031+
4032+
// k1 should be updated
4033+
assert_eq!(
4034+
target_context.get_json_value("k1"),
4035+
Some(serde_json_bytes::json!("v1_updated"))
4036+
);
4037+
// k2 should be deleted
4038+
assert!(!target_context.contains_key("k2"));
4039+
// k3 should remain
4040+
assert_eq!(
4041+
target_context.get_json_value("k3"),
4042+
Some(serde_json_bytes::json!("v3"))
4043+
);
4044+
}
4045+
4046+
#[test]
4047+
fn test_update_context_from_coprocessor_adds_new_keys() {
4048+
use crate::Context;
4049+
use crate::plugins::coprocessor::update_context_from_coprocessor;
4050+
4051+
// Create a context with some keys
4052+
let target_context = Context::new();
4053+
target_context.insert("k1", "v1".to_string()).unwrap();
4054+
4055+
// Coprocessor returns context with a new key
4056+
let returned_context = Context::new();
4057+
returned_context
4058+
.insert("k1", "v1_updated".to_string())
4059+
.unwrap();
4060+
returned_context.insert("k2", "v2_new".to_string()).unwrap();
4061+
4062+
// Update context
4063+
update_context_from_coprocessor(
4064+
&target_context,
4065+
returned_context,
4066+
&ContextConf::NewContextConf(NewContextConf::All),
4067+
)
4068+
.unwrap();
4069+
4070+
// k1 should be updated
4071+
assert_eq!(
4072+
target_context.get_json_value("k1"),
4073+
Some(serde_json_bytes::json!("v1_updated"))
4074+
);
4075+
// k2 should be added
4076+
assert_eq!(
4077+
target_context.get_json_value("k2"),
4078+
Some(serde_json_bytes::json!("v2_new"))
4079+
);
4080+
}
4081+
4082+
#[test]
4083+
fn test_update_context_from_coprocessor_preserves_keys_not_sent() {
4084+
use std::collections::HashSet;
4085+
use std::sync::Arc;
4086+
4087+
use crate::Context;
4088+
use crate::plugins::coprocessor::update_context_from_coprocessor;
4089+
4090+
// Create a context with some keys
4091+
let target_context = Context::new();
4092+
target_context.insert("k1", "v1".to_string()).unwrap();
4093+
target_context
4094+
.insert("key_not_sent", "preserved_value".to_string())
4095+
.unwrap();
4096+
4097+
// Coprocessor returns context without k1 (deleted)
4098+
let returned_context = Context::new();
4099+
4100+
// Use Selective config to only send "k1", not "key_not_sent"
4101+
let selective_keys: HashSet<String> = ["k1".to_string()].into();
4102+
let context_config =
4103+
ContextConf::NewContextConf(NewContextConf::Selective(Arc::new(selective_keys)));
4104+
4105+
// Update context
4106+
update_context_from_coprocessor(&target_context, returned_context, &context_config)
4107+
.unwrap();
4108+
4109+
// k1 should be deleted (was sent but missing from returned context)
4110+
assert!(!target_context.contains_key("k1"));
4111+
// key_not_sent should be preserved (wasn't sent to coprocessor)
4112+
assert_eq!(
4113+
target_context.get_json_value("key_not_sent"),
4114+
Some(serde_json_bytes::json!("preserved_value"))
4115+
);
4116+
}
40024117
}

0 commit comments

Comments
 (0)