44 * SPDX-License-Identifier: Apache-2.0
55 */
66
7+ import { randomUUID } from 'node:crypto' ;
78import { EventEmitter } from 'node:events' ;
89import type { PolicyEngine } from '../policy/policy-engine.js' ;
9- import { PolicyDecision } from '../policy/types.js' ;
10- import { MessageBusType , type Message } from './types.js' ;
10+ import { PolicyDecision , getHookSource } from '../policy/types.js' ;
11+ import {
12+ MessageBusType ,
13+ type Message ,
14+ type HookExecutionRequest ,
15+ type HookPolicyDecision ,
16+ } from './types.js' ;
1117import { safeJsonStringify } from '../utils/safeJsonStringify.js' ;
1218
1319export class MessageBus extends EventEmitter {
@@ -83,6 +89,39 @@ export class MessageBus extends EventEmitter {
8389 default :
8490 throw new Error ( `Unknown policy decision: ${ decision } ` ) ;
8591 }
92+ } else if ( message . type === MessageBusType . HOOK_EXECUTION_REQUEST ) {
93+ // Handle hook execution requests through policy evaluation
94+ const hookRequest = message as HookExecutionRequest ;
95+ const decision = await this . policyEngine . checkHook ( hookRequest ) ;
96+
97+ // Map decision to allow/deny for observability (ASK_USER treated as deny for hooks)
98+ const effectiveDecision =
99+ decision === PolicyDecision . ALLOW ? 'allow' : 'deny' ;
100+
101+ // Emit policy decision for observability
102+ this . emitMessage ( {
103+ type : MessageBusType . HOOK_POLICY_DECISION ,
104+ eventName : hookRequest . eventName ,
105+ hookSource : getHookSource ( hookRequest . input ) ,
106+ decision : effectiveDecision ,
107+ reason :
108+ decision !== PolicyDecision . ALLOW
109+ ? 'Hook execution denied by policy'
110+ : undefined ,
111+ } as HookPolicyDecision ) ;
112+
113+ // If allowed, emit the request for hook system to handle
114+ if ( decision === PolicyDecision . ALLOW ) {
115+ this . emitMessage ( message ) ;
116+ } else {
117+ // If denied or ASK_USER, emit error response (hooks don't support interactive confirmation)
118+ this . emitMessage ( {
119+ type : MessageBusType . HOOK_EXECUTION_RESPONSE ,
120+ correlationId : hookRequest . correlationId ,
121+ success : false ,
122+ error : new Error ( 'Hook execution denied by policy' ) ,
123+ } ) ;
124+ }
86125 } else {
87126 // For all other message types, just emit them
88127 this . emitMessage ( message ) ;
@@ -105,4 +144,46 @@ export class MessageBus extends EventEmitter {
105144 ) : void {
106145 this . off ( type , listener ) ;
107146 }
147+
148+ /**
149+ * Request-response pattern: Publish a message and wait for a correlated response
150+ * This enables synchronous-style communication over the async MessageBus
151+ * The correlation ID is generated internally and added to the request
152+ */
153+ async request < TRequest extends Message , TResponse extends Message > (
154+ request : Omit < TRequest , 'correlationId' > ,
155+ responseType : TResponse [ 'type' ] ,
156+ timeoutMs : number = 60000 ,
157+ ) : Promise < TResponse > {
158+ const correlationId = randomUUID ( ) ;
159+
160+ return new Promise < TResponse > ( ( resolve , reject ) => {
161+ const timeoutId = setTimeout ( ( ) => {
162+ cleanup ( ) ;
163+ reject ( new Error ( `Request timed out waiting for ${ responseType } ` ) ) ;
164+ } , timeoutMs ) ;
165+
166+ const cleanup = ( ) => {
167+ clearTimeout ( timeoutId ) ;
168+ this . unsubscribe ( responseType , responseHandler ) ;
169+ } ;
170+
171+ const responseHandler = ( response : TResponse ) => {
172+ // Check if this response matches our request
173+ if (
174+ 'correlationId' in response &&
175+ response . correlationId === correlationId
176+ ) {
177+ cleanup ( ) ;
178+ resolve ( response ) ;
179+ }
180+ } ;
181+
182+ // Subscribe to responses
183+ this . subscribe < TResponse > ( responseType , responseHandler ) ;
184+
185+ // Publish the request with correlation ID
186+ this . publish ( { ...request , correlationId } as TRequest ) ;
187+ } ) ;
188+ }
108189}
0 commit comments