11package com .google .cloud .functions .invoker ;
22
3+ import static java .util .stream .Collectors .joining ;
4+ import static java .util .stream .Collectors .toMap ;
5+
36import com .google .cloud .functions .BackgroundFunction ;
47import com .google .cloud .functions .Context ;
58import com .google .cloud .functions .RawBackgroundFunction ;
69import com .google .gson .Gson ;
710import com .google .gson .GsonBuilder ;
8- import com .google .gson .JsonParseException ;
911import com .google .gson .TypeAdapter ;
12+ import io .cloudevents .CloudEvent ;
13+ import io .cloudevents .format .builder .HeadersStep ;
14+ import io .cloudevents .v1 .AttributesImpl ;
15+ import io .cloudevents .v1 .http .Unmarshallers ;
1016import java .io .BufferedReader ;
1117import java .io .IOException ;
1218import java .lang .reflect .Type ;
19+ import java .time .ZonedDateTime ;
20+ import java .time .format .DateTimeFormatter ;
1321import java .util .Arrays ;
22+ import java .util .Collections ;
23+ import java .util .Map ;
1424import java .util .Optional ;
1525import java .util .logging .Level ;
1626import java .util .logging .Logger ;
1929import javax .servlet .http .HttpServletResponse ;
2030
2131/** Executes the user's background function. */
22- public class NewBackgroundFunctionExecutor extends HttpServlet {
32+ public final class NewBackgroundFunctionExecutor extends HttpServlet {
2333 private static final Logger logger = Logger .getLogger ("com.google.cloud.functions.invoker" );
2434
25- private final RawBackgroundFunction function ;
35+ private final FunctionExecutor <?> functionExecutor ;
2636
27- private NewBackgroundFunctionExecutor (RawBackgroundFunction function ) {
28- this .function = function ;
37+ private NewBackgroundFunctionExecutor (FunctionExecutor <?> functionExecutor ) {
38+ this .functionExecutor = functionExecutor ;
2939 }
3040
3141 /**
@@ -58,24 +68,24 @@ public static Optional<NewBackgroundFunctionExecutor> forTarget(
5868 } catch (ReflectiveOperationException e ) {
5969 throw new RuntimeException ("Could not construct an instance of " + target + ": " + e , e );
6070 }
61- RawBackgroundFunction function =
62- (instance instanceof RawBackgroundFunction )
63- ? ( RawBackgroundFunction ) instance
64- : asRaw (( BackgroundFunction <?>) instance );
65- return Optional . of ( new NewBackgroundFunctionExecutor ( function )) ;
66- }
67-
68- private static < T > RawBackgroundFunction asRaw ( BackgroundFunction < T > backgroundFunction ) {
69- Optional < Type > maybeTargetType = backgroundFunctionTypeArgument ( backgroundFunction . getClass ());
70- if (! maybeTargetType . isPresent ()) {
71- // This is probably because the user implemented just BackgroundFunction rather than
72- // BackgroundFunction<T>.
73- throw new RuntimeException (
74- "Could not determine the payload type for BackgroundFunction of type "
75- + backgroundFunction . getClass (). getName ()
76- + "; must implement BackgroundFunction<T> for some T" );
71+ FunctionExecutor <?> executor ;
72+ if (instance instanceof RawBackgroundFunction ) {
73+ executor = new RawFunctionExecutor (( RawBackgroundFunction ) instance );
74+ } else {
75+ BackgroundFunction <?> backgroundFunction = ( BackgroundFunction <?>) instance ;
76+ Optional < Type > maybeTargetType =
77+ backgroundFunctionTypeArgument ( backgroundFunction . getClass ());
78+ if (! maybeTargetType . isPresent () ) {
79+ // This is probably because the user implemented just BackgroundFunction rather than
80+ // BackgroundFunction<T>.
81+ throw new RuntimeException (
82+ "Could not determine the payload type for BackgroundFunction of type "
83+ + instance . getClass (). getName ()
84+ + "; must implement BackgroundFunction<T> for some T" );
85+ }
86+ executor = new TypedFunctionExecutor <>( maybeTargetType . get (), backgroundFunction );
7787 }
78- return new AsRaw < T >( maybeTargetType . get (), backgroundFunction );
88+ return Optional . of ( new NewBackgroundFunctionExecutor ( executor ) );
7989 }
8090
8191 /**
@@ -97,55 +107,224 @@ static Optional<Type> backgroundFunctionTypeArgument(
97107 .findFirst ();
98108 }
99109
110+ private static Event parseLegacyEvent (HttpServletRequest req ) throws IOException {
111+ try (BufferedReader bodyReader = req .getReader ()) {
112+ // A Type Adapter is required to set the type of the JsonObject because CloudFunctionsContext
113+ // is abstract and Gson default behavior instantiates the type provided.
114+ TypeAdapter <CloudFunctionsContext > typeAdapter =
115+ CloudFunctionsContext .typeAdapter (new Gson ());
116+ Gson gson = new GsonBuilder ()
117+ .registerTypeAdapter (CloudFunctionsContext .class , typeAdapter )
118+ .registerTypeAdapter (Event .class , new Event .EventDeserializer ())
119+ .create ();
120+ return gson .fromJson (bodyReader , Event .class );
121+ }
122+ }
123+
124+ private static Context contextFromCloudEvent (CloudEvent <AttributesImpl , ?> cloudEvent ) {
125+ AttributesImpl attributes = cloudEvent .getAttributes ();
126+ ZonedDateTime timestamp = attributes .getTime ().orElse (ZonedDateTime .now ());
127+ String timestampString = DateTimeFormatter .ISO_INSTANT .format (timestamp );
128+ // We don't have an obvious replacement for the Context.resource field, which with legacy events
129+ // corresponded to a value present for some proprietary Google event types.
130+ String resource = "{}" ;
131+ return CloudFunctionsContext .builder ()
132+ .setEventId (attributes .getId ())
133+ .setEventType (attributes .getType ())
134+ .setResource (resource )
135+ .setTimestamp (timestampString )
136+ .build ();
137+ }
138+
100139 /**
101- * Wraps a typed {@link BackgroundFunction} as a {@link RawBackgroundFunction} that takes its
102- * input JSON string and deserializes it into the payload type of the {@link BackgroundFunction}/
140+ * Convert the HTTP headers from the given request into a Map. The headers of interest are
141+ * the CE-* headers defined for CloudEvents in the binary encoding (where the metadata is in
142+ * the HTTP headers and the payload is the HTTP body), plus Content-Type. In both cases we don't
143+ * need to worry about repeated headers, so {@link HttpServletRequest#getHeader(String)} is fine.
103144 */
104- private static class AsRaw <T > implements RawBackgroundFunction {
105- private final Gson gson = new Gson ();
106- private final Type targetType ;
107- private final BackgroundFunction <T > backgroundFunction ;
145+ private static Map <String , Object > httpHeaderMap (HttpServletRequest req ) {
146+ return Collections .list (req .getHeaderNames ())
147+ .stream ()
148+ .collect (toMap (header -> header , req ::getHeader ));
149+ }
108150
109- private AsRaw (Type targetType , BackgroundFunction <T > backgroundFunction ) {
110- this .targetType = targetType ;
111- this .backgroundFunction = backgroundFunction ;
151+ /**
152+ * A background function, either "raw" or "typed". A raw background function is one where the user
153+ * code receives a String parameter that is the JSON payload of the triggering event. A typed
154+ * background function is one where the payload is deserialized into a user-provided class whose
155+ * field names correspond to the keys of the JSON object.
156+ *
157+ * <p>In addition to these two flavours, events can be either "legacy events" or "CloudEvents".
158+ * Legacy events are the only kind that GCF originally supported, and use proprietary encodings
159+ * for the various triggers. CloudEvents are ones that follow the standards defined by
160+ * <a href="https://cloudevents.io">cloudevents.io</a>.
161+ *
162+ * @param <CloudEventDataT> the type to be used in the {@link Unmarshallers} call when
163+ * unmarshalling this event, if it is a CloudEvent.
164+ */
165+ private abstract static class FunctionExecutor <CloudEventDataT > {
166+ private final String functionName ;
167+
168+ FunctionExecutor (String functionName ) {
169+ this .functionName = functionName ;
170+ }
171+
172+ final String functionName () {
173+ return functionName ;
174+ }
175+
176+ abstract void serviceLegacyEvent (HttpServletRequest req )
177+ throws IOException ;
178+
179+ abstract void serviceCloudEvent (
180+ HttpServletRequest req ,
181+ HeadersStep <AttributesImpl , CloudEventDataT , String > unmarshaller )
182+ throws IOException ;
183+
184+ abstract Class <CloudEventDataT > cloudEventDataType ();
185+ }
186+
187+ private static class RawFunctionExecutor extends FunctionExecutor <Map <?, ?>> {
188+ private final RawBackgroundFunction function ;
189+
190+ RawFunctionExecutor (RawBackgroundFunction function ) {
191+ super (function .getClass ().getCanonicalName ());
192+ this .function = function ;
112193 }
113194
114195 @ Override
115- public void accept (String json , Context context ) {
116- T payload ;
117- try {
118- payload = gson .fromJson (json , targetType );
119- } catch (JsonParseException e ) {
120- logger .log (Level .WARNING ,
121- "Could not convert payload to target type " + targetType .getTypeName (), e );
122- return ;
196+ void serviceLegacyEvent (HttpServletRequest req ) throws IOException {
197+ Event event = parseLegacyEvent (req );
198+ function .accept (new Gson ().toJson (event .getData ()), event .getContext ());
199+ }
200+
201+ @ Override
202+ void serviceCloudEvent (
203+ HttpServletRequest req , HeadersStep <AttributesImpl , Map <?, ?>, String > unmarshaller )
204+ throws IOException {
205+ Map <String , Object > httpHeaders = httpHeaderMap (req );
206+ String body = req .getReader ().lines ().collect (joining ("\n " ));
207+ CloudEvent <AttributesImpl , Map <?, ?>> cloudEvent =
208+ unmarshaller
209+ .withHeaders (() -> httpHeaders )
210+ .withPayload (() -> body )
211+ .unmarshal ();
212+ Context context = contextFromCloudEvent (cloudEvent );
213+ String jsonData = cloudEvent .getData ().map (data -> new Gson ().toJson (data )).orElse ("{}" );
214+ function .accept (jsonData , context );
215+ }
216+
217+ @ Override
218+ Class <Map <?, ?>> cloudEventDataType () {
219+ // This messing about with casts and @SuppressWarnings allows us to limit the use of the raw
220+ // Map type to just here.
221+ @ SuppressWarnings ("unchecked" )
222+ Class <Map <?, ?>> c = (Class <Map <?, ?>>) (Class <?>) Map .class ;
223+ return c ;
224+ }
225+ }
226+
227+ private static class TypedFunctionExecutor <T > extends FunctionExecutor <T > {
228+ private final Type type ; // T
229+ private final BackgroundFunction <T > function ;
230+
231+ private TypedFunctionExecutor (Type type , BackgroundFunction <T > function ) {
232+ super (function .getClass ().getCanonicalName ());
233+ this .type = type ;
234+ this .function = function ;
235+ }
236+
237+ static <T > TypedFunctionExecutor <T > of (Type type , BackgroundFunction <?> instance ) {
238+ @ SuppressWarnings ("unchecked" )
239+ BackgroundFunction <T > function = (BackgroundFunction <T >) instance ;
240+ return new TypedFunctionExecutor <>(type , function );
241+ }
242+
243+ @ Override
244+ void serviceLegacyEvent (HttpServletRequest req ) throws IOException {
245+ Event event = parseLegacyEvent (req );
246+ T payload = new Gson ().fromJson (event .getData (), type );
247+ function .accept (payload , event .getContext ());
248+ }
249+
250+ @ Override
251+ void serviceCloudEvent (
252+ HttpServletRequest req , HeadersStep <AttributesImpl , T , String > unmarshaller )
253+ throws IOException {
254+ Map <String , Object > httpHeaders = httpHeaderMap (req );
255+ String body = req .getReader ().lines ().collect (joining ("\n " ));
256+ CloudEvent <AttributesImpl , T > cloudEvent =
257+ unmarshaller
258+ .withHeaders (() -> httpHeaders )
259+ .withPayload (() -> body ).unmarshal ();
260+ if (cloudEvent .getData ().isPresent ()) {
261+ Context context = contextFromCloudEvent (cloudEvent );
262+ function .accept (cloudEvent .getData ().get (), context );
263+ } else {
264+ throw new IllegalStateException ("Event has no \" data\" component" );
123265 }
124- backgroundFunction .accept (payload , context );
266+ }
267+
268+ @ Override
269+ Class <T > cloudEventDataType () {
270+ if (!(type instanceof Class <?>)) {
271+ throw new IllegalStateException (
272+ "CloudEvents SDK currently does not permit deserializing types other than classes:"
273+ + " cannot deserialize " + type );
274+ }
275+ @ SuppressWarnings ("unchecked" )
276+ Class <T > c = (Class <T >) type ;
277+ return c ;
125278 }
126279 }
127280
128- /** Executes the user's background function, can handle all HTTP type methods. */
281+ /** Executes the user's background function. This can handle all HTTP methods. */
129282 @ Override
130283 public void service (HttpServletRequest req , HttpServletResponse res ) throws IOException {
131- BufferedReader body = req .getReader ();
132-
133- // A Type Adapter is required to set the type of the JsonObject because CloudFunctionsContext
134- // is abstract and Gson default behavior instantiates the type provided.
135- TypeAdapter <CloudFunctionsContext > typeAdapter =
136- CloudFunctionsContext .typeAdapter (new Gson ());
137- Gson gson = new GsonBuilder ()
138- .registerTypeAdapter (CloudFunctionsContext .class , typeAdapter )
139- .registerTypeAdapter (Event .class , new Event .EventDeserializer ())
140- .create ();
141-
142- Event event = gson .fromJson (body , Event .class );
284+ String contentType = req .getContentType ();
143285 try {
144- function .accept (gson .toJson (event .getData ()), event .getContext ());
286+ if (contentType != null && contentType .startsWith ("application/cloudevents+json" )) {
287+ serviceCloudEvent (req , CloudEventKind .STRUCTURED );
288+ } else if (req .getHeader ("ce-specversion" ) != null ) {
289+ serviceCloudEvent (req , CloudEventKind .BINARY );
290+ } else {
291+ serviceLegacyEvent (req );
292+ }
145293 res .setStatus (HttpServletResponse .SC_OK );
146294 } catch (Throwable t ) {
147295 res .setStatus (HttpServletResponse .SC_INTERNAL_SERVER_ERROR );
148- logger .log (Level .WARNING , "Failed to execute " + function .getClass ().getName (), t );
296+ logger .log (Level .WARNING , "Failed to execute " + functionExecutor .functionName (), t );
297+ }
298+ }
299+
300+ private enum CloudEventKind {BINARY , STRUCTURED }
301+
302+ /**
303+ * Service a CloudEvent.
304+ *
305+ * @param <CloudEventT> a fake type parameter, which corresponds to the type parameter of
306+ * {@link FunctionExecutor}.
307+ */
308+ private <CloudEventT > void serviceCloudEvent (
309+ HttpServletRequest req , CloudEventKind kind ) throws IOException {
310+ @ SuppressWarnings ("unchecked" )
311+ FunctionExecutor <CloudEventT > executor = (FunctionExecutor <CloudEventT >) functionExecutor ;
312+ Class <CloudEventT > cloudEventDataType = executor .cloudEventDataType ();
313+ HeadersStep <AttributesImpl , CloudEventT , String > unmarshaller ;
314+ switch (kind ) {
315+ case BINARY :
316+ unmarshaller = Unmarshallers .binary (cloudEventDataType );
317+ break ;
318+ case STRUCTURED :
319+ unmarshaller = Unmarshallers .structured (cloudEventDataType );
320+ break ;
321+ default :
322+ throw new AssertionError (kind );
149323 }
324+ executor .serviceCloudEvent (req , unmarshaller );
325+ }
326+
327+ private void serviceLegacyEvent (HttpServletRequest req ) throws IOException {
328+ functionExecutor .serviceLegacyEvent (req );
150329 }
151330}
0 commit comments