2424import group .rxcloud .cloudruntimes .domain .core .invocation .Metadata ;
2525import group .rxcloud .cloudruntimes .utils .TypeRef ;
2626import okhttp3 .Call ;
27- import okhttp3 .OkHttpClient ;
2827import okhttp3 .Callback ;
28+ import okhttp3 .Headers ;
29+ import okhttp3 .MediaType ;
30+ import okhttp3 .OkHttpClient ;
2931import okhttp3 .Request ;
3032import okhttp3 .RequestBody ;
31- import okhttp3 .MediaType ;
3233import okhttp3 .Response ;
3334import okhttp3 .ResponseBody ;
3435import org .slf4j .Logger ;
4243import java .util .concurrent .CompletableFuture ;
4344
4445/**
45- * The type Capa serialize http spi.
46+ * The Capa http spi with default serializer process .
4647 */
4748public abstract class CapaSerializeHttpSpi extends CapaHttpSpi {
4849
@@ -68,9 +69,17 @@ protected byte[] getRequestWithSerialize(Object requestData) {
6869 try {
6970 return objectSerializer .serialize (requestData );
7071 } catch (IOException e ) {
72+ if (logger .isWarnEnabled ()) {
73+ logger .warn ("[CapaSerializeHttpSpi] serialize rpc request[{}] io error" ,
74+ requestData , e );
75+ }
7176 throw new CapaException (CapaErrorContext .PARAMETER_RPC_REQUEST_SERIALIZE_ERROR ,
7277 "Request Type: " + requestData .getClass ().getName ());
7378 } catch (Exception e ) {
79+ if (logger .isWarnEnabled ()) {
80+ logger .warn ("[CapaSerializeHttpSpi] serialize rpc request[{}] error" ,
81+ requestData , e );
82+ }
7483 throw new CapaException (CapaErrorContext .PARAMETER_RPC_REQUEST_SERIALIZE_ERROR ,
7584 "Request Type: " + requestData .getClass ().getName (), e );
7685 }
@@ -84,7 +93,6 @@ protected byte[] getRequestWithSerialize(Object requestData) {
8493 * @return the request body with byte[] serialize
8594 */
8695 protected RequestBody getRequestBodyWithSerialize (Object requestData , Map <String , String > headers ) {
87- byte [] serializedRequestBody = getRequestWithSerialize (requestData );
8896 final String contentType = headers != null
8997 ? headers .get (Metadata .CONTENT_TYPE )
9098 : null ;
@@ -97,11 +105,27 @@ protected RequestBody getRequestBodyWithSerialize(Object requestData, Map<String
97105 ? REQUEST_BODY_EMPTY_JSON
98106 : RequestBody .Companion .create (new byte [0 ], mediaType );
99107 } else {
108+ byte [] serializedRequestBody = getRequestWithSerialize (requestData );
100109 body = RequestBody .Companion .create (serializedRequestBody , mediaType );
101110 }
102111 return body ;
103112 }
104113
114+ /**
115+ * Gets request headers with given params.
116+ *
117+ * @param headersParams user given params
118+ * @return the request headers
119+ */
120+ protected Headers getRequestHeaderWithParams (Map <String , String > headersParams ) {
121+ okhttp3 .Headers .Builder headersBuilder = new okhttp3 .Headers .Builder ();
122+ if (headersParams == null || headersParams .size () == 0 ) {
123+ return headersBuilder .build ();
124+ }
125+ headersParams .forEach (headersBuilder ::add );
126+ return headersBuilder .build ();
127+ }
128+
105129 /**
106130 * Http async call
107131 */
@@ -130,21 +154,23 @@ protected <T> CompletableFuture<HttpResponse<T>> doAsyncInvoke0(Request request,
130154 * @return the response body with byte[] deserialize
131155 */
132156 protected <T > HttpResponse <T > getResponseBodyWithDeserialize (TypeRef <T > type , HttpResponse <byte []> httpResponse ) {
157+ final int httpResponseStatusCode = httpResponse .getStatusCode ();
158+ final Map <String , String > httpResponseHeaders = httpResponse .getHeaders ();
133159 final byte [] httpResponseBody = httpResponse .getBody ();
134160 try {
135161 T responseObject = objectSerializer .deserialize (httpResponseBody , type );
136- return new HttpResponse <>(responseObject , httpResponse . getHeaders (), httpResponse . getStatusCode () );
162+ return new HttpResponse <>(responseObject , httpResponseHeaders , httpResponseStatusCode );
137163 } catch (IOException e ) {
138164 if (logger .isWarnEnabled ()) {
139- logger .warn ("[CapaSerializeHttpSpi] deserialize rpc response[{}] type[{}] io error" ,
140- httpResponseBody , type , e );
165+ logger .warn ("[CapaSerializeHttpSpi] deserialize rpc statusCode[{}] headers[{}] response[{}] type[{}] io error" ,
166+ httpResponseStatusCode , httpResponseHeaders , httpResponseBody , type , e );
141167 }
142168 throw new CapaException (CapaErrorContext .PARAMETER_RPC_RESPONSE_DESERIALIZE_ERROR ,
143169 "Response Type: " + type , e );
144170 } catch (Exception e ) {
145171 if (logger .isWarnEnabled ()) {
146- logger .warn ("[CapaSerializeHttpSpi] deserialize rpc response[{}] type[{}] error" ,
147- httpResponseBody , type , e );
172+ logger .warn ("[CapaSerializeHttpSpi] deserialize rpc statusCode[{}] headers[{}] response[{}] type[{}] error" ,
173+ httpResponseStatusCode , httpResponseHeaders , httpResponseBody , type , e );
148174 }
149175 throw new CapaException (CapaErrorContext .PARAMETER_RPC_RESPONSE_DESERIALIZE_ERROR ,
150176 "Response Type: " + type , e );
@@ -190,9 +216,15 @@ public void onResponse(Call call, Response response) throws IOException {
190216 return ;
191217 }
192218
193- Map <String , String > mapHeaders = new HashMap <>();
194- // response.headers()
195- // .forEach(pair -> mapHeaders.put(pair.getFirst(), pair.getSecond()));
219+ Map <String , String > mapHeaders ;
220+ Headers responseHeaders = response .headers ();
221+ if (responseHeaders == null || responseHeaders .size () == 0 ) {
222+ mapHeaders = new HashMap <>(2 , 1 );
223+ } else {
224+ mapHeaders = new HashMap <>(responseHeaders .size () << 1 );
225+ responseHeaders .forEach (pair -> mapHeaders .put (pair .getFirst (), pair .getSecond ()));
226+ }
227+
196228 HttpResponse <byte []> httpResponse = new HttpResponse <>(bodyBytes , mapHeaders , response .code ());
197229 future .complete (httpResponse );
198230 }
0 commit comments