@@ -2,24 +2,55 @@ const express = require('express');
22const { HTTP } = require ( "cloudevents" ) ;
33const jsonata = require ( 'jsonata' ) ;
44const fs = require ( 'node:fs' ) ;
5+ const fsPromises = require ( 'node:fs' ) . promises ;
6+ const { buffer} = require ( 'node:stream/consumers' ) ;
57
68const port = process . env . PORT = process . env . PORT || 8080 ;
79const k_sink = process . env . K_SINK || undefined ;
810const jsonata_transform_file_name = process . env . JSONATA_TRANSFORM_FILE_NAME || undefined ;
911
12+ // Allow transforming the response received by the endpoint defined by K_SINK
13+ const jsonata_response_transform_file_name = process . env . JSONATA_RESPONSE_TRANSFORM_FILE_NAME || undefined ;
14+
15+ const jsonata_discard_response_body = process . env . JSONATA_DISCARD_RESPONSE_BODY === "true" || false ;
16+
17+ const oidc_token_file = process . env . OIDC_TOKEN_FILE || undefined
18+ if ( oidc_token_file && ! fs . existsSync ( oidc_token_file ) ) {
19+ console . info ( `${ oidc_token_file } file doesn't exist, token will not be forwarded to K_SINK endpoint (if specified)` ) ;
20+ } else if ( oidc_token_file ) {
21+ console . info ( `${ oidc_token_file } file exist, token will be forwarded to K_SINK endpoint (if specified)` ) ;
22+ }
23+
1024if ( ! jsonata_transform_file_name ) {
1125 throw new Error ( "undefined JSONATA_TRANSFORM_FILE_NAME env variable" ) ;
1226}
27+ if ( ! k_sink && jsonata_response_transform_file_name ) {
28+ throw new Error ( "undefined K_SINK env variable with defined JSONATA_RESPONSE_TRANSFORM_FILE_NAME" ) ;
29+ }
30+
31+ if ( k_sink ) {
32+ console . info ( "K_SINK is specified, transformations will be sent to that endpoint" )
33+ }
1334
1435let jsonata_transform = null
36+ let jsonata_response_transform = null
1537
1638try {
17- const jsonata_transform_file_content = fs . readFileSync ( jsonata_transform_file_name , " utf-8" )
39+ const jsonata_transform_file_content = fs . readFileSync ( jsonata_transform_file_name , { encoding : ' utf-8' } )
1840 jsonata_transform = jsonata ( jsonata_transform_file_content ) ;
1941} catch ( error ) {
2042 throw new Error ( `Failed to parse Jsonata transform file in ${ jsonata_transform_file_name } : ${ error } ` ) ;
2143}
2244
45+ if ( jsonata_response_transform_file_name ) {
46+ try {
47+ const jsonata_response_transform_file_content = fs . readFileSync ( jsonata_response_transform_file_name , { encoding : 'utf-8' } ) ;
48+ jsonata_response_transform = jsonata ( jsonata_response_transform_file_content ) ;
49+ } catch ( error ) {
50+ throw new Error ( `Failed to parse Jsonata response transform file in ${ jsonata_response_transform_file_name } : ${ error } ` ) ;
51+ }
52+ }
53+
2354function logDebug ( ...inputs ) {
2455 if ( process . env . NODE_ENV === "development" ) {
2556 console . debug ( ...inputs ) ;
@@ -58,43 +89,102 @@ app.post("/", async (req, res) => {
5889 }
5990 input = JSON . parse ( HTTP . structured ( ceInput ) . body )
6091 } catch ( error ) {
61- logDebug ( `Failed to deserialize CloudEvent, falling back to raw body` , JSON . stringify ( req . rawBody ) , error )
92+ logDebug ( `Failed to deserialize CloudEvent, falling back to raw body` , JSON . stringify ( req . rawBody , null , 2 ) , error )
6293 input = req . rawBody
6394 }
6495
65- logDebug ( "input " , JSON . stringify ( input ) ) ;
96+ logDebug ( "Input " , JSON . stringify ( input ) ) ;
6697
6798 const transformed = await jsonata_transform . evaluate ( input )
68- if ( k_sink ) {
69- logDebug ( `K_SINK is set, sending event to it ${ k_sink } ` )
99+ const transformed_content_type = guessTransformedContentType ( transformed )
70100
71- try {
72- const response = await fetch ( k_sink , {
73- method : "POST" ,
74- headers : {
75- "Content-Type" : "application/json" ,
76- } ,
77- body : JSON . stringify ( transformed ) ,
78- } )
79- logDebug ( `K_SINK received response ${ response . status } ` )
101+ logDebug ( "Transformed input" , JSON . stringify ( transformed , null , 2 ) )
80102
81- return res
82- . status ( response . status )
83- . send ( )
103+ if ( ! k_sink ) {
104+ return res
105+ . header ( "Content-Type" , transformed_content_type )
106+ . status ( 200 )
107+ . send ( JSON . stringify ( transformed ) ) ;
108+ }
109+
110+ logDebug ( `K_SINK is set, sending event to it ${ k_sink } ` )
111+
112+ const k_sink_request_headers = {
113+ "Content-Type" : transformed_content_type
114+ }
115+ if ( oidc_token_file ) {
116+ const token = await fsPromises . readFile ( oidc_token_file , { encoding : 'utf-8' } )
117+ if ( token && token . length > 0 ) {
118+ k_sink_request_headers . Authorization = `Bearer ${ token } `
119+ }
120+ }
121+
122+ const response = await fetch ( k_sink , {
123+ method : "POST" ,
124+ headers : k_sink_request_headers ,
125+ body : JSON . stringify ( transformed ) ,
126+ redirect : 'error' ,
127+ } )
128+
129+ if ( jsonata_discard_response_body ) {
130+ logDebug ( `Received response from K_SINK, discarding response body and responding with ${ response . status } ` )
131+
132+ return res
133+ . status ( response . status )
134+ . send ( )
135+ }
136+
137+ if ( ! jsonata_response_transform_file_name ) {
138+ logDebug ( `Received response from K_SINK (status: ${ response . status } ), propagating response body as response` )
139+
140+ const content_type = response . headers [ "Content-Type" ]
141+ if ( content_type && content_type . length > 0 ) {
142+ res . setHeader ( 'Content-Type' , content_type )
143+ }
144+
145+ return response . body . pipeTo ( new WritableStream ( {
146+ write ( chunk ) {
147+ res . write ( chunk )
148+ } ,
149+ close ( ) {
150+ res . end ( )
151+ } ,
152+ } ) )
153+ }
154+
155+ logDebug ( `Received response from K_SINK ${ response . status } , transforming response body with transformation in ${ jsonata_response_transform_file_name } ` )
156+
157+ const response_buf = await buffer ( response . body )
158+
159+ try {
160+ const response_headers = { }
161+ response . headers . forEach ( ( value , key ) => {
162+ if ( key in response_headers ) {
163+ response_headers [ key ] . push ( value )
164+ return
165+ }
166+ response_headers [ key ] = [ value ]
167+ } )
168+ const ce_input = HTTP . toEvent ( { headers : response_headers , body : response_buf } ) ;
169+ input = JSON . parse ( HTTP . structured ( ce_input ) . body )
170+ } catch ( error ) {
171+ const body = response_buf . toString ( 'utf-8' )
172+ try {
173+ input = JSON . parse ( body )
84174 } catch ( error ) {
85- return res
86- . header ( "Reason" , error . toString ( ) )
87- . status ( 502 )
88- . send ( )
175+ input = body
89176 }
90177 }
91178
92- logDebug ( "Transformed input" , JSON . stringify ( transformed , null , 2 ) )
179+ logDebug ( `Transforming response body with transformation in ${ jsonata_response_transform_file_name } , using input` , JSON . stringify ( input , null , 2 ) )
180+
181+ const transformed_response = await jsonata_response_transform . evaluate ( input )
182+ const transformed_response_content_type = guessTransformedContentType ( transformed_response )
93183
94184 return res
95- . header ( "Content-Type" , "application/json" )
96- . status ( 200 )
97- . send ( transformed ) ;
185+ . header ( "Content-Type" , transformed_response_content_type )
186+ . status ( response . status )
187+ . send ( JSON . stringify ( transformed_response ) )
98188
99189 } catch ( error ) {
100190 console . error ( error ) ;
@@ -105,6 +195,21 @@ app.post("/", async (req, res) => {
105195 }
106196} ) ;
107197
198+ // guessTransformedContentType tries to guess the transformed event content type.
199+ // 1. If the transformed event contains a special "contentype" field, it returns it.
200+ // 2. Otherwise, it tries to find CloudEvents "specversion" attribute and, if it's present, returns
201+ // the CloudEvent structured content type "application/cloudevents+json".
202+ // 3. Lastly, it falls back to "application/json" if none of the above are specified.
203+ function guessTransformedContentType ( transformed ) {
204+ if ( "contenttype" in transformed && transformed [ 'contenttype' ] ) {
205+ return transformed [ 'contenttype' ] . toString ( ) ;
206+ }
207+ if ( "specversion" in transformed ) {
208+ return "application/cloudevents+json"
209+ }
210+ return "application/json" ;
211+ }
212+
108213app . get ( '/healthz' , ( req , res ) => {
109214 res . status ( 200 ) . send ( 'OK' ) ;
110215} ) ;
0 commit comments