diff --git a/ATTRIBUTIONS-Rust.md b/ATTRIBUTIONS-Rust.md index 725f4f47..8790b303 100644 --- a/ATTRIBUTIONS-Rust.md +++ b/ATTRIBUTIONS-Rust.md @@ -17183,6 +17183,215 @@ limitations under the License. ``` +## lazy_static - 1.5.0 +**Repository URL**: https://github.com/rust-lang-nursery/lazy-static.rs +**License Type(s)**: Apache-2.0 +### License: https://spdx.org/licenses/Apache-2.0.html +``` + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +``` + ## libc - 0.2.185 **Repository URL**: https://github.com/rust-lang/libc **License Type(s)**: Apache-2.0 @@ -27969,6 +28178,33 @@ limitations under the License. ``` +## sharded-slab - 0.1.7 +**Repository URL**: https://github.com/hawkw/sharded-slab +**License Type(s)**: MIT +### License: https://spdx.org/licenses/MIT.html +``` +Copyright (c) 2019 Eliza Weisman + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +``` + ## shell-words - 1.1.1 **Repository URL**: https://github.com/tmiasko/shell-words **License Type(s)**: Apache-2.0 @@ -30215,6 +30451,215 @@ limitations under the License. ``` +## thread_local - 1.1.9 +**Repository URL**: https://github.com/Amanieu/thread_local-rs +**License Type(s)**: Apache-2.0 +### License: https://spdx.org/licenses/Apache-2.0.html +``` + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +``` + ## tinystr - 0.8.3 **Repository URL**: https://github.com/unicode-org/icu4x **License Type(s)**: Unicode-3.0 @@ -31929,6 +32374,39 @@ DEALINGS IN THE SOFTWARE. ``` +## tracing-subscriber - 0.3.23 +**Repository URL**: https://github.com/tokio-rs/tracing +**License Type(s)**: MIT +### License: https://spdx.org/licenses/MIT.html +``` +Copyright (c) 2019 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. + +``` + ## try-lock - 0.2.5 **Repository URL**: https://github.com/seanmonstar/try-lock **License Type(s)**: MIT diff --git a/Cargo.lock b/Cargo.lock index d8017154..1df8dfaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1040,6 +1040,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.185" @@ -1212,6 +1218,8 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tracing", + "tracing-subscriber", "typed-builder", "uuid", "wasm-bindgen", @@ -2158,6 +2166,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shell-words" version = "1.1.1" @@ -2320,6 +2337,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.3" @@ -2563,6 +2589,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 898b1614..e16c1798 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -58,6 +58,8 @@ bitflags = { version = "2", features = ["serde"] } thiserror = "2" tokio = { version = "1", default-features = false, features = ["rt", "macros", "sync"] } tokio-stream = { version = "0.1", default-features = false } +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["registry", "std"] } typed-builder = "0.23.2" opentelemetry = { version = "0.31", default-features = false, features = ["trace"], optional = true } opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"], optional = true } @@ -74,6 +76,7 @@ web-sys = { version = "0.3", features = ["Headers", "Request", "RequestInit", "R tokio = { version = "1", features = ["rt", "macros", "sync", "test-util", "rt-multi-thread"] } futures = "0.3" opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "testing"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "registry", "std"] } serde_json = "1" [[test]] diff --git a/crates/core/src/api/event.rs b/crates/core/src/api/event.rs index fb73eb8a..5bb8e521 100644 --- a/crates/core/src/api/event.rs +++ b/crates/core/src/api/event.rs @@ -424,7 +424,7 @@ impl Event { /// Return this event as canonical JSON. pub fn to_json_string(&self) -> serde_json::Result { - serde_json::to_string(&self.try_to_json_value()?) + serde_json::to_string(self) } /// Return the lifecycle phase for scope events. diff --git a/crates/core/src/api/runtime.rs b/crates/core/src/api/runtime.rs index 371fd6dc..3aeb5026 100644 --- a/crates/core/src/api/runtime.rs +++ b/crates/core/src/api/runtime.rs @@ -21,4 +21,5 @@ pub use scope_stack::{ restore_thread_scope_stack, scope_stack_active, set_thread_scope_stack, sync_thread_scope_stack, task_scope_push, task_scope_remove, task_scope_top, }; +pub(crate) use state::EventSubscriberSnapshot; pub use state::NemoRelayContextState; diff --git a/crates/core/src/api/runtime/scope_stack.rs b/crates/core/src/api/runtime/scope_stack.rs index 41340745..c6310f34 100644 --- a/crates/core/src/api/runtime/scope_stack.rs +++ b/crates/core/src/api/runtime/scope_stack.rs @@ -200,7 +200,13 @@ impl ScopeStack { self.stack .iter() .filter_map(|handle| self.scope_registries.get(&handle.uuid)) - .flat_map(|registries| registries.event_subscribers.values().cloned()) + .flat_map(|registries| { + registries + .event_subscribers + .values() + .chain(registries.anonymous_event_subscribers.values()) + .cloned() + }) .collect() } } diff --git a/crates/core/src/api/runtime/state.rs b/crates/core/src/api/runtime/state.rs index 45398668..0cc64685 100644 --- a/crates/core/src/api/runtime/state.rs +++ b/crates/core/src/api/runtime/state.rs @@ -10,7 +10,8 @@ use std::any::Any; use std::collections::HashMap; -use std::sync::Arc; +use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind}; +use std::sync::{Arc, Mutex, OnceLock}; use crate::api::event::{ BaseEvent, CategoryProfile, Event, EventCategory, MarkEvent, ScopeCategory, ScopeEvent, @@ -26,6 +27,7 @@ use crate::api::runtime::callbacks::{ ToolInterceptFn, ToolSanitizeFn, }; use crate::api::scope::{CreateScopeHandleParams, EndScopeHandleParams, ScopeHandle, ScopeType}; +use crate::api::subscriber::EVENT_TRACE_TARGET; use crate::api::tool::ToolHandle; use crate::api::tool::{CreateToolHandleParams, EndToolHandleParams}; use crate::codec::request::AnnotatedLlmRequest; @@ -33,12 +35,360 @@ use crate::codec::response::AnnotatedLlmResponse; use crate::context::registries::{ merge_execution_intercept_callables, merge_guardrail_entries, merge_intercept_entries, }; +use crate::error::{FlowError, Result}; use crate::json::{Json, merge_json}; use crate::registry::SortedRegistry; use chrono::{Duration, Utc}; use serde_json::json; +use tracing::Span; +use tracing::dispatcher::{self, Dispatch}; use uuid::Uuid; +#[derive(Clone)] +struct TracedSpan { + span: Span, + dispatch: Dispatch, +} + +/// Snapshot of subscribers visible to one emitted event. +/// +/// Global subscribers are stored behind a copy-on-write `Arc>` so event +/// emission can clone one pointer instead of cloning every global callback on +/// every event. Scope-local subscribers are still cloned per event because +/// their visibility depends on the active scope stack. +pub(crate) struct EventSubscriberSnapshot { + global: Arc>, + scope_local: Vec, +} + +impl EventSubscriberSnapshot { + fn new(global: Arc>, scope_local: &[EventSubscriberFn]) -> Self { + Self { + global, + scope_local: scope_local.to_vec(), + } + } + + #[cfg(test)] + pub(crate) fn len(&self) -> usize { + self.global.len() + self.scope_local.len() + } + + fn for_each(&self, mut f: impl FnMut(&EventSubscriberFn)) { + for subscriber in self.global.iter().chain(self.scope_local.iter()) { + f(subscriber); + } + } +} + +fn traced_spans() -> &'static Mutex> { + static SPANS: OnceLock>> = OnceLock::new(); + SPANS.get_or_init(|| Mutex::new(HashMap::new())) +} + +fn current_host_dispatch() -> Dispatch { + dispatcher::get_default(Clone::clone) +} + +fn emit_runtime_event(event: &Event, subscribers: &EventSubscriberSnapshot) { + let inserted_traced_scope = + should_emit_tracing_event(event) && emit_tracing_event_best_effort(event); + + let delivery = catch_unwind(AssertUnwindSafe(|| { + subscribers.for_each(|subscriber| subscriber(event)); + })); + if let Err(payload) = delivery { + if inserted_traced_scope { + remove_traced_span(event.uuid()); + } + resume_unwind(payload); + } +} + +fn should_emit_tracing_event(event: &Event) -> bool { + host_tracing_enabled() || has_traced_span_context(event) +} + +fn host_tracing_enabled() -> bool { + catch_unwind(AssertUnwindSafe( + || tracing::enabled!(target: EVENT_TRACE_TARGET, tracing::Level::INFO), + )) + .unwrap_or(false) +} + +fn has_traced_span_context(event: &Event) -> bool { + let tracked_uuid = match event.scope_category() { + Some(ScopeCategory::End) => Some(event.uuid()), + Some(ScopeCategory::Start) | None => event.parent_uuid(), + }; + tracked_uuid.is_some_and(|uuid| { + traced_spans() + .lock() + .is_ok_and(|guard| guard.contains_key(&uuid)) + }) +} + +fn emit_tracing_event_best_effort(event: &Event) -> bool { + match event.scope_category() { + Some(ScopeCategory::Start) => emit_scope_start(event), + Some(ScopeCategory::End) => { + emit_scope_end(event); + false + } + None => { + emit_mark(event); + false + } + } +} + +fn emit_scope_start(event: &Event) -> bool { + let mut inserted_span = false; + let result = catch_unwind(AssertUnwindSafe(|| { + let parent = event.parent_uuid().and_then(|uuid| { + traced_spans() + .lock() + .ok() + .and_then(|guard| guard.get(&uuid).cloned()) + }); + let dispatch = parent + .as_ref() + .map(|parent| parent.dispatch.clone()) + .unwrap_or_else(current_host_dispatch); + let event_json = event.to_json_string().unwrap_or_default(); + let parent_uuid = event.parent_uuid().map(|uuid| uuid.to_string()); + let category = event + .category() + .map(|category| category.as_str()) + .unwrap_or(""); + let scope_category = scope_category(event); + let input = event_data(event.input()); + + let span = dispatcher::with_default(&dispatch, || match parent.as_ref() { + Some(parent) => tracing::info_span!( + target: EVENT_TRACE_TARGET, + parent: &parent.span, + "nemo_relay.scope", + atof.version = crate::api::event::ATOF_VERSION, + event.kind = event.kind(), + event.uuid = %event.uuid(), + event.parent_uuid = parent_uuid.as_deref().unwrap_or(""), + event.name = event.name(), + event.category = category, + event.scope_category = scope_category, + event.json = event_json.as_str(), + event.input = input.as_deref().unwrap_or(""), + event.output = tracing::field::Empty, + ), + None => tracing::info_span!( + target: EVENT_TRACE_TARGET, + "nemo_relay.scope", + atof.version = crate::api::event::ATOF_VERSION, + event.kind = event.kind(), + event.uuid = %event.uuid(), + event.parent_uuid = parent_uuid.as_deref().unwrap_or(""), + event.name = event.name(), + event.category = category, + event.scope_category = scope_category, + event.json = event_json.as_str(), + event.input = input.as_deref().unwrap_or(""), + event.output = tracing::field::Empty, + ), + }); + + if let Ok(mut guard) = traced_spans().lock() { + guard.insert( + event.uuid(), + TracedSpan { + span: span.clone(), + dispatch: dispatch.clone(), + }, + ); + inserted_span = true; + } + + dispatcher::with_default(&dispatch, || { + let _entered = span.enter(); + tracing::info!( + target: EVENT_TRACE_TARGET, + { + atof.version = crate::api::event::ATOF_VERSION, + event.kind = event.kind(), + event.uuid = %event.uuid(), + event.parent_uuid = parent_uuid.as_deref().unwrap_or(""), + event.name = event.name(), + event.category = category, + event.scope_category = scope_category, + event.json = event_json.as_str(), + event.input = input.as_deref().unwrap_or(""), + }, + "nemo_relay.event" + ); + }); + })); + + match result { + Ok(()) => inserted_span, + Err(_) => { + if inserted_span { + remove_traced_span(event.uuid()); + } + false + } + } +} + +fn remove_traced_span(uuid: Uuid) { + let _ = traced_spans().lock().map(|mut guard| guard.remove(&uuid)); +} + +fn emit_scope_end(event: &Event) { + let traced_span = traced_spans() + .lock() + .ok() + .and_then(|guard| guard.get(&event.uuid()).cloned()); + let dispatch = traced_span + .as_ref() + .map(|span| span.dispatch.clone()) + .unwrap_or_else(current_host_dispatch); + let event_json = event.to_json_string().unwrap_or_default(); + let parent_uuid = event.parent_uuid().map(|uuid| uuid.to_string()); + let category = event + .category() + .map(|category| category.as_str()) + .unwrap_or(""); + let scope_category = scope_category(event); + let output = event_data(event.output()); + + let _ = catch_unwind(AssertUnwindSafe(|| { + dispatcher::with_default(&dispatch, || match traced_span.as_ref() { + Some(traced_span) => { + traced_span + .span + .record("event.output", output.as_deref().unwrap_or("")); + let _entered = traced_span.span.enter(); + emit_scope_end_tracing_event( + event, + parent_uuid.as_deref().unwrap_or(""), + category, + scope_category, + event_json.as_str(), + output.as_deref().unwrap_or(""), + ); + } + None => emit_scope_end_tracing_event( + event, + parent_uuid.as_deref().unwrap_or(""), + category, + scope_category, + event_json.as_str(), + output.as_deref().unwrap_or(""), + ), + }); + })); + + if let Some(uuid) = traced_span.as_ref().map(|_| event.uuid()) { + remove_traced_span(uuid); + } +} + +fn emit_scope_end_tracing_event( + event: &Event, + parent_uuid: &str, + category: &str, + scope_category: &str, + event_json: &str, + output: &str, +) { + tracing::info!( + target: EVENT_TRACE_TARGET, + { + atof.version = crate::api::event::ATOF_VERSION, + event.kind = event.kind(), + event.uuid = %event.uuid(), + event.parent_uuid = parent_uuid, + event.name = event.name(), + event.category = category, + event.scope_category = scope_category, + event.json = event_json, + event.output = output, + }, + "nemo_relay.event" + ); +} + +fn emit_mark(event: &Event) { + let parent = event.parent_uuid().and_then(|uuid| { + traced_spans() + .lock() + .ok() + .and_then(|guard| guard.get(&uuid).cloned()) + }); + let dispatch = parent + .as_ref() + .map(|parent| parent.dispatch.clone()) + .unwrap_or_else(current_host_dispatch); + let event_json = event.to_json_string().unwrap_or_default(); + let parent_uuid = event.parent_uuid().map(|uuid| uuid.to_string()); + let category = event + .category() + .map(|category| category.as_str()) + .unwrap_or(""); + let data = event_data(event.data()); + + let _ = catch_unwind(AssertUnwindSafe(|| { + dispatcher::with_default(&dispatch, || match parent.as_ref() { + Some(parent) => { + let _entered = parent.span.enter(); + tracing::info!( + target: EVENT_TRACE_TARGET, + { + atof.version = crate::api::event::ATOF_VERSION, + event.kind = event.kind(), + event.uuid = %event.uuid(), + event.parent_uuid = parent_uuid.as_deref().unwrap_or(""), + event.name = event.name(), + event.category = category, + event.scope_category = "", + event.json = event_json.as_str(), + event.data = data.as_deref().unwrap_or(""), + }, + "nemo_relay.event" + ); + } + None => { + tracing::info!( + target: EVENT_TRACE_TARGET, + { + atof.version = crate::api::event::ATOF_VERSION, + event.kind = event.kind(), + event.uuid = %event.uuid(), + event.parent_uuid = parent_uuid.as_deref().unwrap_or(""), + event.name = event.name(), + event.category = category, + event.scope_category = "", + event.json = event_json.as_str(), + event.data = data.as_deref().unwrap_or(""), + }, + "nemo_relay.event" + ); + } + }); + })); +} + +fn scope_category(event: &Event) -> &'static str { + match event.scope_category() { + Some(ScopeCategory::Start) => "start", + Some(ScopeCategory::End) => "end", + None => "", + } +} + +fn event_data(data: Option<&serde_json::Value>) -> Option { + data.and_then(|value| serde_json::to_string(value).ok()) +} + /// Process-global runtime state backing middleware and event emission. /// /// The public API layer stores one shared instance of this type for the @@ -69,7 +419,15 @@ pub struct NemoRelayContextState { pub(crate) llm_stream_execution_intercepts: SortedRegistry>, /// Global lifecycle subscribers notified after runtime events are emitted. - pub(crate) event_subscribers: HashMap, + event_subscribers: HashMap, + /// Anonymous global lifecycle subscribers owned by closeable handles. + anonymous_event_subscribers: HashMap, + /// Copy-on-write snapshot of global subscribers used by event emission. + event_subscriber_snapshot: Arc>, + /// Monotonic version bumped whenever global subscriber registrations change. + event_subscriber_generation: u64, + /// Generation represented by `event_subscriber_snapshot`. + event_subscriber_snapshot_generation: u64, /// Arbitrary binding- or integration-specific runtime extensions. pub(crate) extensions: HashMap>, } @@ -94,6 +452,10 @@ impl NemoRelayContextState { llm_execution_intercepts: SortedRegistry::new(), llm_stream_execution_intercepts: SortedRegistry::new(), event_subscribers: HashMap::new(), + anonymous_event_subscribers: HashMap::new(), + event_subscriber_snapshot: Arc::new(Vec::new()), + event_subscriber_generation: 0, + event_subscriber_snapshot_generation: 0, extensions: HashMap::new(), } } @@ -162,23 +524,96 @@ impl NemoRelayContextState { pub(crate) fn collect_event_subscribers( &self, scope_local_subscribers: &[EventSubscriberFn], - ) -> Vec { - let mut subscribers = - Vec::with_capacity(self.event_subscribers.len() + scope_local_subscribers.len()); + ) -> EventSubscriberSnapshot { + EventSubscriberSnapshot::new( + self.global_event_subscriber_snapshot(), + scope_local_subscribers, + ) + } + + pub(crate) fn insert_anonymous_event_subscriber( + &mut self, + id: Uuid, + callback: EventSubscriberFn, + ) { + self.anonymous_event_subscribers.insert(id, callback); + self.refresh_event_subscriber_snapshot(); + } + + pub(crate) fn remove_anonymous_event_subscriber(&mut self, id: &Uuid) -> bool { + let removed = self.anonymous_event_subscribers.remove(id).is_some(); + if removed { + self.refresh_event_subscriber_snapshot(); + } + removed + } + + pub(crate) fn register_event_subscriber( + &mut self, + name: &str, + callback: EventSubscriberFn, + ) -> Result<()> { + if self.event_subscribers.contains_key(name) { + return Err(FlowError::AlreadyExists(format!( + "{name} subscriber already exists" + ))); + } + self.event_subscribers.insert(name.to_string(), callback); + self.refresh_event_subscriber_snapshot(); + Ok(()) + } + + pub(crate) fn deregister_event_subscriber(&mut self, name: &str) -> bool { + let removed = self.event_subscribers.remove(name).is_some(); + if removed { + self.refresh_event_subscriber_snapshot(); + } + removed + } + + #[cfg(test)] + pub(crate) fn event_subscribers_is_empty(&self) -> bool { + self.event_subscribers.is_empty() + } + + #[cfg(test)] + pub(crate) fn event_subscriber_names(&self) -> Vec { + self.event_subscribers.keys().cloned().collect() + } + + fn build_global_event_subscriber_snapshot(&self) -> Vec { + let mut subscribers = Vec::with_capacity( + self.event_subscribers.len() + self.anonymous_event_subscribers.len(), + ); subscribers.extend(self.event_subscribers.values().cloned()); - subscribers.extend(scope_local_subscribers.iter().cloned()); + subscribers.extend(self.anonymous_event_subscribers.values().cloned()); subscribers } + fn global_event_subscriber_snapshot(&self) -> Arc> { + let expected_len = self.event_subscribers.len() + self.anonymous_event_subscribers.len(); + if self.event_subscriber_snapshot_generation == self.event_subscriber_generation + && self.event_subscriber_snapshot.len() == expected_len + { + self.event_subscriber_snapshot.clone() + } else { + Arc::new(self.build_global_event_subscriber_snapshot()) + } + } + + pub(crate) fn refresh_event_subscriber_snapshot(&mut self) { + self.event_subscriber_generation = self.event_subscriber_generation.wrapping_add(1); + self.event_subscriber_snapshot = Arc::new(self.build_global_event_subscriber_snapshot()); + self.event_subscriber_snapshot_generation = self.event_subscriber_generation; + } + /// Deliver an event to every subscriber in order. /// /// # Parameters /// - `event`: Fully constructed lifecycle event to deliver. /// - `subscribers`: Subscribers that should observe the event. - pub(crate) fn emit_event(event: &Event, subscribers: &[EventSubscriberFn]) { - for subscriber in subscribers { - subscriber(event); - } + pub(crate) fn emit_event(event: &Event, subscribers: &EventSubscriberSnapshot) { + emit_runtime_event(event, subscribers); } /// Build a standalone mark event. @@ -535,7 +970,7 @@ impl NemoRelayContextState { parent_uuid: Option, metadata: Option, input: Json, - subscribers: &[EventSubscriberFn], + subscribers: &EventSubscriberSnapshot, ) -> ScopeHandle { let handle = ScopeHandle::builder() .name(name) @@ -564,7 +999,7 @@ impl NemoRelayContextState { fn emit_guardrail_scope_end( handle: &ScopeHandle, output: Json, - subscribers: &[EventSubscriberFn], + subscribers: &EventSubscriberSnapshot, ) { let event = Event::Scope(ScopeEvent::new( BaseEvent::builder() @@ -681,7 +1116,7 @@ impl NemoRelayContextState { name: &str, args: &Json, entries: &[Guardrail], - subscribers: &[EventSubscriberFn], + subscribers: &EventSubscriberSnapshot, parent_uuid: Option, metadata: Option, ) -> crate::error::Result> { @@ -874,7 +1309,7 @@ impl NemoRelayContextState { pub(crate) fn llm_conditional_execution_snapshot_chain( request: &LlmRequest, entries: &[Guardrail], - subscribers: &[EventSubscriberFn], + subscribers: &EventSubscriberSnapshot, parent_uuid: Option, metadata: Option, ) -> crate::error::Result> { diff --git a/crates/core/src/api/shared.rs b/crates/core/src/api/shared.rs index 7793808d..0621bdeb 100644 --- a/crates/core/src/api/shared.rs +++ b/crates/core/src/api/shared.rs @@ -6,8 +6,8 @@ use std::sync::Arc; use uuid::Uuid; use crate::api::llm::LlmRequest; -use crate::api::runtime::EventSubscriberFn; use crate::api::runtime::global_context; +use crate::api::runtime::{EventSubscriberFn, EventSubscriberSnapshot}; use crate::api::runtime::{current_scope_stack, task_scope_top}; use crate::api::scope::ScopeHandle; use crate::codec::request::AnnotatedLlmRequest; @@ -25,7 +25,7 @@ pub(crate) fn resolve_parent_uuid(parent: Option<&ScopeHandle>) -> Option pub(crate) fn snapshot_event_subscribers( scope_local_subscribers: Vec, -) -> Result> { +) -> Result { let context = global_context(); let state = context .read() diff --git a/crates/core/src/api/subscriber.rs b/crates/core/src/api/subscriber.rs index 96482d61..96dbbb16 100644 --- a/crates/core/src/api/subscriber.rs +++ b/crates/core/src/api/subscriber.rs @@ -1,11 +1,350 @@ // SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 +use crate::api::event::Event; use crate::api::runtime::EventSubscriberFn; +use crate::api::runtime::ScopeStackHandle; use crate::api::runtime::current_scope_stack; use crate::api::runtime::global_context; use crate::api::shared::ensure_runtime_owner; use crate::error::{FlowError, Result}; +use std::fmt; +use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; +use tracing::field::{Field, Visit}; +use tracing::span::{Attributes, Id, Record}; +use tracing::subscriber::Interest; +use tracing::{Event as TracingEvent, Metadata, Subscriber as TracingSubscriber}; +use tracing_subscriber::layer::Context as LayerContext; +use uuid::Uuid; + +/// `tracing` target used by NeMo Relay lifecycle event records. +/// +/// External tracing layers can filter on this target before decoding records +/// with [`event_from_tracing`]. +pub const EVENT_TRACE_TARGET: &str = "nemo_relay::events"; + +/// `tracing` field containing the canonical ATOF JSON representation. +/// +/// The field is part of NeMo Relay's Rust tracing integration contract. Prefer +/// [`event_from_tracing`] over reading this field directly when a library wants +/// a canonical [`Event`]. +pub const EVENT_JSON_FIELD: &str = "event.json"; + +/// Callback-backed NeMo Relay event subscriber. +/// +/// This adapter consumes the structured `tracing` records emitted by the core +/// runtime, reconstructs the canonical NeMo Relay [`Event`], and invokes the +/// configured callback. It implements [`tracing::Subscriber`] and +/// [`tracing_subscriber::Layer`] so Rust hosts can install it directly as a +/// tracing collector or compose it into an existing subscriber stack. +pub struct Subscriber { + callback: EventSubscriberFn, + next_span_id: Mutex, +} + +impl Subscriber { + /// Create a tracing-compatible subscriber from a NeMo Relay event callback. + pub fn new(callback: EventSubscriberFn) -> Self { + Self { + callback, + next_span_id: Mutex::new(1), + } + } + + /// Return a clone of the callback used by this subscriber. + pub fn callback(&self) -> EventSubscriberFn { + self.callback.clone() + } + + /// Convert this subscriber back into its callback. + pub fn into_callback(self) -> EventSubscriberFn { + self.callback + } + + fn observe_tracing_event(&self, event: &TracingEvent<'_>) { + if let Some(event) = event_from_tracing(event) { + (self.callback)(&event); + } + } +} + +impl TracingSubscriber for Subscriber { + fn enabled(&self, metadata: &Metadata<'_>) -> bool { + is_nemo_relay_event(metadata) + } + + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + if is_nemo_relay_event(metadata) { + Interest::always() + } else { + Interest::never() + } + } + + fn new_span(&self, _span: &Attributes<'_>) -> Id { + let mut next_span_id = self + .next_span_id + .lock() + .unwrap_or_else(|error| error.into_inner()); + let id = *next_span_id; + *next_span_id = next_span_id.saturating_add(1); + Id::from_u64(id) + } + + fn record(&self, _span: &Id, _values: &Record<'_>) {} + + fn record_follows_from(&self, _span: &Id, _follows: &Id) {} + + fn event(&self, event: &TracingEvent<'_>) { + self.observe_tracing_event(event); + } + + fn enter(&self, _span: &Id) {} + + fn exit(&self, _span: &Id) {} +} + +impl tracing_subscriber::Layer for Subscriber +where + S: TracingSubscriber, +{ + fn on_event(&self, event: &TracingEvent<'_>, _ctx: LayerContext<'_, S>) { + self.observe_tracing_event(event); + } +} + +/// Create a tracing-subscriber layer that consumes NeMo Relay lifecycle events. +/// +/// This is the canonical Rust integration point for libraries that want to +/// receive NeMo Relay events through `tracing-subscriber` composition instead +/// of registering directly in the NeMo Relay subscriber registry. +pub fn tracing_layer(callback: EventSubscriberFn) -> Subscriber { + Subscriber::new(callback) +} + +/// Return `true` when tracing metadata belongs to a NeMo Relay lifecycle event. +/// +/// External layers can use this as a cheap filter before calling +/// [`event_from_tracing`]. +pub fn is_nemo_relay_event(metadata: &Metadata<'_>) -> bool { + metadata.target() == EVENT_TRACE_TARGET +} + +/// Decode a canonical NeMo Relay [`Event`] from a `tracing` event record. +/// +/// Returns `None` when the tracing event is not a NeMo Relay lifecycle record or +/// when the record does not contain a valid canonical event payload. +pub fn event_from_tracing(event: &TracingEvent<'_>) -> Option { + if !is_nemo_relay_event(event.metadata()) { + return None; + } + + let mut visitor = EventJsonVisitor::default(); + event.record(&mut visitor); + let event_json = visitor.event_json?; + serde_json::from_str::(&event_json).ok() +} + +#[derive(Default)] +struct EventJsonVisitor { + event_json: Option, +} + +impl Visit for EventJsonVisitor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == EVENT_JSON_FIELD { + self.event_json = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + if field.name() == EVENT_JSON_FIELD && self.event_json.is_none() { + self.event_json = Some(format!("{value:?}")); + } + } +} + +/// Closeable handle for an anonymous global event subscriber. +/// +/// Dropping the handle performs a best-effort deregistration. Call +/// [`SubscriberHandle::close`] when the caller needs to know whether the +/// subscriber was removed. +pub struct SubscriberHandle { + id: Uuid, + closed: AtomicBool, +} + +impl SubscriberHandle { + fn new(id: Uuid) -> Self { + Self { + id, + closed: AtomicBool::new(false), + } + } + + /// Return the runtime-generated identifier for this subscription. + /// + /// The identifier is opaque and is intended for diagnostics only. + pub fn id(&self) -> Uuid { + self.id + } + + /// Deregister this subscriber. + /// + /// Returns `true` when the subscriber was still registered and was removed. + /// Returns `false` when the handle had already been closed. + pub fn close(&self) -> Result { + if self + .closed + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + return Ok(false); + } + + match close_global_subscription(self.id) { + Ok(removed) => Ok(removed), + Err(error) => { + self.closed.store(false, Ordering::Release); + Err(error) + } + } + } +} + +impl Drop for SubscriberHandle { + fn drop(&mut self) { + let _ = self.close(); + } +} + +/// Closeable handle for an anonymous scope-local event subscriber. +/// +/// Dropping the handle performs a best-effort deregistration. Scope pop also +/// removes the subscriber automatically, so closing after scope cleanup returns +/// `Ok(false)`. +pub struct ScopeSubscriberHandle { + scope_uuid: Uuid, + id: Uuid, + scope_stack: ScopeStackHandle, + closed: AtomicBool, +} + +impl ScopeSubscriberHandle { + fn new(scope_uuid: Uuid, id: Uuid, scope_stack: ScopeStackHandle) -> Self { + Self { + scope_uuid, + id, + scope_stack, + closed: AtomicBool::new(false), + } + } + + /// Return the runtime-generated identifier for this subscription. + /// + /// The identifier is opaque and is intended for diagnostics only. + pub fn id(&self) -> Uuid { + self.id + } + + /// Return the UUID of the scope that owns this subscription. + pub fn scope_uuid(&self) -> Uuid { + self.scope_uuid + } + + /// Deregister this scope-local subscriber. + /// + /// Returns `true` when the subscriber was still registered and was removed. + /// Returns `false` when the handle had already been closed or the owning + /// scope has already been popped. + pub fn close(&self) -> Result { + if self + .closed + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + return Ok(false); + } + + match close_scope_subscription(&self.scope_stack, self.scope_uuid, self.id) { + Ok(removed) => Ok(removed), + Err(error) => { + self.closed.store(false, Ordering::Release); + Err(error) + } + } + } +} + +impl Drop for ScopeSubscriberHandle { + fn drop(&mut self) { + let _ = self.close(); + } +} + +/// Register an anonymous global lifecycle event subscriber. +/// +/// The returned handle owns the registration and can be closed explicitly or +/// dropped to deregister the subscriber. +pub fn subscribe(callback: EventSubscriberFn) -> Result { + ensure_runtime_owner()?; + let id = Uuid::now_v7(); + let context = global_context(); + let mut state = context + .write() + .map_err(|error| FlowError::Internal(error.to_string()))?; + state.insert_anonymous_event_subscriber(id, callback); + Ok(SubscriberHandle::new(id)) +} + +/// Register an anonymous scope-local lifecycle event subscriber. +/// +/// The returned handle owns the registration and captures the active scope +/// stack so it can be closed even if another scope stack is current later. +pub fn scope_subscribe( + scope_uuid: &Uuid, + callback: EventSubscriberFn, +) -> Result { + ensure_runtime_owner()?; + let id = Uuid::now_v7(); + let scope_stack = current_scope_stack(); + { + let mut guard = scope_stack + .write() + .map_err(|error| FlowError::Internal(error.to_string()))?; + let registries = guard + .local_registries_mut(scope_uuid) + .ok_or_else(|| FlowError::NotFound(format!("scope {scope_uuid} not found")))?; + registries.anonymous_event_subscribers.insert(id, callback); + } + Ok(ScopeSubscriberHandle::new(*scope_uuid, id, scope_stack)) +} + +fn close_global_subscription(id: Uuid) -> Result { + ensure_runtime_owner()?; + let context = global_context(); + let mut state = context + .write() + .map_err(|error| FlowError::Internal(error.to_string()))?; + Ok(state.remove_anonymous_event_subscriber(&id)) +} + +fn close_scope_subscription( + scope_stack: &ScopeStackHandle, + scope_uuid: Uuid, + id: Uuid, +) -> Result { + ensure_runtime_owner()?; + let mut guard = scope_stack + .write() + .map_err(|error| FlowError::Internal(error.to_string()))?; + let registries = match guard.local_registries_mut(&scope_uuid) { + Some(registries) => registries, + None => return Ok(false), + }; + Ok(registries.anonymous_event_subscribers.remove(&id).is_some()) +} /// Register a global lifecycle event subscriber. /// @@ -31,13 +370,7 @@ pub fn register_subscriber(name: &str, callback: EventSubscriberFn) -> Result<() let mut state = context .write() .map_err(|error| FlowError::Internal(error.to_string()))?; - if state.event_subscribers.contains_key(name) { - return Err(FlowError::AlreadyExists(format!( - "{name} subscriber already exists" - ))); - } - state.event_subscribers.insert(name.to_string(), callback); - Ok(()) + state.register_event_subscriber(name, callback) } /// Deregister a global lifecycle event subscriber. @@ -62,7 +395,7 @@ pub fn deregister_subscriber(name: &str) -> Result { let mut state = context .write() .map_err(|error| FlowError::Internal(error.to_string()))?; - Ok(state.event_subscribers.remove(name).is_some()) + Ok(state.deregister_event_subscriber(name)) } /// Register a scope-local lifecycle event subscriber. diff --git a/crates/core/src/context/registries.rs b/crates/core/src/context/registries.rs index 2a0d2fde..437b96ac 100644 --- a/crates/core/src/context/registries.rs +++ b/crates/core/src/context/registries.rs @@ -16,6 +16,7 @@ use crate::api::runtime::{ ToolExecutionFn, ToolInterceptFn, ToolSanitizeFn, }; use crate::registry::SortedRegistry; +use uuid::Uuid; /// Scope-owned middleware registries and subscribers. /// @@ -49,6 +50,8 @@ pub(crate) struct ScopeLocalRegistries { SortedRegistry>, /// Scope-local lifecycle subscribers visible while the owning scope is active. pub(crate) event_subscribers: HashMap, + /// Anonymous scope-local lifecycle subscribers owned by closeable handles. + pub(crate) anonymous_event_subscribers: HashMap, } impl ScopeLocalRegistries { @@ -71,6 +74,7 @@ impl ScopeLocalRegistries { llm_execution_intercepts: SortedRegistry::new(), llm_stream_execution_intercepts: SortedRegistry::new(), event_subscribers: HashMap::new(), + anonymous_event_subscribers: HashMap::new(), } } } diff --git a/crates/core/tests/integration/api_surface_tests.rs b/crates/core/tests/integration/api_surface_tests.rs index 3054be47..0b77d4dd 100644 --- a/crates/core/tests/integration/api_surface_tests.rs +++ b/crates/core/tests/integration/api_surface_tests.rs @@ -5,7 +5,11 @@ #![allow(clippy::await_holding_lock)] +use std::fmt; +use std::io::Write; +use std::panic::{AssertUnwindSafe, catch_unwind}; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use chrono::{DateTime, TimeDelta, Utc}; @@ -46,12 +50,13 @@ use nemo_relay::api::registry::{ use nemo_relay::api::runtime::NemoRelayContextState; use nemo_relay::api::runtime::global_context; use nemo_relay::api::runtime::{LlmExecutionNextFn, LlmStreamExecutionNextFn, ToolExecutionNextFn}; -use nemo_relay::api::runtime::{create_scope_stack, set_thread_scope_stack}; +use nemo_relay::api::runtime::{create_scope_stack, current_scope_stack, set_thread_scope_stack}; use nemo_relay::api::scope::ScopeType; use nemo_relay::api::scope::{event, pop_scope, push_scope}; use nemo_relay::api::subscriber::{ - deregister_subscriber, register_subscriber, scope_deregister_subscriber, - scope_register_subscriber, + Subscriber as NemoSubscriber, deregister_subscriber, event_from_tracing, is_nemo_relay_event, + register_subscriber, scope_deregister_subscriber, scope_register_subscriber, scope_subscribe, + subscribe, tracing_layer, }; use nemo_relay::api::tool::ToolAttributes; use nemo_relay::api::tool::{ @@ -62,9 +67,129 @@ use nemo_relay::error::{FlowError, Result}; use nemo_relay::json::Json; use serde_json::{Map, json}; use tokio_stream::Stream; +use tracing::field::{Field, Visit}; +use tracing_subscriber::prelude::*; static TEST_MUTEX: Mutex<()> = Mutex::new(()); +struct TraceBuffer(Arc>>); + +impl Write for TraceBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +struct RecordingTracingSubscriber { + events: Arc>>, + next_span_id: AtomicU64, + panic_on_any_event: bool, + panic_on_scope_category: Option<&'static str>, +} + +impl RecordingTracingSubscriber { + fn new(events: Arc>>) -> Self { + Self { + events, + next_span_id: AtomicU64::new(1), + panic_on_any_event: false, + panic_on_scope_category: None, + } + } + + fn panic_on_any_event(events: Arc>>) -> Self { + Self { + panic_on_any_event: true, + ..Self::new(events) + } + } + + fn panic_on_scope_category( + events: Arc>>, + scope_category: &'static str, + ) -> Self { + Self { + panic_on_scope_category: Some(scope_category), + ..Self::new(events) + } + } +} + +impl tracing::Subscriber for RecordingTracingSubscriber { + fn enabled(&self, metadata: &tracing::Metadata<'_>) -> bool { + metadata.target() == "nemo_relay::events" + } + + fn register_callsite( + &self, + metadata: &'static tracing::Metadata<'static>, + ) -> tracing::subscriber::Interest { + if self.enabled(metadata) { + tracing::subscriber::Interest::always() + } else { + tracing::subscriber::Interest::never() + } + } + + fn new_span(&self, _span: &tracing::span::Attributes<'_>) -> tracing::span::Id { + tracing::span::Id::from_u64(self.next_span_id.fetch_add(1, Ordering::Relaxed)) + } + + fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {} + + fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {} + + fn event(&self, event: &tracing::Event<'_>) { + let mut visitor = TracingEventJsonVisitor::default(); + event.record(&mut visitor); + + let mut should_panic = self.panic_on_any_event; + if let Some(event_json) = visitor.event_json + && let Ok(event) = serde_json::from_str::(&event_json) + { + if let Some(name) = event.get("name").and_then(Json::as_str) { + self.events.lock().unwrap().push(name.to_owned()); + } + if let Some(scope_category) = self.panic_on_scope_category { + should_panic |= event + .get("scope_category") + .and_then(Json::as_str) + .is_some_and(|value| value == scope_category); + } + } + + assert!(!should_panic, "test tracing subscriber panic"); + } + + fn enter(&self, _span: &tracing::span::Id) {} + + fn exit(&self, _span: &tracing::span::Id) {} +} + +#[derive(Default)] +struct TracingEventJsonVisitor { + event_json: Option, +} + +impl Visit for TracingEventJsonVisitor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "event.json" { + self.event_json = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + if field.name() == "event.json" && self.event_json.is_none() { + self.event_json = Some(format!("{value:?}")); + } + } +} + fn reset_global() { let ctx = global_context(); let mut state = ctx.write().unwrap(); @@ -114,6 +239,525 @@ fn expect_not_found(error: FlowError, needle: &str) { } } +#[test] +fn test_anonymous_subscriber_handle_receives_events_and_closes_idempotently() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let events = Arc::new(Mutex::new(Vec::new())); + let sink = events.clone(); + let subscription = subscribe(Arc::new(move |event| { + sink.lock().unwrap().push(event.name().to_owned()); + })) + .unwrap(); + + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("anonymous-subscription-before-close") + .build(), + ) + .unwrap(); + assert!(subscription.close().unwrap()); + assert!(!subscription.close().unwrap()); + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("anonymous-subscription-after-close") + .build(), + ) + .unwrap(); + + assert_eq!( + events.lock().unwrap().as_slice(), + ["anonymous-subscription-before-close"] + ); +} + +#[test] +fn test_scope_subscriber_handle_closes_and_scope_pop_cleans_up() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let scope_handle = push_scope( + nemo_relay::api::scope::PushScopeParams::builder() + .name("scope-subscription-owner") + .scope_type(ScopeType::Agent) + .build(), + ) + .unwrap(); + + let explicit_events = Arc::new(Mutex::new(Vec::new())); + let explicit_sink = explicit_events.clone(); + let explicit = scope_subscribe( + &scope_handle.uuid, + Arc::new(move |event| explicit_sink.lock().unwrap().push(event.name().to_owned())), + ) + .unwrap(); + + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("scope-subscription-before-close") + .parent(&scope_handle) + .build(), + ) + .unwrap(); + assert!(explicit.close().unwrap()); + assert!(!explicit.close().unwrap()); + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("scope-subscription-after-close") + .parent(&scope_handle) + .build(), + ) + .unwrap(); + + assert_eq!( + explicit_events.lock().unwrap().as_slice(), + ["scope-subscription-before-close"] + ); + + let cleanup_events = Arc::new(Mutex::new(Vec::new())); + let cleanup_sink = cleanup_events.clone(); + let cleanup = scope_subscribe( + &scope_handle.uuid, + Arc::new(move |event| cleanup_sink.lock().unwrap().push(event.name().to_owned())), + ) + .unwrap(); + + pop_scope( + nemo_relay::api::scope::PopScopeParams::builder() + .handle_uuid(&scope_handle.uuid) + .build(), + ) + .unwrap(); + assert!(!cleanup.close().unwrap()); + + assert_eq!( + cleanup_events.lock().unwrap().as_slice(), + ["scope-subscription-owner"] + ); +} + +#[test] +fn test_runtime_tracing_emits_scoped_trace_events_without_global_subscriber() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let output = Arc::new(Mutex::new(Vec::new())); + let writer_output = output.clone(); + let subscriber = tracing_subscriber::fmt() + .with_ansi(false) + .without_time() + .with_target(false) + .with_writer(move || TraceBuffer(writer_output.clone())) + .finish(); + + tracing::subscriber::with_default(subscriber, || { + let scope_handle = push_scope( + nemo_relay::api::scope::PushScopeParams::builder() + .name("runtime-tracing-scope") + .scope_type(ScopeType::Agent) + .build(), + ) + .unwrap(); + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("runtime-tracing-mark") + .parent(&scope_handle) + .data(json!({"step": 1})) + .build(), + ) + .unwrap(); + pop_scope( + nemo_relay::api::scope::PopScopeParams::builder() + .handle_uuid(&scope_handle.uuid) + .output(json!({"done": true})) + .build(), + ) + .unwrap(); + }); + + let rendered = String::from_utf8(output.lock().unwrap().clone()).unwrap(); + assert!(rendered.contains("atof.version")); + assert!(rendered.contains("event.uuid")); + assert!(rendered.contains("runtime-tracing-scope")); + assert!(rendered.contains("runtime-tracing-mark")); +} + +#[test] +fn test_subscriber_adapter_implements_tracing_subscriber_trait() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + fn assert_tracing_subscriber(_subscriber: &T) {} + + let events = Arc::new(Mutex::new(Vec::new())); + let sink = events.clone(); + let subscriber = NemoSubscriber::new(Arc::new(move |event| { + sink.lock().unwrap().push(event.name().to_owned()); + })); + assert_tracing_subscriber(&subscriber); + + tracing::subscriber::with_default(subscriber, || { + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("tracing-subscriber-adapter-mark") + .build(), + ) + .unwrap(); + }); + + assert_eq!( + events.lock().unwrap().as_slice(), + ["tracing-subscriber-adapter-mark"] + ); +} + +#[test] +fn test_subscriber_adapter_composes_as_tracing_subscriber_layer() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let events = Arc::new(Mutex::new(Vec::new())); + let sink = events.clone(); + let layer = tracing_layer(Arc::new(move |event| { + sink.lock().unwrap().push(event.name().to_owned()); + })); + let subscriber = tracing_subscriber::registry().with(layer); + + tracing::subscriber::with_default(subscriber, || { + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("tracing-subscriber-layer-mark") + .build(), + ) + .unwrap(); + tracing::info!("external-layer-event"); + }); + + assert_eq!( + events.lock().unwrap().as_slice(), + ["tracing-subscriber-layer-mark"] + ); +} + +#[test] +fn test_public_tracing_event_decoder_supports_custom_layers() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + struct DecodingLayer { + events: Arc>>, + } + + impl tracing_subscriber::Layer for DecodingLayer + where + S: tracing::Subscriber, + { + fn on_event( + &self, + event: &tracing::Event<'_>, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + if is_nemo_relay_event(event.metadata()) + && let Some(event) = event_from_tracing(event) + { + self.events.lock().unwrap().push(event.name().to_owned()); + } + } + } + + let events = Arc::new(Mutex::new(Vec::new())); + let subscriber = tracing_subscriber::registry().with(DecodingLayer { + events: events.clone(), + }); + + tracing::subscriber::with_default(subscriber, || { + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("public-tracing-decoder-mark") + .build(), + ) + .unwrap(); + tracing::info!("external-event"); + }); + + assert_eq!( + events.lock().unwrap().as_slice(), + ["public-tracing-decoder-mark"] + ); +} + +#[test] +fn test_subscriber_adapter_ignores_external_tracing_events() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let observed = Arc::new(Mutex::new(Vec::new())); + let observed_sink = observed.clone(); + let subscriber = NemoSubscriber::new(Arc::new(move |event| { + observed_sink.lock().unwrap().push(event.name().to_owned()); + })); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!("external-host-event"); + }); + + assert!(observed.lock().unwrap().is_empty()); +} + +#[test] +fn test_tracing_first_delivery_forwards_to_host_and_compat_subscribers() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let compat_events = capture_events("trace-compat-events"); + let output = Arc::new(Mutex::new(Vec::new())); + let writer_output = output.clone(); + let subscriber = tracing_subscriber::fmt() + .with_ansi(false) + .without_time() + .with_target(false) + .with_writer(move || TraceBuffer(writer_output.clone())) + .finish(); + + tracing::subscriber::with_default(subscriber, || { + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("trace-first-compat-mark") + .data(json!({"compat": true})) + .build(), + ) + .unwrap(); + }); + + deregister_subscriber("trace-compat-events").unwrap(); + + let captured = compat_events.lock().unwrap(); + assert_eq!(captured.len(), 1); + assert_eq!(captured[0].name(), "trace-first-compat-mark"); + assert_eq!(captured[0].data().unwrap(), &json!({"compat": true})); + + let rendered = String::from_utf8(output.lock().unwrap().clone()).unwrap(); + assert!(rendered.contains("nemo_relay.event")); + assert!(rendered.contains("trace-first-compat-mark")); +} + +#[test] +fn test_host_tracing_panics_do_not_skip_compat_subscriber_delivery() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let compat_events = capture_events("panic-host-compat-events"); + let traced_events = Arc::new(Mutex::new(Vec::new())); + let subscriber = RecordingTracingSubscriber::panic_on_any_event(traced_events); + + let result = catch_unwind(AssertUnwindSafe(|| { + tracing::subscriber::with_default(subscriber, || { + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("host-tracing-panic-mark") + .build(), + ) + .unwrap(); + }); + })); + + deregister_subscriber("panic-host-compat-events").unwrap(); + + assert!(result.is_ok()); + let captured = compat_events.lock().unwrap(); + assert_eq!(captured.len(), 1); + assert_eq!(captured[0].name(), "host-tracing-panic-mark"); +} + +#[test] +fn test_scope_start_compat_panic_cleans_traced_span_context() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + register_subscriber( + "panic-on-scope-start", + Arc::new(|event| { + if event.scope_category() == Some(ScopeCategory::Start) { + panic!("test compatibility subscriber panic"); + } + }), + ) + .unwrap(); + + let traced_events = Arc::new(Mutex::new(Vec::new())); + let subscriber = RecordingTracingSubscriber::new(traced_events.clone()); + let result = catch_unwind(AssertUnwindSafe(|| { + tracing::subscriber::with_default(subscriber, || { + push_scope( + nemo_relay::api::scope::PushScopeParams::builder() + .name("compat-panic-scope") + .scope_type(ScopeType::Agent) + .build(), + ) + .unwrap(); + }); + })); + assert!(result.is_err()); + + deregister_subscriber("panic-on-scope-start").unwrap(); + let scope_handle = current_scope_stack().read().unwrap().top().clone(); + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("after-compat-panic-mark") + .parent(&scope_handle) + .build(), + ) + .unwrap(); + + assert_eq!( + traced_events.lock().unwrap().as_slice(), + ["compat-panic-scope"] + ); + + pop_scope( + nemo_relay::api::scope::PopScopeParams::builder() + .handle_uuid(&scope_handle.uuid) + .build(), + ) + .unwrap(); +} + +#[test] +fn test_scope_end_tracing_panic_cleans_traced_span_context() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let traced_events = Arc::new(Mutex::new(Vec::new())); + let subscriber = + RecordingTracingSubscriber::panic_on_scope_category(traced_events.clone(), "end"); + + let scope_handle = tracing::subscriber::with_default(subscriber, || { + let scope_handle = push_scope( + nemo_relay::api::scope::PushScopeParams::builder() + .name("panic-cleanup-scope") + .scope_type(ScopeType::Agent) + .build(), + ) + .unwrap(); + + let result = catch_unwind(AssertUnwindSafe(|| { + pop_scope( + nemo_relay::api::scope::PopScopeParams::builder() + .handle_uuid(&scope_handle.uuid) + .build(), + ) + .unwrap(); + })); + assert!(result.is_ok()); + scope_handle + }); + + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("stale-context-mark") + .parent(&scope_handle) + .build(), + ) + .unwrap(); + + assert_eq!( + traced_events.lock().unwrap().as_slice(), + ["panic-cleanup-scope", "panic-cleanup-scope"] + ); +} + +#[test] +fn test_compat_subscribers_do_not_store_noop_tracing_context() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let subscription = subscribe(Arc::new(|_| {})).unwrap(); + let scope_handle = push_scope( + nemo_relay::api::scope::PushScopeParams::builder() + .name("compat-only-scope") + .scope_type(ScopeType::Agent) + .build(), + ) + .unwrap(); + + let traced_events = Arc::new(Mutex::new(Vec::new())); + let subscriber = RecordingTracingSubscriber::new(traced_events.clone()); + tracing::subscriber::with_default(subscriber, || { + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("host-inside-compat-only-scope") + .parent(&scope_handle) + .build(), + ) + .unwrap(); + }); + + pop_scope( + nemo_relay::api::scope::PopScopeParams::builder() + .handle_uuid(&scope_handle.uuid) + .build(), + ) + .unwrap(); + subscription.close().unwrap(); + + assert_eq!( + traced_events.lock().unwrap().as_slice(), + ["host-inside-compat-only-scope"] + ); +} + +#[test] +fn test_tracing_first_nested_compat_delivery_is_not_duplicated() { + let _lock = TEST_MUTEX.lock().unwrap(); + reset_global(); + setup_isolated_thread(); + + let names = Arc::new(Mutex::new(Vec::new())); + let sink = names.clone(); + let emitted_nested = Arc::new(AtomicBool::new(false)); + let nested_flag = emitted_nested.clone(); + let subscription = subscribe(Arc::new(move |observed| { + let name = observed.name().to_owned(); + sink.lock().unwrap().push(name.clone()); + + if name == "trace-first-outer" && !nested_flag.swap(true, Ordering::SeqCst) { + nemo_relay::api::scope::event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("trace-first-inner") + .build(), + ) + .unwrap(); + } + })) + .unwrap(); + + event( + nemo_relay::api::scope::EmitMarkEventParams::builder() + .name("trace-first-outer") + .build(), + ) + .unwrap(); + subscription.close().unwrap(); + + assert_eq!( + names.lock().unwrap().as_slice(), + ["trace-first-outer", "trace-first-inner"] + ); +} + #[test] fn test_manual_lifecycle_timestamp_overrides() { let _lock = TEST_MUTEX.lock().unwrap(); diff --git a/crates/core/tests/unit/context_tests.rs b/crates/core/tests/unit/context_tests.rs index 281735ea..c716f408 100644 --- a/crates/core/tests/unit/context_tests.rs +++ b/crates/core/tests/unit/context_tests.rs @@ -208,12 +208,13 @@ fn conditional_guardrail_snapshots_keep_names_and_callbacks_after_deregister() { captured.lock().unwrap().push(event.clone()); }); let subscribers = [subscriber]; + let subscriber_snapshot = state.collect_event_subscribers(&subscribers); let rejection = NemoRelayContextState::tool_conditional_execution_snapshot_chain( "snapshot_target", &json!({}), &entries, - &subscribers, + &subscriber_snapshot, None, None, ) @@ -283,15 +284,17 @@ fn context_state_supports_extensions_events_and_builders() { let events = Arc::new(Mutex::new(Vec::::new())); let subscriber_events = events.clone(); - state.event_subscribers.insert( - "capture".to_string(), - Arc::new(move |event: &Event| { - subscriber_events - .lock() - .unwrap() - .push(event.kind().to_string()); - }), - ); + state + .register_event_subscriber( + "capture", + Arc::new(move |event: &Event| { + subscriber_events + .lock() + .unwrap() + .push(event.kind().to_string()); + }), + ) + .unwrap(); let event = state.create_event(crate::api::event::MarkEvent::new( crate::api::event::BaseEvent::builder().name("mark").build(), None, diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs b/crates/core/tests/unit/observability/plugin_component_tests.rs index c12f50d6..9ed3ebf2 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -306,7 +306,7 @@ fn empty_and_disabled_config_register_nothing() { futures::executor::block_on(initialize_plugins(config)).unwrap(); let state = global_context(); - assert!(state.read().unwrap().event_subscribers.is_empty()); + assert!(state.read().unwrap().event_subscribers_is_empty()); } #[test] @@ -487,13 +487,7 @@ fn atof_enabled_writes_jsonl_and_teardown_flushes() { { let state = global_context(); - let names = state - .read() - .unwrap() - .event_subscribers - .keys() - .cloned() - .collect::>(); + let names = state.read().unwrap().event_subscriber_names(); assert_eq!(names, vec!["__nemo_relay_plugin__observability__atof"]); } @@ -624,7 +618,7 @@ fn atif_completed_top_level_agent_is_evicted_after_write() { .unwrap() .complete_scope_write(agent.uuid, Ok(())); if let Some((scope_uuid, name)) = scope_subscriber { - let _ = scope_deregister_subscriber(&scope_uuid, &name); + let _ = crate::api::subscriber::scope_deregister_subscriber(&scope_uuid, &name); } let dispatcher = manager.lock().unwrap(); @@ -742,13 +736,7 @@ fn otlp_sections_register_inferred_subscribers_with_full_config() { futures::executor::block_on(initialize_plugins(config)).unwrap(); let state = global_context(); - let names = state - .read() - .unwrap() - .event_subscribers - .keys() - .cloned() - .collect::>(); + let names = state.read().unwrap().event_subscriber_names(); assert!(names.contains(&"__nemo_relay_plugin__observability__opentelemetry".to_string())); assert!(names.contains(&"__nemo_relay_plugin__observability__openinference".to_string())); clear_plugin_configuration().unwrap(); diff --git a/crates/ffi/nemo_relay.h b/crates/ffi/nemo_relay.h index 1d62ec42..d5ddbf6a 100644 --- a/crates/ffi/nemo_relay.h +++ b/crates/ffi/nemo_relay.h @@ -184,6 +184,11 @@ typedef struct FfiScopeStack FfiScopeStack; */ typedef struct FfiStream FfiStream; +/** + * Opaque handle to a closeable event subscriber registration. + */ +typedef struct FfiSubscriptionHandle FfiSubscriptionHandle; + /** * Opaque handle to a captured thread-local scope stack binding. */ @@ -1812,6 +1817,59 @@ NemoRelayStatus nemo_relay_scope_stack_restore_thread(struct FfiThreadScopeStack */ bool nemo_relay_scope_stack_active(void); +/** + * Register an anonymous global event subscriber and return a closeable handle. + * + * # Parameters + * - `cb`: Event callback. The `FfiEvent` is valid only during the call. + * - `user_data`: Opaque pointer passed to `cb`. + * - `free_fn`: Optional destructor for `user_data` after registration ownership + * has been accepted. + * - `out`: Receives the subscription handle on success. + * + * # Safety + * `cb` must be a valid function pointer. `out` must be non-null and valid for + * writes. + */ +NemoRelayStatus nemo_relay_subscribe(NemoRelayEventSubscriberCb cb, + void *user_data, + NemoRelayFreeFn free_fn, + struct FfiSubscriptionHandle **out); + +/** + * Register an anonymous scope-local event subscriber and return a closeable handle. + * + * # Parameters + * - `scope_uuid`: UUID of the active scope that owns the subscriber. + * - `cb`: Event callback. The `FfiEvent` is valid only during the call. + * - `user_data`: Opaque pointer passed to `cb`. + * - `free_fn`: Optional destructor for `user_data` after registration ownership + * has been accepted. + * - `out`: Receives the subscription handle on success. + * + * # Safety + * `scope_uuid` must be a valid C string. `cb` must be a valid function + * pointer. `out` must be non-null and valid for writes. + */ +NemoRelayStatus nemo_relay_scope_subscribe(const char *scope_uuid, + NemoRelayEventSubscriberCb cb, + void *user_data, + NemoRelayFreeFn free_fn, + struct FfiSubscriptionHandle **out); + +/** + * Close a subscription handle. + * + * This function is idempotent. Closing after automatic scope cleanup succeeds. + * `removed` receives true only when this call removed a live subscriber. + * + * # Safety + * `handle` must be a valid pointer returned by `nemo_relay_subscribe` or + * `nemo_relay_scope_subscribe`. `removed` must be non-null and valid for + * writes. + */ +NemoRelayStatus nemo_relay_subscription_close(struct FfiSubscriptionHandle *handle, bool *removed); + /** * Begin a manual tool call lifecycle span. * @@ -2059,6 +2117,19 @@ void nemo_relay_llm_request_free(struct FfiLLMRequest *ptr); */ void nemo_relay_event_free(struct FfiEvent *ptr); +/** + * Free a subscription handle returned by `nemo_relay_subscribe` or + * `nemo_relay_scope_subscribe`. + * + * Dropping the handle performs best-effort deregistration if it has not + * already been closed. + * + * # Safety + * `ptr` must be a valid pointer returned by an `nemo_relay_*subscribe` + * function, or null. + */ +void nemo_relay_subscription_free(struct FfiSubscriptionHandle *ptr); + /** * Free a scope stack handle previously returned by `nemo_relay_scope_stack_create`. * diff --git a/crates/ffi/src/api/mod.rs b/crates/ffi/src/api/mod.rs index a1d6fffd..a63a3b7e 100644 --- a/crates/ffi/src/api/mod.rs +++ b/crates/ffi/src/api/mod.rs @@ -70,6 +70,7 @@ mod plugin; mod scope; mod scope_registry; mod scope_stack; +mod subscription; mod tool_lifecycle; mod tool_registry; @@ -80,6 +81,7 @@ pub use plugin::*; pub use scope::*; pub use scope_registry::*; pub use scope_stack::*; +pub use subscription::*; pub use tool_lifecycle::*; pub use tool_registry::*; diff --git a/crates/ffi/src/api/subscription.rs b/crates/ffi/src/api/subscription.rs new file mode 100644 index 00000000..375bdcf7 --- /dev/null +++ b/crates/ffi/src/api/subscription.rs @@ -0,0 +1,145 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Closeable event subscription FFI functions. + +use libc::c_char; +use nemo_relay::api::subscriber as core_subscriber_api; + +use crate::callable::{ + NemoRelayEventSubscriberCb, NemoRelayFreeFn, wrap_event_subscriber_deferred, +}; +use crate::convert::c_str_to_string; +use crate::error::{NemoRelayStatus, clear_last_error, status_from_error}; +use crate::types::{FfiSubscription, FfiSubscriptionHandle}; + +/// Register an anonymous global event subscriber and return a closeable handle. +/// +/// # Parameters +/// - `cb`: Event callback. The `FfiEvent` is valid only during the call. +/// - `user_data`: Opaque pointer passed to `cb`. +/// - `free_fn`: Optional destructor for `user_data` after registration ownership +/// has been accepted. +/// - `out`: Receives the subscription handle on success. +/// +/// # Safety +/// `cb` must be a valid function pointer. `out` must be non-null and valid for +/// writes. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_subscribe( + cb: NemoRelayEventSubscriberCb, + user_data: *mut libc::c_void, + free_fn: NemoRelayFreeFn, + out: *mut *mut FfiSubscriptionHandle, +) -> NemoRelayStatus { + clear_last_error(); + if out.is_null() { + return NemoRelayStatus::NullPointer; + } + unsafe { + *out = std::ptr::null_mut(); + } + let (wrapped, ownership) = wrap_event_subscriber_deferred(cb, user_data, free_fn); + match core_subscriber_api::subscribe(wrapped) { + Ok(handle) => { + ownership.accept(); + unsafe { + *out = Box::into_raw(Box::new(FfiSubscriptionHandle(FfiSubscription::Global( + handle, + )))); + } + NemoRelayStatus::Ok + } + Err(error) => status_from_error(&error), + } +} + +/// Register an anonymous scope-local event subscriber and return a closeable handle. +/// +/// # Parameters +/// - `scope_uuid`: UUID of the active scope that owns the subscriber. +/// - `cb`: Event callback. The `FfiEvent` is valid only during the call. +/// - `user_data`: Opaque pointer passed to `cb`. +/// - `free_fn`: Optional destructor for `user_data` after registration ownership +/// has been accepted. +/// - `out`: Receives the subscription handle on success. +/// +/// # Safety +/// `scope_uuid` must be a valid C string. `cb` must be a valid function +/// pointer. `out` must be non-null and valid for writes. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_scope_subscribe( + scope_uuid: *const c_char, + cb: NemoRelayEventSubscriberCb, + user_data: *mut libc::c_void, + free_fn: NemoRelayFreeFn, + out: *mut *mut FfiSubscriptionHandle, +) -> NemoRelayStatus { + clear_last_error(); + if out.is_null() { + return NemoRelayStatus::NullPointer; + } + unsafe { + *out = std::ptr::null_mut(); + } + let scope_uuid = match c_str_to_string(scope_uuid) { + Ok(value) => value, + Err(status) => return status, + }; + let uuid = match uuid::Uuid::parse_str(&scope_uuid) { + Ok(uuid) => uuid, + Err(_) => return NemoRelayStatus::InvalidArg, + }; + let (wrapped, ownership) = wrap_event_subscriber_deferred(cb, user_data, free_fn); + match core_subscriber_api::scope_subscribe(&uuid, wrapped) { + Ok(handle) => { + ownership.accept(); + unsafe { + *out = Box::into_raw(Box::new(FfiSubscriptionHandle(FfiSubscription::Scope( + handle, + )))); + } + NemoRelayStatus::Ok + } + Err(error) => status_from_error(&error), + } +} + +/// Close a subscription handle. +/// +/// This function is idempotent. Closing after automatic scope cleanup succeeds. +/// `removed` receives true only when this call removed a live subscriber. +/// +/// # Safety +/// `handle` must be a valid pointer returned by `nemo_relay_subscribe` or +/// `nemo_relay_scope_subscribe`. `removed` must be non-null and valid for +/// writes. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_subscription_close( + handle: *mut FfiSubscriptionHandle, + removed: *mut bool, +) -> NemoRelayStatus { + clear_last_error(); + if removed.is_null() { + return NemoRelayStatus::NullPointer; + } + unsafe { + *removed = false; + } + if handle.is_null() { + return NemoRelayStatus::NullPointer; + } + let result = match unsafe { &*handle } { + FfiSubscriptionHandle(FfiSubscription::Global(handle)) => handle.close(), + FfiSubscriptionHandle(FfiSubscription::Scope(handle)) => handle.close(), + }; + match result { + Ok(value) => { + unsafe { + *removed = value; + } + NemoRelayStatus::Ok + } + Err(error) => status_from_error(&error), + } +} diff --git a/crates/ffi/src/callable.rs b/crates/ffi/src/callable.rs index ea3bf4c2..36dcc16b 100644 --- a/crates/ffi/src/callable.rs +++ b/crates/ffi/src/callable.rs @@ -20,6 +20,7 @@ use std::ffi::{CStr, CString}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use libc::c_char; use nemo_relay::api::runtime::{ @@ -214,10 +215,12 @@ pub type NemoRelayPluginRegisterCb = unsafe extern "C" fn( // --------------------------------------------------------------------------- /// RAII wrapper around a C user-data pointer and its associated free function. -/// Ensures the free function is called exactly once when dropped. +/// When ownership is armed, ensures the free function is called exactly once +/// when dropped. struct UserData { ptr: *mut libc::c_void, free_fn: NemoRelayFreeFn, + free_on_drop: AtomicBool, } unsafe impl Send for UserData {} @@ -225,7 +228,10 @@ unsafe impl Sync for UserData {} impl Drop for UserData { fn drop(&mut self) { - if let Some(free) = self.free_fn { + if self.free_on_drop.load(Ordering::Acquire) + && !self.ptr.is_null() + && let Some(free) = self.free_fn + { unsafe { free(self.ptr) }; } } @@ -234,13 +240,25 @@ impl Drop for UserData { fn make_user_data( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, + free_on_drop: bool, ) -> std::sync::Arc { std::sync::Arc::new(UserData { ptr: user_data, free_fn, + free_on_drop: AtomicBool::new(free_on_drop), }) } +pub(crate) struct DeferredUserData { + user_data: Arc, +} + +impl DeferredUserData { + pub(crate) fn accept(self) { + self.user_data.free_on_drop.store(true, Ordering::Release); + } +} + // --------------------------------------------------------------------------- // Wrapper functions: C callback -> core trait objects // --------------------------------------------------------------------------- @@ -251,7 +269,7 @@ pub fn wrap_tool_sanitize_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> ToolSanitizeFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |name: &str, args: Json| { let c_name = CString::new(name).unwrap_or_default(); let c_args = json_to_c_string(&args); @@ -269,7 +287,7 @@ pub fn wrap_tool_conditional_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> ToolConditionalFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |name: &str, args: &Json| { clear_last_error(); let c_name = CString::new(name).unwrap_or_default(); @@ -295,7 +313,7 @@ pub fn wrap_tool_request_intercept_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> ToolInterceptFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |name: &str, args: Json| { clear_last_error(); let c_name = CString::new(name).unwrap_or_default(); @@ -315,7 +333,7 @@ pub fn wrap_tool_exec_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> Box Pin> + Send>> + Send + Sync> { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Box::new(move |args: Json| { let ud = ud.clone(); Box::pin(async move { @@ -342,7 +360,7 @@ pub fn wrap_tool_exec_intercept_fn( + Send + Sync, > { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |_name: &str, args: Json, next: ToolExecutionNextFn| { let ud = ud.clone(); Box::pin(async move { @@ -403,7 +421,7 @@ pub fn wrap_llm_exec_intercept_fn( + Send + Sync, > { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new( move |_name: &str, request: LlmRequest, next: LlmExecutionNextFn| { let ud = ud.clone(); @@ -477,7 +495,7 @@ pub fn wrap_llm_stream_exec_intercept_fn( > + Send + Sync, > { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new( move |_name: &str, request: LlmRequest, next: LlmStreamExecutionNextFn| { let ud = ud.clone(); @@ -546,7 +564,7 @@ pub fn wrap_json_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> Box Json + Send + Sync> { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Box::new(move |value: Json| { let c_json = json_to_c_string(&value); let result_ptr = unsafe { cb(ud.ptr, c_json) }; @@ -566,7 +584,7 @@ pub fn wrap_llm_request_intercept_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> LlmRequestInterceptFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new( move |name: &str, request: LlmRequest, annotated: Option| { clear_last_error(); @@ -644,7 +662,7 @@ pub fn wrap_llm_response_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> LlmSanitizeResponseFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |response: Json| { let c_json = json_to_c_string(&response); let result_ptr = unsafe { cb(ud.ptr, c_json) }; @@ -661,7 +679,7 @@ pub fn wrap_llm_sanitize_request_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> LlmSanitizeRequestFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |request: LlmRequest| { let ffi_req = Box::into_raw(Box::new(FfiLLMRequest(request))); let result_ptr = unsafe { cb(ud.ptr, ffi_req) }; @@ -686,7 +704,7 @@ pub fn wrap_llm_conditional_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> LlmConditionalFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(move |request: &LlmRequest| { clear_last_error(); let ffi_req = FfiLLMRequest(request.clone()); @@ -711,7 +729,7 @@ pub fn wrap_llm_exec_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> Box Pin> + Send>> + Send + Sync> { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Box::new(move |request: LlmRequest| { let ud = ud.clone(); Box::pin(async move { @@ -744,7 +762,7 @@ pub fn wrap_llm_stream_exec_fn( > + Send + Sync, > { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Box::new(move |request: LlmRequest| { let ud = ud.clone(); Box::pin(async move { @@ -809,7 +827,26 @@ pub fn wrap_event_subscriber( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> EventSubscriberFn { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); + wrap_event_subscriber_user_data(cb, ud) +} + +pub(crate) fn wrap_event_subscriber_deferred( + cb: NemoRelayEventSubscriberCb, + user_data: *mut libc::c_void, + free_fn: NemoRelayFreeFn, +) -> (EventSubscriberFn, DeferredUserData) { + let ud = make_user_data(user_data, free_fn, false); + ( + wrap_event_subscriber_user_data(cb, ud.clone()), + DeferredUserData { user_data: ud }, + ) +} + +fn wrap_event_subscriber_user_data( + cb: NemoRelayEventSubscriberCb, + ud: Arc, +) -> EventSubscriberFn { Arc::new(move |event: &Event| { let ffi_event = FfiEvent(event.clone()); unsafe { cb(ud.ptr, &ffi_event) }; @@ -887,7 +924,7 @@ pub fn wrap_codec_fn( user_data: *mut libc::c_void, free_fn: NemoRelayFreeFn, ) -> Arc { - let ud = make_user_data(user_data, free_fn); + let ud = make_user_data(user_data, free_fn, true); Arc::new(FfiCodec { decode_cb, encode_cb, diff --git a/crates/ffi/src/types/mod.rs b/crates/ffi/src/types/mod.rs index fa9c6bd5..2e259586 100644 --- a/crates/ffi/src/types/mod.rs +++ b/crates/ffi/src/types/mod.rs @@ -12,6 +12,7 @@ use libc::c_char; use nemo_relay::api::runtime::{ScopeStackHandle, ThreadScopeStackBinding}; +use nemo_relay::api::subscriber::{ScopeSubscriberHandle, SubscriberHandle}; use nemo_relay::plugin::PluginRegistrationContext; use serde_json::Value as Json; @@ -47,6 +48,8 @@ pub struct FfiLLMHandle(pub LlmHandle); pub struct FfiLLMRequest(pub LlmRequest); /// Opaque wrapper around a lifecycle event emitted by the runtime. pub struct FfiEvent(pub Event); +/// Opaque handle to a closeable event subscriber registration. +pub struct FfiSubscriptionHandle(pub FfiSubscription); /// Opaque handle to an isolated scope stack for per-request/per-task isolation. pub struct FfiScopeStack(pub ScopeStackHandle); /// Opaque handle to a captured thread-local scope stack binding. @@ -85,6 +88,14 @@ pub struct FfiCodecHandle { pub(crate) response_codec: std::sync::Arc, } +/// Event subscriber registration owned by an FFI subscription handle. +pub enum FfiSubscription { + /// Global anonymous subscriber. + Global(SubscriberHandle), + /// Scope-local anonymous subscriber. + Scope(ScopeSubscriberHandle), +} + // --------------------------------------------------------------------------- // Enums exposed to C // --------------------------------------------------------------------------- @@ -212,6 +223,22 @@ pub unsafe extern "C" fn nemo_relay_event_free(ptr: *mut FfiEvent) { } } +/// Free a subscription handle returned by `nemo_relay_subscribe` or +/// `nemo_relay_scope_subscribe`. +/// +/// Dropping the handle performs best-effort deregistration if it has not +/// already been closed. +/// +/// # Safety +/// `ptr` must be a valid pointer returned by an `nemo_relay_*subscribe` +/// function, or null. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_subscription_free(ptr: *mut FfiSubscriptionHandle) { + if !ptr.is_null() { + drop(unsafe { Box::from_raw(ptr) }); + } +} + /// Free a scope stack handle previously returned by `nemo_relay_scope_stack_create`. /// /// # Safety diff --git a/crates/ffi/tests/unit/api/registry_tests.rs b/crates/ffi/tests/unit/api/registry_tests.rs index a9ff0a92..47b511c7 100644 --- a/crates/ffi/tests/unit/api/registry_tests.rs +++ b/crates/ffi/tests/unit/api/registry_tests.rs @@ -5,6 +5,19 @@ use super::*; +unsafe extern "C" fn subscription_log_cb(user_data: *mut libc::c_void, event: *const FfiEvent) { + let log = unsafe { &*(user_data as *const Arc>>) }; + let name = unsafe { take_string(nemo_relay_event_name(event)) }.unwrap_or_default(); + log.lock().unwrap_or_else(|e| e.into_inner()).push(name); +} + +unsafe extern "C" fn subscription_log_free(user_data: *mut libc::c_void) { + *lock_unpoisoned(plugin_frees()) += 1; + if !user_data.is_null() { + drop(unsafe { Box::from_raw(user_data as *mut Arc>>) }); + } +} + #[test] fn test_ffi_open_telemetry_subscriber_lifecycle_and_errors() { let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); @@ -409,6 +422,234 @@ fn test_ffi_helper_rejection_and_null_name_paths() { } } +#[test] +fn test_ffi_subscription_handle_close_free_and_callback_lifetime() { + let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); + reset_globals(); + + unsafe { + let stack = fresh_scope_stack(); + let mut subscription: *mut FfiSubscriptionHandle = ptr::null_mut(); + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let user_data = Box::into_raw(Box::new(Arc::clone(&events))) as *mut libc::c_void; + assert_eq!( + nemo_relay_subscribe( + subscription_log_cb, + user_data, + Some(subscription_log_free), + &mut subscription, + ), + NemoRelayStatus::Ok + ); + assert!(!subscription.is_null()); + + let before = cstring("ffi_subscription_before_close"); + assert_eq!( + nemo_relay_event(before.as_ptr(), ptr::null(), ptr::null(), ptr::null()), + NemoRelayStatus::Ok + ); + let mut removed = false; + assert_eq!( + nemo_relay_subscription_close(subscription, &mut removed), + NemoRelayStatus::Ok + ); + assert!(removed); + assert_eq!( + nemo_relay_subscription_close(subscription, &mut removed), + NemoRelayStatus::Ok + ); + assert!(!removed); + assert_eq!( + nemo_relay_subscription_close(subscription, ptr::null_mut()), + NemoRelayStatus::NullPointer + ); + let after = cstring("ffi_subscription_after_close"); + assert_eq!( + nemo_relay_event(after.as_ptr(), ptr::null(), ptr::null(), ptr::null()), + NemoRelayStatus::Ok + ); + nemo_relay_subscription_free(subscription); + + let names = events.lock().unwrap_or_else(|e| e.into_inner()).clone(); + assert_eq!(names, vec!["ffi_subscription_before_close"]); + assert_eq!(*lock_unpoisoned(plugin_frees()), 1); + + assert_eq!( + nemo_relay_subscription_close(ptr::null_mut(), &mut removed), + NemoRelayStatus::NullPointer + ); + nemo_relay_scope_stack_free(stack); + } +} + +#[test] +fn test_ffi_scope_subscription_handle_closes_and_scope_pop_cleans_up() { + let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); + reset_globals(); + + unsafe { + let stack = fresh_scope_stack(); + let scope_name = cstring("ffi_scope_subscription_owner"); + let mut scope = ptr::null_mut(); + assert_eq!( + nemo_relay_push_scope( + scope_name.as_ptr(), + NemoRelayScopeType::Agent, + ptr::null(), + 0, + ptr::null(), + ptr::null(), + ptr::null(), + &mut scope, + ), + NemoRelayStatus::Ok + ); + let scope_uuid = cstring(&take_string(nemo_relay_scope_handle_uuid(scope)).unwrap()); + + let mut explicit: *mut FfiSubscriptionHandle = ptr::null_mut(); + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let explicit_data = Box::into_raw(Box::new(Arc::clone(&events))) as *mut libc::c_void; + assert_eq!( + nemo_relay_scope_subscribe( + scope_uuid.as_ptr(), + subscription_log_cb, + explicit_data, + Some(subscription_log_free), + &mut explicit, + ), + NemoRelayStatus::Ok + ); + let before = cstring("ffi_scope_subscription_before_close"); + assert_eq!( + nemo_relay_event(before.as_ptr(), scope, ptr::null(), ptr::null()), + NemoRelayStatus::Ok + ); + let mut removed = false; + assert_eq!( + nemo_relay_subscription_close(explicit, &mut removed), + NemoRelayStatus::Ok + ); + assert!(removed); + assert_eq!( + nemo_relay_subscription_close(explicit, &mut removed), + NemoRelayStatus::Ok + ); + assert!(!removed); + let after = cstring("ffi_scope_subscription_after_close"); + assert_eq!( + nemo_relay_event(after.as_ptr(), scope, ptr::null(), ptr::null()), + NemoRelayStatus::Ok + ); + nemo_relay_subscription_free(explicit); + + let mut cleanup: *mut FfiSubscriptionHandle = ptr::null_mut(); + let cleanup_data = Box::into_raw(Box::new(Arc::clone(&events))) as *mut libc::c_void; + assert_eq!( + nemo_relay_scope_subscribe( + scope_uuid.as_ptr(), + subscription_log_cb, + cleanup_data, + Some(subscription_log_free), + &mut cleanup, + ), + NemoRelayStatus::Ok + ); + assert_eq!( + nemo_relay_pop_scope(scope, ptr::null()), + NemoRelayStatus::Ok + ); + assert_eq!( + nemo_relay_subscription_close(cleanup, &mut removed), + NemoRelayStatus::Ok + ); + assert!(!removed); + nemo_relay_subscription_free(cleanup); + + let names = events.lock().unwrap_or_else(|e| e.into_inner()).clone(); + assert_eq!( + names, + vec![ + "ffi_scope_subscription_before_close", + "ffi_scope_subscription_owner" + ] + ); + assert_eq!(*lock_unpoisoned(plugin_frees()), 2); + + assert_eq!( + nemo_relay_scope_subscribe( + scope_uuid.as_ptr(), + subscription_log_cb, + ptr::null_mut(), + None, + ptr::null_mut(), + ), + NemoRelayStatus::NullPointer + ); + + nemo_relay_scope_handle_free(scope); + nemo_relay_scope_stack_free(stack); + } +} + +#[test] +fn test_ffi_subscription_validation_failures_do_not_consume_callback_data() { + let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); + reset_globals(); + + unsafe { + let stack = fresh_scope_stack(); + let invalid_uuid = cstring("not-a-uuid"); + let mut subscription: *mut FfiSubscriptionHandle = + std::ptr::NonNull::::dangling().as_ptr(); + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let user_data = Box::into_raw(Box::new(Arc::clone(&events))) as *mut libc::c_void; + + assert_eq!( + nemo_relay_subscribe( + subscription_log_cb, + user_data, + Some(subscription_log_free), + ptr::null_mut(), + ), + NemoRelayStatus::NullPointer + ); + assert_eq!(*lock_unpoisoned(plugin_frees()), 0); + + assert_eq!( + nemo_relay_scope_subscribe( + invalid_uuid.as_ptr(), + subscription_log_cb, + user_data, + Some(subscription_log_free), + &mut subscription, + ), + NemoRelayStatus::InvalidArg + ); + assert!(subscription.is_null()); + subscription = std::ptr::NonNull::::dangling().as_ptr(); + assert_eq!(*lock_unpoisoned(plugin_frees()), 0); + + let missing_uuid = cstring(&Uuid::now_v7().to_string()); + assert_eq!( + nemo_relay_scope_subscribe( + missing_uuid.as_ptr(), + subscriber_cb, + ptr::null_mut(), + Some(subscription_log_free), + &mut subscription, + ), + NemoRelayStatus::NotFound + ); + assert!(subscription.is_null()); + assert_eq!(*lock_unpoisoned(plugin_frees()), 0); + + subscription_log_free(user_data); + assert_eq!(*lock_unpoisoned(plugin_frees()), 1); + + nemo_relay_scope_stack_free(stack); + } +} + #[test] fn test_ffi_registration_name_and_uuid_error_sweep() { let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); diff --git a/crates/ffi/tests/unit/api_tests.rs b/crates/ffi/tests/unit/api_tests.rs index 2f3af3b2..dc93f0c8 100644 --- a/crates/ffi/tests/unit/api_tests.rs +++ b/crates/ffi/tests/unit/api_tests.rs @@ -17,20 +17,20 @@ use crate::convert::nemo_relay_string_free; use crate::error::{NemoRelayStatus, nemo_relay_last_error}; use crate::types::{ FfiAtifExporter, FfiEvent, FfiLLMHandle, FfiLLMRequest, FfiOpenTelemetrySubscriber, - FfiScopeStack, FfiToolHandle, nemo_relay_atif_exporter_free, nemo_relay_event_data, - nemo_relay_event_input, nemo_relay_event_metadata, nemo_relay_event_model_name, - nemo_relay_event_name, nemo_relay_event_output, nemo_relay_event_parent_uuid, - nemo_relay_event_scope_type, nemo_relay_event_timestamp, nemo_relay_event_tool_call_id, - nemo_relay_event_uuid, nemo_relay_llm_handle_attributes, nemo_relay_llm_handle_free, - nemo_relay_llm_handle_name, nemo_relay_llm_handle_parent_uuid, nemo_relay_llm_handle_uuid, - nemo_relay_llm_request_content, nemo_relay_llm_request_free, nemo_relay_llm_request_headers, - nemo_relay_llm_request_new, nemo_relay_otel_subscriber_free, + FfiScopeStack, FfiSubscriptionHandle, FfiToolHandle, nemo_relay_atif_exporter_free, + nemo_relay_event_data, nemo_relay_event_input, nemo_relay_event_metadata, + nemo_relay_event_model_name, nemo_relay_event_name, nemo_relay_event_output, + nemo_relay_event_parent_uuid, nemo_relay_event_scope_type, nemo_relay_event_timestamp, + nemo_relay_event_tool_call_id, nemo_relay_event_uuid, nemo_relay_llm_handle_attributes, + nemo_relay_llm_handle_free, nemo_relay_llm_handle_name, nemo_relay_llm_handle_parent_uuid, + nemo_relay_llm_handle_uuid, nemo_relay_llm_request_content, nemo_relay_llm_request_free, + nemo_relay_llm_request_headers, nemo_relay_llm_request_new, nemo_relay_otel_subscriber_free, nemo_relay_scope_handle_attributes, nemo_relay_scope_handle_data, nemo_relay_scope_handle_free, nemo_relay_scope_handle_metadata, nemo_relay_scope_handle_name, nemo_relay_scope_handle_parent_uuid, nemo_relay_scope_handle_scope_type, - nemo_relay_scope_handle_uuid, nemo_relay_scope_stack_free, nemo_relay_tool_handle_attributes, - nemo_relay_tool_handle_free, nemo_relay_tool_handle_name, nemo_relay_tool_handle_parent_uuid, - nemo_relay_tool_handle_uuid, + nemo_relay_scope_handle_uuid, nemo_relay_scope_stack_free, nemo_relay_subscription_free, + nemo_relay_tool_handle_attributes, nemo_relay_tool_handle_free, nemo_relay_tool_handle_name, + nemo_relay_tool_handle_parent_uuid, nemo_relay_tool_handle_uuid, }; use crate::{api, callable, types}; diff --git a/crates/node/src/api/mod.rs b/crates/node/src/api/mod.rs index a7465ae1..d46ece9f 100644 --- a/crates/node/src/api/mod.rs +++ b/crates/node/src/api/mod.rs @@ -2284,6 +2284,42 @@ pub fn deregister_llm_stream_execution_intercept(name: String) -> Result { // Subscriber registrations // --------------------------------------------------------------------------- +enum SubscriptionInner { + Global(core_subscriber_api::SubscriberHandle), + Scope(core_subscriber_api::ScopeSubscriberHandle), +} + +/// Closeable event subscriber registration. +#[napi] +pub struct Subscription { + inner: StdMutex>, +} + +#[napi] +impl Subscription { + /// Close this subscription. + /// + /// Returns `true` when a live subscriber was removed and `false` when the + /// subscription was already closed or scope cleanup already removed it. + #[napi] + pub fn close(&self) -> Result { + let mut guard = self + .inner + .lock() + .map_err(|_| napi::Error::from_reason("subscription lock poisoned"))?; + let Some(inner) = guard.as_ref() else { + return Ok(false); + }; + let removed = match inner { + SubscriptionInner::Global(handle) => handle.close(), + SubscriptionInner::Scope(handle) => handle.close(), + } + .map_err(to_napi_err)?; + *guard = None; + Ok(removed) + } +} + /// Register a named event subscriber that receives all lifecycle events. /// /// The `callback` receives each event as the canonical JSON event object. Events are @@ -2298,6 +2334,19 @@ pub fn register_subscriber( .map_err(to_napi_err) } +/// Register an anonymous event subscriber and return a closeable handle. +/// +/// The `callback` receives each event as the canonical JSON event object. Events are +/// delivered asynchronously and non-blocking. +#[napi] +pub fn subscribe(callback: ThreadsafeFunction) -> Result { + core_subscriber_api::subscribe(callable::wrap_js_event_subscriber(callback)) + .map(|handle| Subscription { + inner: StdMutex::new(Some(SubscriptionInner::Global(handle))), + }) + .map_err(to_napi_err) +} + /// Deregister an event subscriber by name. /// /// Returns `true` if a subscriber with that name was found and removed. @@ -2798,6 +2847,24 @@ pub fn scope_register_subscriber( .map_err(to_napi_err) } +/// Register an anonymous scope-local event subscriber and return a closeable handle. +/// +/// The `callback` receives each event as the canonical JSON event object. Events are +/// delivered asynchronously and non-blocking. +#[napi] +pub fn scope_subscribe( + scope_uuid: String, + callback: ThreadsafeFunction, +) -> Result { + let uuid = uuid::Uuid::parse_str(&scope_uuid) + .map_err(|e| napi::Error::from_reason(format!("invalid UUID: {e}")))?; + core_subscriber_api::scope_subscribe(&uuid, callable::wrap_js_event_subscriber(callback)) + .map(|handle| Subscription { + inner: StdMutex::new(Some(SubscriptionInner::Scope(handle))), + }) + .map_err(to_napi_err) +} + /// Deregister a scope-local event subscriber by name. /// /// Returns `true` if a subscriber with that name was found and removed from the specified scope. diff --git a/crates/node/tests/scope_tests.mjs b/crates/node/tests/scope_tests.mjs index ffea586b..f972754d 100644 --- a/crates/node/tests/scope_tests.mjs +++ b/crates/node/tests/scope_tests.mjs @@ -8,7 +8,18 @@ import { createRequire } from 'node:module'; const require = createRequire(import.meta.url); const lib = require('../index.js'); -const { getHandle, pushScope, popScope, event, withScope, registerSubscriber, deregisterSubscriber, ScopeType } = lib; +const { + getHandle, + pushScope, + popScope, + event, + withScope, + registerSubscriber, + deregisterSubscriber, + subscribe, + scopeSubscribe, + ScopeType, +} = lib; const SCOPE_ATTR_PARALLEL = 0b01; const SCOPE_ATTR_RELOCATABLE = 0b10; @@ -243,6 +254,62 @@ describe('Subscribers', () => { } }); + it('subscription handle closes idempotently', async () => { + const events = []; + const subscription = subscribe((e) => events.push(e)); + event('node_handle_before_close', null, null, null); + + const deadline = Date.now() + 2000; + while (!events.some((e) => e.name === 'node_handle_before_close') && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 10)); + } + + assert.equal(subscription.close(), true); + assert.equal(subscription.close(), false); + event('node_handle_after_close', null, null, null); + await new Promise((r) => setTimeout(r, 20)); + + assert.equal(events.some((e) => e.name === 'node_handle_before_close'), true); + assert.equal(events.some((e) => e.name === 'node_handle_after_close'), false); + }); + + it('scope subscription handle closes and scope pop cleans up', async () => { + const scope = pushScope('node_scope_handle_owner', ScopeType.Agent, null, null); + try { + const explicitEvents = []; + const explicit = scopeSubscribe(scope.uuid, (e) => explicitEvents.push(e)); + event('node_scope_handle_before_close', scope, null, null); + + let deadline = Date.now() + 2000; + while (!explicitEvents.some((e) => e.name === 'node_scope_handle_before_close') && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 10)); + } + + assert.equal(explicit.close(), true); + assert.equal(explicit.close(), false); + event('node_scope_handle_after_close', scope, null, null); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(explicitEvents.some((e) => e.name === 'node_scope_handle_before_close'), true); + assert.equal(explicitEvents.some((e) => e.name === 'node_scope_handle_after_close'), false); + + const cleanupEvents = []; + const cleanup = scopeSubscribe(scope.uuid, (e) => cleanupEvents.push(e)); + popScope(scope); + assert.equal(cleanup.close(), false); + + deadline = Date.now() + 2000; + while (!cleanupEvents.some((e) => e.name === 'node_scope_handle_owner') && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 10)); + } + assert.equal(cleanupEvents.some((e) => e.name === 'node_scope_handle_owner'), true); + } catch (error) { + try { + popScope(scope); + } catch {} + throw error; + } + }); + it('subscriber event properties', async () => { let captured = null; registerSubscriber('node_prop_collector', (e) => { diff --git a/crates/python/src/py_api/mod.rs b/crates/python/src/py_api/mod.rs index 4912045d..15e08212 100644 --- a/crates/python/src/py_api/mod.rs +++ b/crates/python/src/py_api/mod.rs @@ -164,6 +164,65 @@ pub fn py_scope_stack_active() -> bool { scope_stack_is_active() } +enum PySubscriptionInner { + Global(core_subscriber_api::SubscriberHandle), + Scope(core_subscriber_api::ScopeSubscriberHandle), +} + +/// Closeable event subscriber registration returned by ``subscribe`` helpers. +#[pyclass(name = "Subscription")] +pub struct PySubscription { + inner: Option, +} + +#[pymethods] +impl PySubscription { + /// Close this subscription. + /// + /// Returns ``True`` when a live subscriber was removed and ``False`` when + /// the subscription had already been closed or scope cleanup already + /// removed it. + fn close(&mut self) -> PyResult { + let Some(inner) = self.inner.as_ref() else { + return Ok(false); + }; + let removed = match inner { + PySubscriptionInner::Global(handle) => handle.close(), + PySubscriptionInner::Scope(handle) => handle.close(), + } + .map_err(to_py_err)?; + self.inner = None; + Ok(removed) + } + + /// Enter a Python context manager. + fn __enter__(slf: PyRefMut<'_, Self>) -> PyRefMut<'_, Self> { + slf + } + + /// Exit a Python context manager, closing the subscription. + fn __exit__( + &mut self, + _exc_type: Option<&Bound<'_, PyAny>>, + _exc: Option<&Bound<'_, PyAny>>, + _traceback: Option<&Bound<'_, PyAny>>, + ) -> PyResult { + let _ = self.close()?; + Ok(false) + } +} + +impl Drop for PySubscription { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let _ = match inner { + PySubscriptionInner::Global(handle) => handle.close(), + PySubscriptionInner::Scope(handle) => handle.close(), + }; + } + } +} + // --------------------------------------------------------------------------- // Scope / handle operations // --------------------------------------------------------------------------- @@ -1235,6 +1294,16 @@ fn register_subscriber(name: &str, callback: Py) -> PyResult<()> { .map_err(to_py_err) } +/// Register an anonymous global event subscriber and return a closeable handle. +#[pyfunction] +fn subscribe(callback: Py) -> PyResult { + core_subscriber_api::subscribe(py_callable::wrap_py_event_subscriber(callback)) + .map(|handle| PySubscription { + inner: Some(PySubscriptionInner::Global(handle)), + }) + .map_err(to_py_err) +} + /// Remove a previously registered event subscriber. /// /// Returns ``True`` if a subscriber with that name was found and removed. @@ -1570,6 +1639,17 @@ fn scope_register_subscriber(scope_uuid: &str, name: &str, callback: Py) .map_err(to_py_err) } +/// Register an anonymous scope-local event subscriber and return a closeable handle. +#[pyfunction] +fn scope_subscribe(scope_uuid: &str, callback: Py) -> PyResult { + let uuid = parse_uuid(scope_uuid)?; + core_subscriber_api::scope_subscribe(&uuid, py_callable::wrap_py_event_subscriber(callback)) + .map(|handle| PySubscription { + inner: Some(PySubscriptionInner::Scope(handle)), + }) + .map_err(to_py_err) +} + /// Remove a previously registered scope-local event subscriber. #[pyfunction] fn scope_deregister_subscriber(scope_uuid: &str, name: &str) -> PyResult { @@ -1583,6 +1663,8 @@ fn scope_deregister_subscriber(scope_uuid: &str, name: &str) -> PyResult { /// Register all API functions into the given `PyModule`. pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + // Scope stack creation / binding / query m.add_function(wrap_pyfunction!(create_scope_stack, m)?)?; m.add_function(wrap_pyfunction!(set_thread_scope_stack, m)?)?; @@ -1680,6 +1762,7 @@ pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { // Subscribers m.add_function(wrap_pyfunction!(register_subscriber, m)?)?; + m.add_function(wrap_pyfunction!(subscribe, m)?)?; m.add_function(wrap_pyfunction!(deregister_subscriber, m)?)?; // Scope-local tool guardrails @@ -1768,6 +1851,7 @@ pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { // Scope-local subscribers m.add_function(wrap_pyfunction!(scope_register_subscriber, m)?)?; + m.add_function(wrap_pyfunction!(scope_subscribe, m)?)?; m.add_function(wrap_pyfunction!(scope_deregister_subscriber, m)?)?; // Standalone middleware chains diff --git a/crates/python/tests/coverage/py_plugin_coverage_tests.rs b/crates/python/tests/coverage/py_plugin_coverage_tests.rs index f774d5ea..e62351bd 100644 --- a/crates/python/tests/coverage/py_plugin_coverage_tests.rs +++ b/crates/python/tests/coverage/py_plugin_coverage_tests.rs @@ -8,6 +8,7 @@ use super::*; use std::ffi::CString; use std::sync::{Arc, Mutex}; +use nemo_relay::api::subscriber::deregister_subscriber; use nemo_relay::plugin::rollback_registrations; use pyo3::types::PyModule; use serde_json::json; diff --git a/crates/wasm/src/api/mod.rs b/crates/wasm/src/api/mod.rs index 181d9679..ccd02a10 100644 --- a/crates/wasm/src/api/mod.rs +++ b/crates/wasm/src/api/mod.rs @@ -1375,6 +1375,56 @@ pub fn deregister_llm_stream_execution_intercept(name: &str) -> Result FlowResult { + match inner { + SubscriptionInner::Global(handle) => handle.close(), + SubscriptionInner::Scope(handle) => handle.close(), + } +} + +/// Closeable event subscriber registration. +#[wasm_bindgen(js_name = Subscription)] +pub struct Subscription { + inner: Arc>>, +} + +#[wasm_bindgen(js_class = Subscription)] +impl Subscription { + /// Close this subscription. + /// + /// Returns `true` when a live subscriber was removed and `false` when the + /// subscription was already closed or scope cleanup already removed it. + #[wasm_bindgen(js_name = close)] + pub fn close(&self) -> Result { + let mut guard = self + .inner + .lock() + .map_err(|_| JsValue::from_str("subscription lock poisoned"))?; + let Some(inner) = guard.as_ref() else { + return Ok(false); + }; + let removed = close_subscription_inner(inner).map_err(to_js_err)?; + *guard = None; + Ok(removed) + } +} + +impl Drop for Subscription { + fn drop(&mut self) { + let Ok(mut guard) = self.inner.lock() else { + return; + }; + if let Some(inner) = guard.take() { + let _ = close_subscription_inner(&inner); + } + } +} + /// Registers an event subscriber that receives lifecycle events. /// /// - `name` - Unique subscriber name. @@ -1388,6 +1438,20 @@ pub fn register_subscriber( .map_err(to_js_err) } +/// Registers an anonymous event subscriber and returns a closeable handle. +/// +/// - `callback` - JS function `(event) => void` called for each event. +#[wasm_bindgen(js_name = "subscribe")] +pub fn subscribe( + #[wasm_bindgen(unchecked_param_type = "(event: Json) => any")] callback: Function, +) -> Result { + relay_subscriber_api::subscribe(callable::wrap_js_event_subscriber(callback)) + .map(|handle| Subscription { + inner: Arc::new(Mutex::new(Some(SubscriptionInner::Global(handle)))), + }) + .map_err(to_js_err) +} + /// Removes a previously registered event subscriber by name. /// /// Returns `true` if the subscriber was found and removed. @@ -1887,6 +1951,24 @@ pub fn scope_register_subscriber( .map_err(to_js_err) } +/// Registers an anonymous scope-local event subscriber and returns a closeable handle. +/// +/// - `scope_uuid` - UUID of the scope to register on. +/// - `callback` - JS function `(event) => void` called for each event. +#[wasm_bindgen(js_name = "scopeSubscribe")] +pub fn scope_subscribe( + #[wasm_bindgen(js_name = "scopeUuid")] scope_uuid: &str, + #[wasm_bindgen(unchecked_param_type = "(event: Json) => any")] callback: Function, +) -> Result { + let uuid = uuid::Uuid::parse_str(scope_uuid) + .map_err(|e| JsValue::from_str(&format!("invalid UUID: {e}")))?; + relay_subscriber_api::scope_subscribe(&uuid, callable::wrap_js_event_subscriber(callback)) + .map(|handle| Subscription { + inner: Arc::new(Mutex::new(Some(SubscriptionInner::Scope(handle)))), + }) + .map_err(to_js_err) +} + /// Removes a scope-local event subscriber by name. /// /// Returns `true` if the subscriber was found and removed from the specified scope. diff --git a/crates/wasm/tests-js/scope_tests.mjs b/crates/wasm/tests-js/scope_tests.mjs index bacb48d6..0dff7eb8 100644 --- a/crates/wasm/tests-js/scope_tests.mjs +++ b/crates/wasm/tests-js/scope_tests.mjs @@ -134,3 +134,63 @@ test('WebAssembly withScope supports async callbacks', async () => { stack.free(); } }); + +test('WebAssembly subscribe returns a closeable handle', () => { + const stack = resetScopeStack(); + const events = []; + const subscription = wasm.subscribe((event) => events.push(event)); + + try { + wasm.event('wasm_handle_before_close', null, null, null); + assert.equal(subscription.close(), true); + assert.equal(subscription.close(), false); + wasm.event('wasm_handle_after_close', null, null, null); + + assert.deepEqual( + events.map((event) => event.name), + ['wasm_handle_before_close'], + ); + } finally { + subscription.free(); + stack.free(); + } +}); + +test('WebAssembly scopeSubscribe closes and scope pop cleans up', () => { + const stack = resetScopeStack(); + const scope = wasm.pushScope('wasm_scope_handle_owner', wasm.ScopeType.Agent, null, 0, null, null); + let popped = false; + + try { + const explicitEvents = []; + const explicit = wasm.scopeSubscribe(scope.uuid, (event) => explicitEvents.push(event)); + wasm.event('wasm_scope_handle_before_close', scope, null, null); + assert.equal(explicit.close(), true); + assert.equal(explicit.close(), false); + wasm.event('wasm_scope_handle_after_close', scope, null, null); + explicit.free(); + + assert.deepEqual( + explicitEvents.map((event) => event.name), + ['wasm_scope_handle_before_close'], + ); + + const cleanupEvents = []; + const cleanup = wasm.scopeSubscribe(scope.uuid, (event) => cleanupEvents.push(event)); + wasm.popScope(scope); + popped = true; + assert.equal(cleanup.close(), false); + cleanup.free(); + + assert.deepEqual( + cleanupEvents.map((event) => event.name), + ['wasm_scope_handle_owner'], + ); + } finally { + if (!popped) { + wasm.popScope(scope); + } + scope.free(); + stack.free(); + } +}); diff --git a/crates/wasm/wrappers/esm/index.js b/crates/wasm/wrappers/esm/index.js index ea932fb1..4bea99eb 100644 --- a/crates/wasm/wrappers/esm/index.js +++ b/crates/wasm/wrappers/esm/index.js @@ -14,6 +14,7 @@ export { ScopeHandle, ScopeStack, ScopeType, + Subscription, ToolHandle, activePluginReport, clearLastCallbackError, @@ -86,7 +87,9 @@ export { scopeRegisterToolSanitizeRequestGuardrail, scopeRegisterToolSanitizeResponseGuardrail, scopeStackActive, + scopeSubscribe, setThreadScopeStack, + subscribe, toolCall, toolCallEnd, toolCallExecute, diff --git a/docs/about/concepts/subscribers.md b/docs/about/concepts/subscribers.md index 4f08f7dc..d7d8d6b7 100644 --- a/docs/about/concepts/subscribers.md +++ b/docs/about/concepts/subscribers.md @@ -38,18 +38,93 @@ Global subscribers remain active process-wide until they are removed. Scope-local subscribers are owned by one active scope and disappear when that scope closes. +Scope-local subscription handles remember the owning scope stack. This allows a +caller to close the handle from outside the original current-stack context, while +scope pop still remains the cleanup boundary for any open scope-local +registrations. + ### Plugin-Installed Subscribers Plugins can install subscribers as reusable, configuration-driven runtime components. +## Subscription Handles and Named APIs + +Rust applications can use closeable subscription handles for new code: +`subscribe(callback)` returns a global subscriber handle, and +`scope_subscribe(scope_uuid, callback)` returns a scope-local subscriber handle. +Calling `close()` removes the registration and is idempotent. + +Rust also exposes `nemo_relay::api::subscriber::Subscriber`, a callback-backed +adapter that implements both `tracing::Subscriber` and +`tracing_subscriber::Layer`. Host applications can install it directly as a +tracing subscriber or compose it into an existing `tracing-subscriber` registry +alongside formatting, filtering, OpenTelemetry, or other host layers. + +For Rust libraries that want to consume NeMo Relay events through tracing +subscriber mechanisms, the canonical path is +`nemo_relay::api::subscriber::tracing_layer(callback)`. The layer decodes NeMo +Relay tracing records back into canonical `Event` values and invokes the +callback. Libraries should return or document this layer for host applications +to compose; they should not call `tracing::subscriber::set_global_default`. + +```rust +use std::sync::Arc; +use nemo_relay::api::subscriber; +use tracing_subscriber::prelude::*; + +let nemo_events = subscriber::tracing_layer(Arc::new(|event| { + // Consume the canonical NeMo Relay Event. +})); + +let tracing_stack = tracing_subscriber::registry() + .with(nemo_events); + +// Application code owns installation of the tracing subscriber. +tracing::subscriber::set_global_default(tracing_stack)?; +``` + +Libraries that already implement their own `tracing_subscriber::Layer` can use +`subscriber::is_nemo_relay_event(metadata)` as a target filter and +`subscriber::event_from_tracing(event)` to decode the canonical event payload. +These helpers are the supported way to consume NeMo Relay tracing records +without depending on the raw tracing field layout. + +The Rust named APIs remain available under their original names: +`register_subscriber(name, callback)`, `deregister_subscriber(name)`, +`scope_register_subscriber(scope_uuid, name, callback)`, and +`scope_deregister_subscriber(scope_uuid, name)`. Python, JavaScript, Go, and +WebAssembly keep their existing named APIs unchanged and also expose additive +closeable subscription handles for code that does not need a stable subscriber +name. + ## What Subscribers Consume -Subscribers consume the canonical event stream. They do not define the event -model. They react to it. +Subscribers consume the canonical event stream. The Rust core runtime emits each +runtime event through a structured `tracing` record that carries the canonical +ATOF JSON payload, then delivers the original canonical `Event` directly to +registered callbacks. + +This keeps plain subscribers, exporters, and Rust `tracing` integrations aligned +around one runtime source of truth. + +## Rust `tracing` Base + +The Rust core uses `tracing` and `tracing-subscriber` as the internal emission +substrate for lifecycle events. Library code does not install a global tracing +subscriber; Rust host applications remain responsible for configuring their own +`tracing-subscriber` layers, filters, and exporters. + +For each emitted event, NeMo Relay's core runtime emits `nemo_relay.event` +records through the host's current tracing dispatcher when one is installed. +Scope spans remember the dispatcher that created them so child marks and scope +end events can stay attached to the original host span. Active NeMo subscriber +callbacks stored in the runtime registry receive the original canonical `Event` +directly, avoiding a JSON round trip on the compatibility callback path. -This lets plain subscribers, exporters, and tracing adapters share one runtime -source of truth. +`Event`, ATOF JSON, binding callback delivery, and the built-in ATIF, +OpenTelemetry, OpenInference, and ATOF exporters remain NeMo-owned public +behavior. Missing host subscribers do not change runtime behavior. ## Common Subscriber Roles diff --git a/docs/conf.py b/docs/conf.py index 6035b9fa..cc3de0d0 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -632,10 +632,18 @@ def _strip_rustdoc_hostile_lines(crate_dir: Path) -> None: skip_saw_brace = False skip_brace_depth = 0 skip_paren_depth = 0 + skip_attr = False + skip_attr_bracket_depth = 0 for line in _read_text(rust_file).splitlines(): line_stripped = line.strip() + if skip_attr: + skip_attr_bracket_depth += line.count("[") - line.count("]") + if skip_attr_bracket_depth <= 0: + skip_attr = False + continue + if skip_item: skip_item, skip_brace_depth, skip_paren_depth, skip_saw_brace = _rust_skip_item_continues( line, @@ -653,11 +661,12 @@ def _strip_rustdoc_hostile_lines(crate_dir: Path) -> None: skip_paren_depth = 0 continue - if ( - line.lstrip().startswith("#[") - or line_stripped in _REMOVED_RUST_LINES - or _TEST_MODULE_DECL.fullmatch(line_stripped) is not None - ): + if line_stripped in _REMOVED_RUST_LINES or _TEST_MODULE_DECL.fullmatch(line_stripped) is not None: + continue + + if line.lstrip().startswith("#["): + skip_attr_bracket_depth = line.count("[") - line.count("]") + skip_attr = skip_attr_bracket_depth > 0 continue stripped.append(line) diff --git a/go/nemo_relay/callbacks_test.go b/go/nemo_relay/callbacks_test.go index 0da74a49..6157d6ce 100644 --- a/go/nemo_relay/callbacks_test.go +++ b/go/nemo_relay/callbacks_test.go @@ -5,6 +5,7 @@ package nemo_relay import ( "encoding/json" + "sync" "testing" ) @@ -32,3 +33,93 @@ func TestRegisterAndUnregisterClosure(t *testing.T) { t.Fatal("closure registry still contains callback after unregister") } } + +func closureRegistryLen() int { + closureRegistryMu.Lock() + defer closureRegistryMu.Unlock() + return len(closureRegistry) +} + +func TestScopeSubscribeInvalidUUIDDoesNotLeakClosure(t *testing.T) { + before := closureRegistryLen() + + subscription, err := ScopeSubscribe("not-a-uuid", func(event Event) { + _ = event + }) + if err == nil { + t.Fatal("expected ScopeSubscribe to fail for invalid UUID") + } + if subscription != nil { + t.Fatal("expected no subscription for invalid UUID") + } + + after := closureRegistryLen() + if after != before { + t.Fatalf("closure registry length changed after invalid ScopeSubscribe: before=%d after=%d", before, after) + } +} + +func TestScopeSubscribeMissingScopeDoesNotLeakClosure(t *testing.T) { + before := closureRegistryLen() + + subscription, err := ScopeSubscribe("00000000-0000-0000-0000-000000000000", func(event Event) { + _ = event + }) + if err == nil { + t.Fatal("expected ScopeSubscribe to fail for missing scope") + } + if subscription != nil { + t.Fatal("expected no subscription for missing scope") + } + + after := closureRegistryLen() + if after != before { + t.Fatalf("closure registry length changed after missing-scope ScopeSubscribe: before=%d after=%d", before, after) + } +} + +func TestSubscriptionCloseConcurrentIsIdempotent(t *testing.T) { + subscription, err := Subscribe(func(event Event) { + _ = event + }) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + + var wg sync.WaitGroup + results := make(chan struct { + closed bool + err error + }, 32) + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + closed, err := subscription.Close() + results <- struct { + closed bool + err error + }{closed: closed, err: err} + }() + } + wg.Wait() + close(results) + + closedCount := 0 + for result := range results { + if result.err != nil { + t.Fatalf("Close failed: %v", result.err) + } + if result.closed { + closedCount++ + } + } + if closedCount != 1 { + t.Fatalf("expected exactly one concurrent Close to close the handle, got %d", closedCount) + } + if closed, err := subscription.Close(); err != nil { + t.Fatalf("Close after concurrent closes failed: %v", err) + } else if closed { + t.Fatal("Close after concurrent closes reported a live close") + } +} diff --git a/go/nemo_relay/nemo_relay.go b/go/nemo_relay/nemo_relay.go index aa5f1a94..a5ecaac8 100644 --- a/go/nemo_relay/nemo_relay.go +++ b/go/nemo_relay/nemo_relay.go @@ -38,6 +38,7 @@ typedef struct FfiLLMRequest FfiLLMRequest; typedef struct FfiEvent FfiEvent; typedef struct FfiStream FfiStream; typedef struct FfiCodecHandle FfiCodecHandle; +typedef struct FfiSubscriptionHandle FfiSubscriptionHandle; typedef void (*NemoRelayFreeFn)(void* user_data); @@ -163,6 +164,9 @@ extern int32_t nemo_relay_deregister_llm_stream_execution_intercept(const char* typedef void (*NemoRelayEventSubscriberFn)(void* user_data, const FfiEvent* event); extern int32_t nemo_relay_register_subscriber(const char* name, NemoRelayEventSubscriberFn cb, void* user_data, NemoRelayFreeFn free_fn); extern int32_t nemo_relay_deregister_subscriber(const char* name); +extern int32_t nemo_relay_subscribe(NemoRelayEventSubscriberFn cb, void* user_data, NemoRelayFreeFn free_fn, FfiSubscriptionHandle** out); +extern int32_t nemo_relay_subscription_close(FfiSubscriptionHandle* handle, bool* removed); +extern void nemo_relay_subscription_free(FfiSubscriptionHandle* handle); // Scope-local tool guardrails extern int32_t nemo_relay_scope_register_tool_sanitize_request_guardrail(const char* scope_uuid, const char* name, int32_t priority, NemoRelayToolSanitizeFn cb, void* user_data, NemoRelayFreeFn free_fn); @@ -197,6 +201,7 @@ extern int32_t nemo_relay_scope_deregister_llm_stream_execution_intercept(const // Scope-local subscribers extern int32_t nemo_relay_scope_register_subscriber(const char* scope_uuid, const char* name, NemoRelayEventSubscriberFn cb, void* user_data, NemoRelayFreeFn free_fn); extern int32_t nemo_relay_scope_deregister_subscriber(const char* scope_uuid, const char* name); +extern int32_t nemo_relay_scope_subscribe(const char* scope_uuid, NemoRelayEventSubscriberFn cb, void* user_data, NemoRelayFreeFn free_fn, FfiSubscriptionHandle** out); // Standalone middleware chains extern int32_t nemo_relay_tool_request_intercepts(const char* name, const char* args_json, char** out); @@ -275,6 +280,7 @@ import ( "encoding/json" "errors" "runtime" + "sync" "time" "unsafe" ) @@ -289,6 +295,46 @@ func checkedValue[T any](status int32, value T) (T, error) { return value, nil } +func validateUUID(value string) error { + if len(value) >= len("urn:uuid:") && value[:len("urn:uuid:")] == "urn:uuid:" { + value = value[len("urn:uuid:"):] + } + if len(value) == 38 && value[0] == '{' && value[len(value)-1] == '}' { + value = value[1 : len(value)-1] + } + switch len(value) { + case 32: + for _, ch := range value { + if !isHexDigit(ch) { + return errors.New("invalid UUID") + } + } + return nil + case 36: + for index, ch := range value { + switch index { + case 8, 13, 18, 23: + if ch != '-' { + return errors.New("invalid UUID") + } + default: + if !isHexDigit(ch) { + return errors.New("invalid UUID") + } + } + } + return nil + default: + return errors.New("invalid UUID") + } +} + +func isHexDigit(ch rune) bool { + return ('0' <= ch && ch <= '9') || + ('a' <= ch && ch <= 'f') || + ('A' <= ch && ch <= 'F') +} + var ( getHandleFunc = func() (*ScopeHandle, error) { var out *C.FfiScopeHandle @@ -1447,6 +1493,68 @@ func DeregisterLlmStreamExecutionIntercept(name string) error { // Subscriber registration // --------------------------------------------------------------------------- +// Subscription owns an event subscriber registration returned by [Subscribe] or +// [ScopeSubscribe]. Close removes the registration and releases the callback. +type Subscription struct { + mu sync.Mutex + ptr *C.FfiSubscriptionHandle +} + +func newSubscription(ptr *C.FfiSubscriptionHandle) *Subscription { + if ptr == nil { + return nil + } + sub := &Subscription{ptr: ptr} + runtime.SetFinalizer(sub, func(sub *Subscription) { + _, _ = sub.Close() + }) + return sub +} + +// Close removes the subscription registration. Close is idempotent; calling it +// after the subscription has already been removed is a no-op. The boolean is +// true only when this call closed a live subscription handle. +func (s *Subscription) Close() (bool, error) { + if s == nil { + return false, nil + } + s.mu.Lock() + defer s.mu.Unlock() + if s.ptr == nil { + return false, nil + } + ptr := s.ptr + + var removed C.bool + status := C.nemo_relay_subscription_close(ptr, &removed) + if err := checkStatus(status); err != nil { + return false, err + } + C.nemo_relay_subscription_free(ptr) + s.ptr = nil + runtime.SetFinalizer(s, nil) + return bool(removed), nil +} + +// Subscribe registers an anonymous event subscriber and returns a closeable +// handle. The callback is called for every lifecycle event (Start, End, Mark) +// emitted by the runtime and receives an owned [Event] snapshot that is safe to +// retain after the callback returns. +func Subscribe(fn EventSubscriberFunc) (*Subscription, error) { + id := registerClosure(fn) + var out *C.FfiSubscriptionHandle + status := C.nemo_relay_subscribe( + C.NemoRelayEventSubscriberFn(C.goEventSubscriberTrampoline), + id, + C.NemoRelayFreeFn(C.goFreeTrampoline), + &out, + ) + if status != 0 { + unregisterClosure(id) + } + return checkedValue(int32(status), newSubscription(out)) +} + // RegisterSubscriber registers a named event subscriber that will be called for // every lifecycle event (Start, End, Mark) emitted by the runtime. Subscribers // are identified by a unique name; registering a duplicate returns an @@ -2300,6 +2408,30 @@ func ScopeDeregisterLlmStreamExecutionIntercept(scopeUUID, name string) error { // Scope-local subscriber registration // --------------------------------------------------------------------------- +// ScopeSubscribe registers an anonymous scope-local event subscriber and +// returns a closeable handle. The subscriber receives events visible to the +// given scope and is also removed automatically when that scope is popped. +func ScopeSubscribe(scopeUUID string, fn EventSubscriberFunc) (*Subscription, error) { + if err := validateUUID(scopeUUID); err != nil { + return nil, err + } + id := registerClosure(fn) + cScopeUUID := C.CString(scopeUUID) + defer C.free(unsafe.Pointer(cScopeUUID)) + var out *C.FfiSubscriptionHandle + status := C.nemo_relay_scope_subscribe( + cScopeUUID, + C.NemoRelayEventSubscriberFn(C.goEventSubscriberTrampoline), + id, + C.NemoRelayFreeFn(C.goFreeTrampoline), + &out, + ) + if status != 0 { + unregisterClosure(id) + } + return checkedValue(int32(status), newSubscription(out)) +} + // ScopeRegisterSubscriber registers a scope-local event subscriber. The // callback receives an owned [Event] snapshot that is safe to retain after the // callback returns. diff --git a/go/nemo_relay/otel_test.go b/go/nemo_relay/otel_test.go index f07bc292..03e08309 100644 --- a/go/nemo_relay/otel_test.go +++ b/go/nemo_relay/otel_test.go @@ -118,24 +118,32 @@ func TestOpenTelemetrySubscriberExportsScopeLifecycleAndMarks(t *testing.T) { } defer func() { _ = subscriber.Deregister(name) }() - handle, err := PushScope("otel_scope", ScopeTypeAgent) + stack, err := NewScopeStack() if err != nil { - t.Fatalf("PushScope failed: %v", err) - } - if err := EmitEvent( - "otel_mark", - WithEventParent(handle), - WithEventData(json.RawMessage(`{"step":1}`)), - WithEventMetadata(json.RawMessage(`{"source":"go"}`)), - ); err != nil { - t.Fatalf("EmitEvent failed: %v", err) - } - if err := PopScope(handle); err != nil { - t.Fatalf("PopScope failed: %v", err) - } - if err := subscriber.ForceFlush(); err != nil { - t.Fatalf("ForceFlush failed: %v", err) + t.Fatalf("NewScopeStack failed: %v", err) } + defer stack.Close() + + stack.Run(func() { + handle, err := PushScope("otel_scope", ScopeTypeAgent) + if err != nil { + t.Fatalf("PushScope failed: %v", err) + } + if err := EmitEvent( + "otel_mark", + WithEventParent(handle), + WithEventData(json.RawMessage(`{"step":1}`)), + WithEventMetadata(json.RawMessage(`{"source":"go"}`)), + ); err != nil { + t.Fatalf("EmitEvent failed: %v", err) + } + if err := PopScope(handle); err != nil { + t.Fatalf("PopScope failed: %v", err) + } + if err := subscriber.ForceFlush(); err != nil { + t.Fatalf("ForceFlush failed: %v", err) + } + }) select { case request := <-requests: diff --git a/go/nemo_relay/subscribers/subscribers.go b/go/nemo_relay/subscribers/subscribers.go index 26382c1a..55ba4082 100644 --- a/go/nemo_relay/subscribers/subscribers.go +++ b/go/nemo_relay/subscribers/subscribers.go @@ -5,26 +5,37 @@ // registration. // // Subscribers receive discriminated lifecycle events emitted by the runtime as -// scopes, tool calls, and LLM calls progress. Each subscriber is identified by -// a unique name. +// scopes, tool calls, and LLM calls progress. Subscribers can be named for +// explicit deregistration or anonymous with a closeable subscription handle. // // Example usage: // -// import "github.com/NVIDIA/NeMo-Relay/go/nemo_relay/subscribers" +// import ( +// "fmt" // -// // Register a subscriber that logs every event. -// err := subscribers.Register("logger", func(event nemo_relay.Event) { +// "github.com/NVIDIA/NeMo-Relay/go/nemo_relay" +// "github.com/NVIDIA/NeMo-Relay/go/nemo_relay/subscribers" +// ) +// +// // Subscribe logs every event until the handle is closed. +// sub, err := subscribers.Subscribe(func(event nemo_relay.Event) { // fmt.Printf("[%s] %s: %s\n", event.Timestamp(), event.Kind(), event.Name()) // }) // // // Later, remove it. -// _ = subscribers.Deregister("logger") +// _, _ = sub.Close() package subscribers import ( "github.com/NVIDIA/NeMo-Relay/go/nemo_relay" ) +// Subscribe registers an anonymous event subscriber and returns a closeable +// handle. This is a shorthand for [nemo_relay.Subscribe]. +func Subscribe(fn nemo_relay.EventSubscriberFunc) (*nemo_relay.Subscription, error) { + return nemo_relay.Subscribe(fn) +} + // Register registers a named event subscriber that will be called for every // lifecycle event emitted by the runtime. The name must be unique; // registering a duplicate returns an AlreadyExists error. The callback @@ -42,6 +53,13 @@ func Deregister(name string) error { return nemo_relay.DeregisterSubscriber(name) } +// ScopeSubscribe registers an anonymous scope-local event subscriber and +// returns a closeable handle. This is a shorthand for +// [nemo_relay.ScopeSubscribe]. +func ScopeSubscribe(scopeUUID string, fn nemo_relay.EventSubscriberFunc) (*nemo_relay.Subscription, error) { + return nemo_relay.ScopeSubscribe(scopeUUID, fn) +} + // ScopeRegister registers a scope-local event subscriber that will be called // for lifecycle events within the given scope. This is a shorthand for // [nemo_relay.ScopeRegisterSubscriber]. diff --git a/go/nemo_relay/subscribers/subscribers_test.go b/go/nemo_relay/subscribers/subscribers_test.go index 9dcdc6da..ce7fdeeb 100644 --- a/go/nemo_relay/subscribers/subscribers_test.go +++ b/go/nemo_relay/subscribers/subscribers_test.go @@ -78,6 +78,43 @@ func TestSubscriberShorthands(t *testing.T) { mu.Unlock() } +func TestSubscriptionHandleShorthand(t *testing.T) { + var events []string + var mu sync.Mutex + + subscription, err := subscriberspkg.Subscribe(func(event nemo_relay.Event) { + mu.Lock() + events = append(events, event.Name()) + mu.Unlock() + }) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + + if err := nemo_relay.EmitEvent("subs_handle_before_close"); err != nil { + t.Fatalf("EmitEvent failed: %v", err) + } + if closed, err := subscription.Close(); err != nil { + t.Fatalf("Subscription Close failed: %v", err) + } else if !closed { + t.Fatal("Subscription Close did not report a live close") + } + if closed, err := subscription.Close(); err != nil { + t.Fatalf("second Subscription Close failed: %v", err) + } else if closed { + t.Fatal("second Subscription Close reported a live close") + } + if err := nemo_relay.EmitEvent("subs_handle_after_close"); err != nil { + t.Fatalf("EmitEvent failed: %v", err) + } + + mu.Lock() + defer mu.Unlock() + if len(events) != 1 || events[0] != "subs_handle_before_close" { + t.Fatalf("unexpected subscription events: %#v", events) + } +} + func TestScopeSubscriberShorthands(t *testing.T) { stack, err := nemo_relay.NewScopeStack() if err != nil { @@ -98,3 +135,83 @@ func TestScopeSubscriberShorthands(t *testing.T) { } }) } + +func TestScopeSubscriptionHandleShorthand(t *testing.T) { + stack, err := nemo_relay.NewScopeStack() + if err != nil { + t.Fatalf("NewScopeStack failed: %v", err) + } + defer stack.Close() + + stack.Run(func() { + handle, err := nemo_relay.PushScope("subs_scope_handle_owner", nemo_relay.ScopeTypeAgent) + if err != nil { + t.Fatalf("PushScope failed: %v", err) + } + popped := false + defer func() { + if !popped { + _ = nemo_relay.PopScope(handle) + } + }() + + var explicitEvents []string + var explicitMu sync.Mutex + explicit, err := subscriberspkg.ScopeSubscribe(handle.UUID(), func(event nemo_relay.Event) { + explicitMu.Lock() + explicitEvents = append(explicitEvents, event.Name()) + explicitMu.Unlock() + }) + if err != nil { + t.Fatalf("ScopeSubscribe failed: %v", err) + } + if err := nemo_relay.EmitEvent("subs_scope_handle_before_close"); err != nil { + t.Fatalf("EmitEvent failed: %v", err) + } + if closed, err := explicit.Close(); err != nil { + t.Fatalf("Scope subscription Close failed: %v", err) + } else if !closed { + t.Fatal("Scope subscription Close did not report a live close") + } + if closed, err := explicit.Close(); err != nil { + t.Fatalf("second Scope subscription Close failed: %v", err) + } else if closed { + t.Fatal("second Scope subscription Close reported a live close") + } + if err := nemo_relay.EmitEvent("subs_scope_handle_after_close"); err != nil { + t.Fatalf("EmitEvent failed: %v", err) + } + + explicitMu.Lock() + if len(explicitEvents) != 1 || explicitEvents[0] != "subs_scope_handle_before_close" { + t.Fatalf("unexpected explicit scope subscription events: %#v", explicitEvents) + } + explicitMu.Unlock() + + var cleanupEvents []string + var cleanupMu sync.Mutex + cleanup, err := subscriberspkg.ScopeSubscribe(handle.UUID(), func(event nemo_relay.Event) { + cleanupMu.Lock() + cleanupEvents = append(cleanupEvents, event.Name()) + cleanupMu.Unlock() + }) + if err != nil { + t.Fatalf("ScopeSubscribe cleanup failed: %v", err) + } + if err := nemo_relay.PopScope(handle); err != nil { + t.Fatalf("PopScope failed: %v", err) + } + popped = true + if closed, err := cleanup.Close(); err != nil { + t.Fatalf("cleanup subscription Close failed: %v", err) + } else if closed { + t.Fatal("cleanup subscription Close reported a live close after scope cleanup") + } + + cleanupMu.Lock() + defer cleanupMu.Unlock() + if len(cleanupEvents) != 1 || cleanupEvents[0] != "subs_scope_handle_owner" { + t.Fatalf("unexpected cleanup scope subscription events: %#v", cleanupEvents) + } + }) +} diff --git a/python/nemo_relay/__init__.py b/python/nemo_relay/__init__.py index 733caeb3..ecc48cfe 100644 --- a/python/nemo_relay/__init__.py +++ b/python/nemo_relay/__init__.py @@ -22,7 +22,8 @@ - scope stack helpers such as ``get_scope_stack()``, ``create_scope_stack()``, ``set_thread_scope_stack()``, and ``scope_stack_active()`` - native runtime types such as ``ScopeHandle``, ``ToolHandle``, ``LLMHandle``, - ``LLMRequest``, ``ScopeType``, and the lifecycle event classes + ``LLMRequest``, ``ScopeType``, ``Subscription``, and the lifecycle event + classes - observability helpers such as ``AtifExporter``, ``AtofExporter``, ``OpenTelemetrySubscriber``, and ``OpenInferenceSubscriber`` - JSON and callback type aliases used by middleware, typed wrappers, and @@ -97,6 +98,7 @@ async def main(): ScopeHandle, ScopeStack, ScopeType, + Subscription, ToolAttributes, ToolHandle, ) @@ -441,6 +443,7 @@ def worker() -> None: "ScopeEvent", "MarkEvent", "ScopeHandle", + "Subscription", "ToolHandle", "LLMHandle", "LLMRequest", diff --git a/python/nemo_relay/__init__.pyi b/python/nemo_relay/__init__.pyi index 5ddde9b6..f2129d3b 100644 --- a/python/nemo_relay/__init__.pyi +++ b/python/nemo_relay/__init__.pyi @@ -94,6 +94,9 @@ from nemo_relay._native import ( from nemo_relay._native import ( ScopeType as ScopeType, ) +from nemo_relay._native import ( + Subscription as Subscription, +) from nemo_relay._native import ( ToolAttributes as ToolAttributes, ) diff --git a/python/nemo_relay/_native.pyi b/python/nemo_relay/_native.pyi index 5262919d..c7f04bc3 100644 --- a/python/nemo_relay/_native.pyi +++ b/python/nemo_relay/_native.pyi @@ -724,6 +724,23 @@ class AtifExporter: """Clear collected events without changing subscriber registration.""" ... +class Subscription: + """Closeable event subscriber registration.""" + + def close(self) -> bool: + """Close the subscription. + + Returns: + ``True`` if a live subscriber was removed, otherwise ``False``. + """ + ... + def __enter__(self) -> "Subscription": + """Enter a context manager and return this subscription.""" + ... + def __exit__(self, exc_type: object, exc: object, traceback: object) -> bool: + """Close the subscription when leaving a context manager.""" + ... + class AtofExporterMode: """File write mode for ``AtofExporter``.""" @@ -1756,6 +1773,17 @@ def register_subscriber(name: str, callback: Callable[[ScopeEvent | MarkEvent], """ ... +def subscribe(callback: Callable[[ScopeEvent | MarkEvent], None]) -> Subscription: + """Register an anonymous global event subscriber callback. + + Args: + callback: Function invoked for each emitted scope or mark event. + + Returns: + A closeable subscription handle. + """ + ... + def deregister_subscriber(name: str) -> bool: """Remove a global event subscriber. @@ -2065,6 +2093,21 @@ def scope_register_subscriber( """ ... +def scope_subscribe( + scope_uuid: str, + callback: Callable[[ScopeEvent | MarkEvent], None], +) -> Subscription: + """Register an anonymous scope-local event subscriber callback. + + Args: + scope_uuid: UUID of the owning scope. + callback: Event callback used while the owning scope is active. + + Returns: + A closeable subscription handle. + """ + ... + def scope_deregister_subscriber(scope_uuid: str, name: str) -> bool: """Remove a scope-local event subscriber. diff --git a/python/nemo_relay/scope_local.py b/python/nemo_relay/scope_local.py index 909a5bcc..b9d4eec8 100644 --- a/python/nemo_relay/scope_local.py +++ b/python/nemo_relay/scope_local.py @@ -91,6 +91,9 @@ def redact(tool_name, args): from nemo_relay._native import ( scope_register_tool_sanitize_response_guardrail as _register_tool_sanitize_response, ) +from nemo_relay._native import ( + scope_subscribe as _subscribe, +) # --------------------------------------------------------------------------- # Tool guardrails (scope-local) @@ -588,6 +591,22 @@ def log_event(event): return _register_subscriber(scope_handle.uuid, name, callback) +def subscribe(scope_handle, callback): + """Register a closeable event subscriber for ``scope_handle``. + + Args: + scope_handle: Owning scope handle. The registration is removed when + this scope is popped. + callback: Callable invoked as ``callback(event)`` for each lifecycle + event emitted while the scope remains active. + + Returns: + Subscription: A native handle with ``close()`` and context-manager + support. + """ + return _subscribe(scope_handle.uuid, callback) + + def deregister_subscriber(scope_handle, name): """Remove a scope-local event subscriber. @@ -634,5 +653,6 @@ def deregister_subscriber(scope_handle, name): "deregister_llm_stream_execution", # Subscribers "register_subscriber", + "subscribe", "deregister_subscriber", ] diff --git a/python/nemo_relay/subscribers.py b/python/nemo_relay/subscribers.py index 33b96668..a96395e8 100644 --- a/python/nemo_relay/subscribers.py +++ b/python/nemo_relay/subscribers.py @@ -31,9 +31,12 @@ def log_event(event): from nemo_relay._native import ( register_subscriber as _native_register, ) +from nemo_relay._native import ( + subscribe as _native_subscribe, +) if TYPE_CHECKING: - from nemo_relay import Event + from nemo_relay import Event, Subscription def register(name: str, callback: "Callable[[Event], None]") -> None: @@ -59,6 +62,27 @@ def register(name: str, callback: "Callable[[Event], None]") -> None: return _native_register(name, callback) +def subscribe(callback: "Callable[[Event], None]") -> "Subscription": + """Register a global event subscriber and return a closeable handle. + + Args: + callback: Callable invoked as ``callback(event)`` for every emitted + lifecycle event. + + Returns: + Subscription: A native handle with ``close()`` and context-manager + support. + + Example:: + + import nemo_relay + + with nemo_relay.subscribers.subscribe(lambda event: print(event.kind)): + nemo_relay.scope.event("checkpoint") + """ + return _native_subscribe(callback) + + def deregister(name: str) -> bool: """Remove a previously registered global subscriber. @@ -83,4 +107,4 @@ def deregister(name: str) -> bool: return _native_deregister(name) -__all__ = ["register", "deregister"] +__all__ = ["register", "subscribe", "deregister"] diff --git a/python/tests/test_subscribers.py b/python/tests/test_subscribers.py index 02954cbd..29c55262 100644 --- a/python/tests/test_subscribers.py +++ b/python/tests/test_subscribers.py @@ -15,6 +15,7 @@ ScopeType, llm, scope, + scope_local, subscribers, tools, ) @@ -64,6 +65,47 @@ def test_duplicate_subscriber_raises(self): def test_deregister_nonexistent(self): assert not subscribers.deregister("nonexistent_sub") + def test_subscribe_handle_close_is_idempotent(self): + events = [] + subscription = subscribers.subscribe(lambda e: events.append(e)) + scope.event("py_handle_before_close") + assert subscription.close() + assert not subscription.close() + scope.event("py_handle_after_close") + + assert [e.name for e in events] == ["py_handle_before_close"] + + def test_subscribe_context_manager_closes(self): + events = [] + with subscribers.subscribe(lambda e: events.append(e)) as subscription: + scope.event("py_handle_context") + + assert not subscription.close() + scope.event("py_handle_context_after") + assert [e.name for e in events] == ["py_handle_context"] + + def test_scope_subscribe_handle_close_and_scope_cleanup(self): + handle = scope.push("py_scope_handle_owner", ScopeType.Agent) + popped = False + try: + explicit_events = [] + explicit = scope_local.subscribe(handle, lambda e: explicit_events.append(e)) + scope.event("py_scope_handle_before_close", handle=handle) + assert explicit.close() + assert not explicit.close() + scope.event("py_scope_handle_after_close", handle=handle) + assert [e.name for e in explicit_events] == ["py_scope_handle_before_close"] + + cleanup_events = [] + cleanup = scope_local.subscribe(handle, lambda e: cleanup_events.append(e)) + scope.pop(handle) + popped = True + assert not cleanup.close() + assert [e.name for e in cleanup_events] == ["py_scope_handle_owner"] + finally: + if not popped: + scope.pop(handle) + class TestSubscriberEventDetails: def test_scope_events_have_correct_types(self):