11package com .networknt .aws .lambda .handler .middleware .proxy ;
22
3+ import software .amazon .awssdk .core .retry .RetryPolicy ;
4+ import software .amazon .awssdk .core .retry .conditions .RetryCondition ;
35import com .amazonaws .services .lambda .runtime .events .APIGatewayProxyResponseEvent ;
46import com .networknt .aws .lambda .LightLambdaExchange ;
57import com .networknt .aws .lambda .handler .Handler ;
1618import org .slf4j .Logger ;
1719import org .slf4j .LoggerFactory ;
1820import software .amazon .awssdk .core .SdkBytes ;
21+ import software .amazon .awssdk .core .client .config .ClientOverrideConfiguration ;
22+ import software .amazon .awssdk .http .async .SdkAsyncHttpClient ;
23+ import software .amazon .awssdk .http .nio .netty .NettyNioAsyncHttpClient ;
1924import software .amazon .awssdk .regions .Region ;
20- import software .amazon .awssdk .services .lambda .LambdaClient ;
25+ import software .amazon .awssdk .services .lambda .LambdaAsyncClient ;
2126import software .amazon .awssdk .services .lambda .model .InvokeRequest ;
22- import software .amazon .awssdk .services .lambda .model .LambdaException ;
2327
2428import java .net .URI ;
29+ import java .time .Duration ;
2530import java .util .HashMap ;
2631import java .util .Map ;
32+ import java .util .concurrent .CompletableFuture ;
33+ import java .util .concurrent .ExecutionException ;
2734
2835public class LambdaProxyMiddleware implements MiddlewareHandler {
29- private static LambdaClient client ;
36+ private static LambdaAsyncClient client ;
3037 private static final Logger LOG = LoggerFactory .getLogger (LambdaProxyMiddleware .class );
3138 private static AbstractMetricsMiddleware metricsMiddleware ;
3239
@@ -38,12 +45,24 @@ public class LambdaProxyMiddleware implements MiddlewareHandler {
3845 static final Map <String , PathTemplateMatcher <String >> methodToMatcherMap = new HashMap <>();
3946
4047 public LambdaProxyMiddleware () {
41- var builder = LambdaClient .builder ().region (Region .of (CONFIG .getRegion ()));
42-
48+ SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient .builder ()
49+ .readTimeout (Duration .ofMillis (CONFIG .getApiCallAttemptTimeout ()))
50+ .writeTimeout (Duration .ofMillis (CONFIG .getApiCallAttemptTimeout ()))
51+ .connectionTimeout (Duration .ofMillis (CONFIG .getApiCallAttemptTimeout ()))
52+ .build ();
53+ ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration .builder ()
54+ .apiCallTimeout (Duration .ofMillis (CONFIG .getApiCallTimeout ()))
55+ .apiCallAttemptTimeout (Duration .ofSeconds (CONFIG .getApiCallAttemptTimeout ()))
56+ .build ();
57+
58+ var builder = LambdaAsyncClient .builder ().region (Region .of (CONFIG .getRegion ()))
59+ .httpClient (asyncHttpClient )
60+ .overrideConfiguration (overrideConfig );
4361 if (!StringUtils .isEmpty (CONFIG .getEndpointOverride ()))
4462 builder .endpointOverride (URI .create (CONFIG .getEndpointOverride ()));
45- if (CONFIG .isMetricsInjection ()) lookupMetricsMiddleware ();
4663 client = builder .build ();
64+
65+ if (CONFIG .isMetricsInjection ()) lookupMetricsMiddleware ();
4766 populateMethodToMatcherMap (CONFIG .getFunctions ());
4867 if (LOG .isInfoEnabled ()) LOG .info ("LambdaProxyMiddleware is constructed" );
4968 }
@@ -90,9 +109,8 @@ private void populateMethodToMatcherMap(Map<String, String> functions) {
90109 }
91110 }
92111
93- private String invokeFunction (final LambdaClient client , String functionName , final LightLambdaExchange exchange ) {
112+ private String invokeFunction (final LambdaAsyncClient client , String functionName , final LightLambdaExchange exchange ) {
94113 String serializedEvent = JsonMapper .toJson (exchange .getFinalizedRequest (false ));
95- String response = null ;
96114 try {
97115 var payload = SdkBytes .fromUtf8String (serializedEvent );
98116 var request = InvokeRequest .builder ()
@@ -101,25 +119,27 @@ private String invokeFunction(final LambdaClient client, String functionName, fi
101119 .payload (payload )
102120 .build ();
103121 long startTime = System .nanoTime ();
104- var res = client .invoke (request );
105- if (CONFIG .isMetricsInjection ()) {
106- if (metricsMiddleware == null ) lookupMetricsMiddleware ();
107- if (metricsMiddleware != null ) {
108- if (LOG .isTraceEnabled ()) LOG .trace ("Inject metrics for {}" , CONFIG .getMetricsName ());
109- metricsMiddleware .injectMetrics (exchange , startTime , CONFIG .getMetricsName (), null );
110- }
111- }
112- if (LOG .isDebugEnabled ()) {
113- LOG .debug ("lambda call function error:{}" , res .functionError ());
114- LOG .debug ("lambda logger result:{}" , res .logResult ());
115- LOG .debug ("lambda call status:{}" , res .statusCode ());
116- }
117-
118- response = res .payload ().asUtf8String ();
119- } catch (LambdaException e ) {
122+ CompletableFuture <String > futureResponse = client .invoke (request )
123+ .thenApply (res -> {
124+ if (CONFIG .isMetricsInjection ()) {
125+ if (metricsMiddleware == null ) lookupMetricsMiddleware ();
126+ if (metricsMiddleware != null ) {
127+ if (LOG .isTraceEnabled ()) LOG .trace ("Inject metrics for {}" , CONFIG .getMetricsName ());
128+ metricsMiddleware .injectMetrics (exchange , startTime , CONFIG .getMetricsName (), null );
129+ }
130+ }
131+ if (LOG .isTraceEnabled ()) LOG .trace ("LambdaProxyMiddleware.invokeFunction response: {}" , res );
132+ return res .payload ().asUtf8String ();
133+ })
134+ .exceptionally (e -> {
135+ LOG .error ("Error invoking lambda function: {}" , functionName , e );
136+ return null ;
137+ });
138+ return futureResponse .get ();
139+ } catch (InterruptedException | ExecutionException e ) {
120140 LOG .error ("LambdaException" , e );
121141 }
122- return response ;
142+ return null ;
123143 }
124144
125145 @ Override
0 commit comments