1- /*---------------------------------------------------------------------------------------------
2- * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3- * This file is a part of the ModelEngine Project.
4- * Licensed under the MIT License. See License.txt in the project root for license information.
5- *--------------------------------------------------------------------------------------------* /
1+ /*
2+ * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+ * This file is a part of the ModelEngine Project.
4+ * Licensed under the MIT License. See License.txt in the project root for license information.
5+ */
66
77package modelengine .fit .http .server .support ;
88
2222import modelengine .fit .http .entity .Entity ;
2323import modelengine .fit .http .entity .FileEntity ;
2424import modelengine .fit .http .entity .ReadableBinaryEntity ;
25+ import modelengine .fit .http .entity .TextEvent ;
2526import modelengine .fit .http .entity .TextEventStreamEntity ;
2627import modelengine .fit .http .entity .WritableBinaryEntity ;
2728import modelengine .fit .http .entity .support .DefaultWritableBinaryEntity ;
3637import modelengine .fit .http .server .HttpClassicServerResponse ;
3738import modelengine .fit .http .server .InternalServerErrorException ;
3839import modelengine .fit .http .support .AbstractHttpClassicResponse ;
40+ import modelengine .fitframework .flowable .Subscription ;
3941import modelengine .fitframework .resource .UrlUtils ;
4042import modelengine .fitframework .serialization .ObjectSerializer ;
4143import modelengine .fitframework .util .ObjectUtils ;
4547import java .nio .charset .Charset ;
4648import java .nio .charset .StandardCharsets ;
4749import java .util .Optional ;
48- import java .util .concurrent .CountDownLatch ;
49- import java .util .concurrent .atomic .AtomicReference ;
5050
5151/**
5252 * 表示 {@link HttpClassicServerResponse} 的默认实现。
@@ -148,78 +148,106 @@ public void send() {
148148 if (this .entity == null ) {
149149 this .headers ().set (CONTENT_LENGTH , ZERO );
150150 this .serverResponse .writeStartLineAndHeaders ();
151- } else if (this .entity instanceof ReadableBinaryEntity ) {
152- if (this .entity instanceof FileEntity ) {
153- FileEntity actual = cast (this .entity );
154- this .headers ().set (CONTENT_LENGTH , String .valueOf (actual .length ()));
155- } else if (!this .headers ().contains (CONTENT_LENGTH )) {
156- this .headers ().set (TRANSFER_ENCODING , CHUNKED );
157- }
158- this .serverResponse .writeStartLineAndHeaders ();
159- ReadableBinaryEntity readableBinaryEntity = cast (this .entity );
160- byte [] bytes = new byte [512 ];
161- int read ;
162- while ((read = readableBinaryEntity .read (bytes )) > -1 ) {
163- this .serverResponse .writeBody (bytes , 0 , read );
164- }
165- } else if (this .entity instanceof WritableBinaryEntity ) {
166- // WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。
151+ this .serverResponse .flush ();
167152 } else if (this .entity instanceof TextEventStreamEntity ) {
168153 this .headers ().set (CACHE_CONTROL , NO_CACHE );
169154 this .headers ().set (CONNECTION , KEEP_ALIVE );
170155 this .headers ().set (TRANSFER_ENCODING , CHUNKED );
171156 this .serverResponse .writeStartLineAndHeaders ();
172- this .sendTextEventStream (cast (this .entity ));
157+ this .sendTextEventStream (cast (this .entity ), charset );
173158 } else {
174- byte [] entityBytes = this .entitySerializer ().serializeEntity (ObjectUtils .cast (this .entity ), charset );
175- this .headers ().set (CONTENT_LENGTH , String .valueOf (entityBytes .length ));
176- this .serverResponse .writeStartLineAndHeaders ();
177- this .serverResponse .writeBody (entityBytes );
159+ this .sendDirectly (charset );
178160 }
179- this .serverResponse .flush ();
180161 } catch (IOException e ) {
181162 throw new InternalServerErrorException ("Failed to write response." , e );
182163 }
183164 }
184165
185- @ Override
186- public boolean isActive () {
187- return this .serverResponse .isActive ();
188- }
189-
190- private void sendTextEventStream (TextEventStreamEntity eventStreamEntity ) throws IOException {
166+ private void sendTextEventStream (TextEventStreamEntity eventStreamEntity , Charset charset ) throws IOException {
191167 ObjectSerializer objectSerializer = this .jsonSerializer ()
192168 .orElseThrow (() -> new IllegalStateException ("The json serializer cannot be null." ));
193- AtomicReference <Exception > exception = new AtomicReference <>();
194- CountDownLatch latch = new CountDownLatch (1 );
195169 eventStreamEntity .stream ()
196- .map (sse -> sse .serialize (objectSerializer ).getBytes (StandardCharsets .UTF_8 ))
197- .subscribe (null , (subscription , bytes ) -> {
198- try {
199- this .serverResponse .writeBody (bytes );
200- } catch (IOException e ) {
201- subscription .cancel ();
202- exception .set (e );
203- latch .countDown ();
204- }
205- }, subscription -> latch .countDown (), (ignore , e ) -> {
206- exception .set (e );
207- latch .countDown ();
208- });
170+ .map (event -> event .serialize (objectSerializer ).getBytes (charset ))
171+ .subscribe (null ,
172+ (subscription , bytes ) -> this .onSseMessage (subscription , bytes , charset ),
173+ subscription -> this .onSseComplete (charset ),
174+ (subscription , e ) -> this .onSseError (e , charset ));
175+ }
176+
177+ private void onSseMessage (Subscription subscription , byte [] bytes , Charset charset ) {
209178 try {
210- latch . await ( );
211- } catch (InterruptedException e ) {
212- Thread . currentThread (). interrupt ();
213- throw new InternalServerErrorException ( "Failed to execute handler." , e );
179+ this . serverResponse . writeBody ( bytes );
180+ } catch (IOException e ) {
181+ subscription . cancel ();
182+ this . onSseError ( e , charset );
214183 }
215- Exception e = exception .get ();
216- if (e == null ) {
217- return ;
184+ }
185+
186+ private void onSseComplete (Charset charset ) {
187+ try {
188+ this .serverResponse .flush ();
189+ } catch (IOException e ) {
190+ this .onSseError (e , charset );
191+ } finally {
192+ try {
193+ this .close0 ();
194+ } catch (IOException e ) {
195+ // ignore.
196+ }
197+ }
198+ }
199+
200+ private void onSseError (Throwable throwable , Charset charset ) {
201+ try {
202+ TextEvent errorEvent = TextEvent .custom ().event ("error" ).data (throwable .getMessage ()).build ();
203+ ObjectSerializer objectSerializer = this .jsonSerializer ()
204+ .orElseThrow (() -> new IllegalStateException ("The json serializer cannot be null." ));
205+ this .serverResponse .writeBody (errorEvent .serialize (objectSerializer ).getBytes (charset ));
206+ this .serverResponse .flush ();
207+ } catch (IOException e ) {
208+ this .serverResponse .closeChannel ();
209+ InternalServerErrorException internalServerErrorException =
210+ new InternalServerErrorException ("Failed to send error response when sse." , e );
211+ internalServerErrorException .addSuppressed (throwable );
212+ throw internalServerErrorException ;
213+ } finally {
214+ try {
215+ this .close0 ();
216+ } catch (IOException e ) {
217+ // ignore.
218+ }
218219 }
219- if (e instanceof IOException ) {
220- throw (IOException ) e ;
220+ }
221+
222+ private void sendDirectly (Charset charset ) throws IOException {
223+ if (this .entity instanceof ReadableBinaryEntity ) {
224+ if (this .entity instanceof FileEntity ) {
225+ FileEntity actual = cast (this .entity );
226+ this .headers ().set (CONTENT_LENGTH , String .valueOf (actual .length ()));
227+ } else if (!this .headers ().contains (CONTENT_LENGTH )) {
228+ this .headers ().set (TRANSFER_ENCODING , CHUNKED );
229+ }
230+ this .serverResponse .writeStartLineAndHeaders ();
231+ ReadableBinaryEntity readableBinaryEntity = cast (this .entity );
232+ byte [] bytes = new byte [512 ];
233+ int read ;
234+ while ((read = readableBinaryEntity .read (bytes )) > -1 ) {
235+ this .serverResponse .writeBody (bytes , 0 , read );
236+ }
237+ } else if (this .entity instanceof WritableBinaryEntity ) {
238+ // WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。
239+ } else {
240+ byte [] entityBytes = this .entitySerializer ().serializeEntity (ObjectUtils .cast (this .entity ), charset );
241+ this .headers ().set (CONTENT_LENGTH , String .valueOf (entityBytes .length ));
242+ this .serverResponse .writeStartLineAndHeaders ();
243+ this .serverResponse .writeBody (entityBytes );
221244 }
222- throw new InternalServerErrorException ("Failed to execute handler." , e );
245+ this .serverResponse .flush ();
246+ }
247+
248+ @ Override
249+ public boolean isActive () {
250+ return this .serverResponse .isActive ();
223251 }
224252
225253 @ Override
@@ -239,6 +267,13 @@ protected void commit() {
239267
240268 @ Override
241269 public void close () throws IOException {
270+ if (this .entity instanceof TextEventStreamEntity ) {
271+ return ;
272+ }
273+ this .close0 ();
274+ }
275+
276+ private void close0 () throws IOException {
242277 this .serverResponse .close ();
243278 if (this .entity != null ) {
244279 this .entity .close ();
0 commit comments