1414
1515package com .google .cloud .functions .invoker ;
1616
17+ import static java .nio .charset .StandardCharsets .UTF_8 ;
1718import static java .util .stream .Collectors .joining ;
1819import static java .util .stream .Collectors .toMap ;
1920
2425import com .google .gson .GsonBuilder ;
2526import com .google .gson .TypeAdapter ;
2627import io .cloudevents .CloudEvent ;
27- import io .cloudevents .format .builder .HeadersStep ;
28- import io .cloudevents .v1 .AttributesImpl ;
29- import io .cloudevents .v1 .http .Unmarshallers ;
28+ import io .cloudevents .core .message .MessageReader ;
29+ import io .cloudevents .core .message .impl .GenericStructuredMessageReader ;
30+ import io .cloudevents .core .message .impl .MessageUtils ;
31+ import io .cloudevents .core .message .impl .UnknownEncodingMessageReader ;
3032import java .io .BufferedReader ;
3133import java .io .IOException ;
3234import java .lang .reflect .Type ;
3335import java .time .ZonedDateTime ;
3436import java .time .format .DateTimeFormatter ;
3537import java .util .Arrays ;
3638import java .util .Collections ;
39+ import java .util .List ;
3740import java .util .Map ;
3841import java .util .Optional ;
3942import java .util .logging .Level ;
@@ -78,8 +81,10 @@ public static NewBackgroundFunctionExecutor forClass(Class<?> functionClass) {
7881 executor = new RawFunctionExecutor ((RawBackgroundFunction ) instance );
7982 } else {
8083 BackgroundFunction <?> backgroundFunction = (BackgroundFunction <?>) instance ;
81- Optional <Type > maybeTargetType =
82- backgroundFunctionTypeArgument (backgroundFunction .getClass ());
84+ @ SuppressWarnings ("unchecked" )
85+ Class <? extends BackgroundFunction <?>> c =
86+ (Class <? extends BackgroundFunction <?>>) backgroundFunction .getClass ();
87+ Optional <Type > maybeTargetType = backgroundFunctionTypeArgument (c );
8388 if (!maybeTargetType .isPresent ()) {
8489 // This is probably because the user implemented just BackgroundFunction rather than
8590 // BackgroundFunction<T>.
@@ -99,7 +104,7 @@ public static NewBackgroundFunctionExecutor forClass(Class<?> functionClass) {
99104 * {@code T} can't be determined.
100105 */
101106 static Optional <Type > backgroundFunctionTypeArgument (
102- Class <? extends BackgroundFunction > functionClass ) {
107+ Class <? extends BackgroundFunction <?> > functionClass ) {
103108 // If this is BackgroundFunction<Foo> then the user must have implemented a method
104109 // accept(Foo, Context), so we look for that method and return the type of its first argument.
105110 // We must be careful because the compiler will also have added a synthetic method
@@ -126,35 +131,24 @@ private static Event parseLegacyEvent(HttpServletRequest req) throws IOException
126131 }
127132 }
128133
129- private static Context contextFromCloudEvent (CloudEvent <AttributesImpl , ?> cloudEvent ) {
130- AttributesImpl attributes = cloudEvent .getAttributes ();
131- ZonedDateTime timestamp = attributes .getTime ().orElse (ZonedDateTime .now ());
134+ private static Context contextFromCloudEvent (CloudEvent cloudEvent ) {
135+ ZonedDateTime timestamp = Optional .ofNullable (cloudEvent .getTime ()).orElse (ZonedDateTime .now ());
132136 String timestampString = DateTimeFormatter .ISO_INSTANT .format (timestamp );
133137 // We don't have an obvious replacement for the Context.resource field, which with legacy events
134138 // corresponded to a value present for some proprietary Google event types.
135139 String resource = "{}" ;
136- Map <String , String > attributesMap = AttributesImpl .marshal (attributes );
140+ Map <String , String > attributesMap =
141+ cloudEvent .getAttributeNames ().stream ()
142+ .collect (toMap (a -> a , a -> String .valueOf (cloudEvent .getAttribute (a ))));
137143 return CloudFunctionsContext .builder ()
138- .setEventId (attributes .getId ())
139- .setEventType (attributes .getType ())
144+ .setEventId (cloudEvent .getId ())
145+ .setEventType (cloudEvent .getType ())
140146 .setResource (resource )
141147 .setTimestamp (timestampString )
142148 .setAttributes (attributesMap )
143149 .build ();
144150 }
145151
146- /**
147- * Convert the HTTP headers from the given request into a Map. The headers of interest are
148- * the CE-* headers defined for CloudEvents in the binary encoding (where the metadata is in
149- * the HTTP headers and the payload is the HTTP body), plus Content-Type. In both cases we don't
150- * need to worry about repeated headers, so {@link HttpServletRequest#getHeader(String)} is fine.
151- */
152- private static Map <String , Object > httpHeaderMap (HttpServletRequest req ) {
153- return Collections .list (req .getHeaderNames ())
154- .stream ()
155- .collect (toMap (header -> header , req ::getHeader ));
156- }
157-
158152 /**
159153 * A background function, either "raw" or "typed". A raw background function is one where the user
160154 * code receives a String parameter that is the JSON payload of the triggering event. A typed
@@ -187,10 +181,7 @@ final ClassLoader functionClassLoader() {
187181 abstract void serviceLegacyEvent (HttpServletRequest req )
188182 throws Exception ;
189183
190- abstract void serviceCloudEvent (
191- HttpServletRequest req ,
192- HeadersStep <AttributesImpl , CloudEventDataT , String > unmarshaller )
193- throws Exception ;
184+ abstract void serviceCloudEvent (CloudEvent cloudEvent ) throws Exception ;
194185
195186 abstract Class <CloudEventDataT > cloudEventDataType ();
196187 }
@@ -210,18 +201,9 @@ void serviceLegacyEvent(HttpServletRequest req) throws Exception {
210201 }
211202
212203 @ Override
213- void serviceCloudEvent (
214- HttpServletRequest req , HeadersStep <AttributesImpl , Map <?, ?>, String > unmarshaller )
215- throws Exception {
216- Map <String , Object > httpHeaders = httpHeaderMap (req );
217- String body = req .getReader ().lines ().collect (joining ("\n " ));
218- CloudEvent <AttributesImpl , Map <?, ?>> cloudEvent =
219- unmarshaller
220- .withHeaders (() -> httpHeaders )
221- .withPayload (() -> body )
222- .unmarshal ();
204+ void serviceCloudEvent (CloudEvent cloudEvent ) throws Exception {
223205 Context context = contextFromCloudEvent (cloudEvent );
224- String jsonData = cloudEvent .getData (). map ( data -> new Gson (). toJson ( data )). orElse ( "{}" );
206+ String jsonData = cloudEvent .getData () == null ? "{}" : new String ( cloudEvent . getData (), UTF_8 );
225207 function .accept (jsonData , context );
226208 }
227209
@@ -259,18 +241,12 @@ void serviceLegacyEvent(HttpServletRequest req) throws Exception {
259241 }
260242
261243 @ Override
262- void serviceCloudEvent (
263- HttpServletRequest req , HeadersStep <AttributesImpl , T , String > unmarshaller )
264- throws Exception {
265- Map <String , Object > httpHeaders = httpHeaderMap (req );
266- String body = req .getReader ().lines ().collect (joining ("\n " ));
267- CloudEvent <AttributesImpl , T > cloudEvent =
268- unmarshaller
269- .withHeaders (() -> httpHeaders )
270- .withPayload (() -> body ).unmarshal ();
271- if (cloudEvent .getData ().isPresent ()) {
244+ void serviceCloudEvent (CloudEvent cloudEvent ) throws Exception {
245+ if (cloudEvent .getData () != null ) {
246+ String data = new String (cloudEvent .getData (), UTF_8 );
247+ T payload = new Gson ().fromJson (data , type );
272248 Context context = contextFromCloudEvent (cloudEvent );
273- function .accept (cloudEvent . getData (). get () , context );
249+ function .accept (payload , context );
274250 } else {
275251 throw new IllegalStateException ("Event has no \" data\" component" );
276252 }
@@ -296,10 +272,9 @@ public void service(HttpServletRequest req, HttpServletResponse res) throws IOEx
296272 ClassLoader oldContextLoader = Thread .currentThread ().getContextClassLoader ();
297273 try {
298274 Thread .currentThread ().setContextClassLoader (functionExecutor .functionClassLoader ());
299- if (contentType != null && contentType .startsWith ("application/cloudevents+json" )) {
300- serviceCloudEvent (req , CloudEventKind .STRUCTURED );
301- } else if (req .getHeader ("ce-specversion" ) != null ) {
302- serviceCloudEvent (req , CloudEventKind .BINARY );
275+ if ((contentType != null && contentType .startsWith ("application/cloudevents+json" ))
276+ || req .getHeader ("ce-specversion" ) != null ) {
277+ serviceCloudEvent (req );
303278 } else {
304279 serviceLegacyEvent (req );
305280 }
@@ -320,23 +295,19 @@ private enum CloudEventKind {BINARY, STRUCTURED}
320295 * @param <CloudEventT> a fake type parameter, which corresponds to the type parameter of
321296 * {@link FunctionExecutor}.
322297 */
323- private <CloudEventT > void serviceCloudEvent (
324- HttpServletRequest req , CloudEventKind kind ) throws Exception {
298+ private <CloudEventT > void serviceCloudEvent (HttpServletRequest req ) throws Exception {
325299 @ SuppressWarnings ("unchecked" )
326300 FunctionExecutor <CloudEventT > executor = (FunctionExecutor <CloudEventT >) functionExecutor ;
327- Class <CloudEventT > cloudEventDataType = executor .cloudEventDataType ();
328- HeadersStep <AttributesImpl , CloudEventT , String > unmarshaller ;
329- switch (kind ) {
330- case BINARY :
331- unmarshaller = Unmarshallers .binary (cloudEventDataType );
332- break ;
333- case STRUCTURED :
334- unmarshaller = Unmarshallers .structured (cloudEventDataType );
335- break ;
336- default :
337- throw new AssertionError (kind );
338- }
339- executor .serviceCloudEvent (req , unmarshaller );
301+ Map <String , List <String >> headers = CloudEventsServletBinaryMessageReader .headerMap (req );
302+ byte [] body = req .getInputStream ().readAllBytes ();
303+ List <String > listOfNull = Collections .singletonList (null );
304+ MessageReader reader = MessageUtils .parseStructuredOrBinaryMessage (
305+ () -> headers .getOrDefault ("content-type" , listOfNull ).get (0 ),
306+ format -> new GenericStructuredMessageReader (format , body ),
307+ () -> headers .getOrDefault ("ce-specversion" , listOfNull ).get (0 ),
308+ unusedSpecVersion -> CloudEventsServletBinaryMessageReader .from (req , body ),
309+ UnknownEncodingMessageReader ::new );
310+ executor .serviceCloudEvent (reader .toEvent ());
340311 }
341312
342313 private void serviceLegacyEvent (HttpServletRequest req ) throws Exception {
0 commit comments