@@ -9,22 +9,31 @@ use tokio_stream::StreamExt;
99use tower:: { service_fn, MakeService , Service , ServiceExt } ;
1010use tracing:: { error, trace} ;
1111
12- use crate :: { logs:: * , requests, Error , ExtensionError , LambdaEvent , NextEvent } ;
12+ use crate :: {
13+ logs:: * ,
14+ requests:: { self , Api } ,
15+ telemetry_wrapper, Error , ExtensionError , LambdaEvent , LambdaTelemetry , NextEvent ,
16+ } ;
1317
1418const DEFAULT_LOG_PORT_NUMBER : u16 = 9002 ;
19+ const DEFAULT_TELEMETRY_PORT_NUMBER : u16 = 9003 ;
1520
16- /// An Extension that runs event and log processors
17- pub struct Extension < ' a , E , L > {
21+ /// An Extension that runs event, log and telemetry processors
22+ pub struct Extension < ' a , E , L , T > {
1823 extension_name : Option < & ' a str > ,
1924 events : Option < & ' a [ & ' a str ] > ,
2025 events_processor : E ,
2126 log_types : Option < & ' a [ & ' a str ] > ,
2227 logs_processor : Option < L > ,
2328 log_buffering : Option < LogBuffering > ,
2429 log_port_number : u16 ,
30+ telemetry_types : Option < & ' a [ & ' a str ] > ,
31+ telemetry_processor : Option < T > ,
32+ telemetry_buffering : Option < LogBuffering > ,
33+ telemetry_port_number : u16 ,
2534}
2635
27- impl < ' a > Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > > {
36+ impl < ' a > Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > , MakeIdentity < Vec < LambdaTelemetry > > > {
2837 /// Create a new base [`Extension`] with a no-op events processor
2938 pub fn new ( ) -> Self {
3039 Extension {
@@ -35,17 +44,23 @@ impl<'a> Extension<'a, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>> {
3544 log_buffering : None ,
3645 logs_processor : None ,
3746 log_port_number : DEFAULT_LOG_PORT_NUMBER ,
47+ telemetry_types : None ,
48+ telemetry_buffering : None ,
49+ telemetry_processor : None ,
50+ telemetry_port_number : DEFAULT_TELEMETRY_PORT_NUMBER ,
3851 }
3952 }
4053}
4154
42- impl < ' a > Default for Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > > {
55+ impl < ' a > Default
56+ for Extension < ' a , Identity < LambdaEvent > , MakeIdentity < Vec < LambdaLog > > , MakeIdentity < Vec < LambdaTelemetry > > >
57+ {
4358 fn default ( ) -> Self {
4459 Self :: new ( )
4560 }
4661}
4762
48- impl < ' a , E , L > Extension < ' a , E , L >
63+ impl < ' a , E , L , T > Extension < ' a , E , L , T >
4964where
5065 E : Service < LambdaEvent > ,
5166 E :: Future : Future < Output = Result < ( ) , E :: Error > > ,
5873 L :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
5974 L :: MakeError : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
6075 L :: Future : Send ,
76+
77+ // Fixme: 'static bound might be too restrictive
78+ T : MakeService < ( ) , Vec < LambdaTelemetry > , Response = ( ) > + Send + Sync + ' static ,
79+ T :: Service : Service < Vec < LambdaTelemetry > , Response = ( ) > + Send + Sync ,
80+ <T :: Service as Service < Vec < LambdaTelemetry > > >:: Future : Send + ' a ,
81+ T :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
82+ T :: MakeError : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Debug ,
83+ T :: Future : Send ,
6184{
6285 /// Create a new [`Extension`] with a given extension name
6386 pub fn with_extension_name ( self , extension_name : & ' a str ) -> Self {
77100 }
78101
79102 /// Create a new [`Extension`] with a service that receives Lambda events.
80- pub fn with_events_processor < N > ( self , ep : N ) -> Extension < ' a , N , L >
103+ pub fn with_events_processor < N > ( self , ep : N ) -> Extension < ' a , N , L , T >
81104 where
82105 N : Service < LambdaEvent > ,
83106 N :: Future : Future < Output = Result < ( ) , N :: Error > > ,
@@ -91,11 +114,15 @@ where
91114 log_buffering : self . log_buffering ,
92115 logs_processor : self . logs_processor ,
93116 log_port_number : self . log_port_number ,
117+ telemetry_types : self . telemetry_types ,
118+ telemetry_buffering : self . telemetry_buffering ,
119+ telemetry_processor : self . telemetry_processor ,
120+ telemetry_port_number : self . telemetry_port_number ,
94121 }
95122 }
96123
97124 /// Create a new [`Extension`] with a service that receives Lambda logs.
98- pub fn with_logs_processor < N , NS > ( self , lp : N ) -> Extension < ' a , E , N >
125+ pub fn with_logs_processor < N , NS > ( self , lp : N ) -> Extension < ' a , E , N , T >
99126 where
100127 N : Service < ( ) > ,
101128 N :: Future : Future < Output = Result < NS , N :: Error > > ,
@@ -109,6 +136,10 @@ where
109136 log_types : self . log_types ,
110137 log_buffering : self . log_buffering ,
111138 log_port_number : self . log_port_number ,
139+ telemetry_types : self . telemetry_types ,
140+ telemetry_buffering : self . telemetry_buffering ,
141+ telemetry_processor : self . telemetry_processor ,
142+ telemetry_port_number : self . telemetry_port_number ,
112143 }
113144 }
114145
@@ -137,6 +168,53 @@ where
137168 }
138169 }
139170
171+ /// Create a new [`Extension`] with a service that receives Lambda telemetry data.
172+ pub fn with_telemetry_processor < N , NS > ( self , lp : N ) -> Extension < ' a , E , L , N >
173+ where
174+ N : Service < ( ) > ,
175+ N :: Future : Future < Output = Result < NS , N :: Error > > ,
176+ N :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > + fmt:: Display ,
177+ {
178+ Extension {
179+ telemetry_processor : Some ( lp) ,
180+ events_processor : self . events_processor ,
181+ extension_name : self . extension_name ,
182+ events : self . events ,
183+ log_types : self . log_types ,
184+ log_buffering : self . log_buffering ,
185+ logs_processor : self . logs_processor ,
186+ log_port_number : self . log_port_number ,
187+ telemetry_types : self . telemetry_types ,
188+ telemetry_buffering : self . telemetry_buffering ,
189+ telemetry_port_number : self . telemetry_port_number ,
190+ }
191+ }
192+
193+ /// Create a new [`Extension`] with a list of telemetry types to subscribe.
194+ /// The only accepted telemetry types are `function`, `platform`, and `extension`.
195+ pub fn with_telemetry_types ( self , telemetry_types : & ' a [ & ' a str ] ) -> Self {
196+ Extension {
197+ telemetry_types : Some ( telemetry_types) ,
198+ ..self
199+ }
200+ }
201+
202+ /// Create a new [`Extension`] with specific configuration to buffer telemetry.
203+ pub fn with_telemetry_buffering ( self , lb : LogBuffering ) -> Self {
204+ Extension {
205+ telemetry_buffering : Some ( lb) ,
206+ ..self
207+ }
208+ }
209+
210+ /// Create a new [`Extension`] with a different port number to listen to telemetry.
211+ pub fn with_telemetry_port_number ( self , port_number : u16 ) -> Self {
212+ Extension {
213+ telemetry_port_number : port_number,
214+ ..self
215+ }
216+ }
217+
140218 /// Execute the given extension
141219 pub async fn run ( self ) -> Result < ( ) , Error > {
142220 let client = & Client :: builder ( ) . build ( ) ?;
@@ -166,7 +244,8 @@ where
166244 trace ! ( "Log processor started" ) ;
167245
168246 // Call Logs API to start receiving events
169- let req = requests:: subscribe_logs_request (
247+ let req = requests:: subscribe_request (
248+ Api :: LogsApi ,
170249 extension_id,
171250 self . log_types ,
172251 self . log_buffering ,
@@ -179,6 +258,41 @@ where
179258 trace ! ( "Registered extension with Logs API" ) ;
180259 }
181260
261+ if let Some ( mut telemetry_processor) = self . telemetry_processor {
262+ trace ! ( "Telemetry processor found" ) ;
263+ // Spawn task to run processor
264+ let addr = SocketAddr :: from ( ( [ 0 , 0 , 0 , 0 ] , self . telemetry_port_number ) ) ;
265+ let make_service = service_fn ( move |_socket : & AddrStream | {
266+ trace ! ( "Creating new telemetry processor Service" ) ;
267+ let service = telemetry_processor. make_service ( ( ) ) ;
268+ async move {
269+ let service = Arc :: new ( Mutex :: new ( service. await ?) ) ;
270+ Ok :: < _ , T :: MakeError > ( service_fn ( move |req| telemetry_wrapper ( service. clone ( ) , req) ) )
271+ }
272+ } ) ;
273+ let server = Server :: bind ( & addr) . serve ( make_service) ;
274+ tokio:: spawn ( async move {
275+ if let Err ( e) = server. await {
276+ error ! ( "Error while running telemetry processor: {}" , e) ;
277+ }
278+ } ) ;
279+ trace ! ( "Telemetry processor started" ) ;
280+
281+ // Call Telemetry API to start receiving events
282+ let req = requests:: subscribe_request (
283+ Api :: TelemetryApi ,
284+ extension_id,
285+ self . telemetry_types ,
286+ self . telemetry_buffering ,
287+ self . telemetry_port_number ,
288+ ) ?;
289+ let res = client. call ( req) . await ?;
290+ if res. status ( ) != http:: StatusCode :: OK {
291+ return Err ( ExtensionError :: boxed ( "unable to initialize the telemetry api" ) ) ;
292+ }
293+ trace ! ( "Registered extension with Telemetry API" ) ;
294+ }
295+
182296 let incoming = async_stream:: stream! {
183297 loop {
184298 trace!( "Waiting for next event (incoming loop)" ) ;
0 commit comments