Skip to content

Commit 2c39003

Browse files
authored
refactor(ext/http): use concrete error types (denoland#26377)
1 parent 8ca8174 commit 2c39003

File tree

7 files changed

+305
-130
lines changed

7 files changed

+305
-130
lines changed

ext/http/http_next.rs

Lines changed: 83 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use crate::service::SignallingRc;
1919
use crate::websocket_upgrade::WebSocketUpgrade;
2020
use crate::LocalExecutor;
2121
use cache_control::CacheControl;
22-
use deno_core::error::AnyError;
2322
use deno_core::external;
2423
use deno_core::futures::future::poll_fn;
2524
use deno_core::futures::TryFutureExt;
@@ -146,12 +145,32 @@ macro_rules! clone_external {
146145
}};
147146
}
148147

148+
#[derive(Debug, thiserror::Error)]
149+
pub enum HttpNextError {
150+
#[error(transparent)]
151+
Resource(deno_core::error::AnyError),
152+
#[error("{0}")]
153+
Io(#[from] io::Error),
154+
#[error(transparent)]
155+
WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError),
156+
#[error("{0}")]
157+
Hyper(#[from] hyper::Error),
158+
#[error(transparent)]
159+
JoinError(#[from] tokio::task::JoinError),
160+
#[error(transparent)]
161+
Canceled(#[from] deno_core::Canceled),
162+
#[error(transparent)]
163+
HttpPropertyExtractor(deno_core::error::AnyError),
164+
#[error(transparent)]
165+
UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError),
166+
}
167+
149168
#[op2(fast)]
150169
#[smi]
151170
pub fn op_http_upgrade_raw(
152171
state: &mut OpState,
153172
external: *const c_void,
154-
) -> Result<ResourceId, AnyError> {
173+
) -> Result<ResourceId, HttpNextError> {
155174
// SAFETY: external is deleted before calling this op.
156175
let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
157176

@@ -177,7 +196,7 @@ pub fn op_http_upgrade_raw(
177196
upgraded.write_all(&bytes).await?;
178197
break upgraded;
179198
}
180-
Err(err) => return Err(err),
199+
Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)),
181200
}
182201
};
183202

@@ -193,7 +212,7 @@ pub fn op_http_upgrade_raw(
193212
}
194213
read_tx.write_all(&buf[..read]).await?;
195214
}
196-
Ok::<_, AnyError>(())
215+
Ok::<_, HttpNextError>(())
197216
});
198217
spawn(async move {
199218
let mut buf = [0; 1024];
@@ -204,7 +223,7 @@ pub fn op_http_upgrade_raw(
204223
}
205224
upgraded_tx.write_all(&buf[..read]).await?;
206225
}
207-
Ok::<_, AnyError>(())
226+
Ok::<_, HttpNextError>(())
208227
});
209228

210229
Ok(())
@@ -223,7 +242,7 @@ pub async fn op_http_upgrade_websocket_next(
223242
state: Rc<RefCell<OpState>>,
224243
external: *const c_void,
225244
#[serde] headers: Vec<(ByteString, ByteString)>,
226-
) -> Result<ResourceId, AnyError> {
245+
) -> Result<ResourceId, HttpNextError> {
227246
let http =
228247
// SAFETY: external is deleted before calling this op.
229248
unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
@@ -690,7 +709,7 @@ pub async fn op_http_set_response_body_resource(
690709
#[smi] stream_rid: ResourceId,
691710
auto_close: bool,
692711
status: u16,
693-
) -> Result<bool, AnyError> {
712+
) -> Result<bool, HttpNextError> {
694713
let http =
695714
// SAFETY: op is called with external.
696715
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
@@ -705,9 +724,15 @@ pub async fn op_http_set_response_body_resource(
705724
let resource = {
706725
let mut state = state.borrow_mut();
707726
if auto_close {
708-
state.resource_table.take_any(stream_rid)?
727+
state
728+
.resource_table
729+
.take_any(stream_rid)
730+
.map_err(HttpNextError::Resource)?
709731
} else {
710-
state.resource_table.get_any(stream_rid)?
732+
state
733+
.resource_table
734+
.get_any(stream_rid)
735+
.map_err(HttpNextError::Resource)?
711736
}
712737
};
713738

@@ -814,17 +839,17 @@ async fn serve_http2_autodetect(
814839
io: impl HttpServeStream,
815840
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
816841
cancel: Rc<CancelHandle>,
817-
) -> Result<(), AnyError> {
842+
) -> Result<(), HttpNextError> {
818843
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
819844
let (matches, io) = prefix.match_prefix().await?;
820845
if matches {
821846
serve_http2_unconditional(io, svc, cancel)
822847
.await
823-
.map_err(|e| e.into())
848+
.map_err(HttpNextError::Hyper)
824849
} else {
825850
serve_http11_unconditional(io, svc, cancel)
826851
.await
827-
.map_err(|e| e.into())
852+
.map_err(HttpNextError::Hyper)
828853
}
829854
}
830855

@@ -833,7 +858,7 @@ fn serve_https(
833858
request_info: HttpConnectionProperties,
834859
lifetime: HttpLifetime,
835860
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
836-
) -> JoinHandle<Result<(), AnyError>> {
861+
) -> JoinHandle<Result<(), HttpNextError>> {
837862
let HttpLifetime {
838863
server_state,
839864
connection_cancel_handle,
@@ -852,11 +877,11 @@ fn serve_https(
852877
if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() {
853878
serve_http2_unconditional(io, svc, listen_cancel_handle)
854879
.await
855-
.map_err(|e| e.into())
880+
.map_err(HttpNextError::Hyper)
856881
} else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() {
857882
serve_http11_unconditional(io, svc, listen_cancel_handle)
858883
.await
859-
.map_err(|e| e.into())
884+
.map_err(HttpNextError::Hyper)
860885
} else {
861886
serve_http2_autodetect(io, svc, listen_cancel_handle).await
862887
}
@@ -870,7 +895,7 @@ fn serve_http(
870895
request_info: HttpConnectionProperties,
871896
lifetime: HttpLifetime,
872897
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
873-
) -> JoinHandle<Result<(), AnyError>> {
898+
) -> JoinHandle<Result<(), HttpNextError>> {
874899
let HttpLifetime {
875900
server_state,
876901
connection_cancel_handle,
@@ -891,7 +916,7 @@ fn serve_http_on<HTTP>(
891916
listen_properties: &HttpListenProperties,
892917
lifetime: HttpLifetime,
893918
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
894-
) -> JoinHandle<Result<(), AnyError>>
919+
) -> JoinHandle<Result<(), HttpNextError>>
895920
where
896921
HTTP: HttpPropertyExtractor,
897922
{
@@ -922,7 +947,7 @@ struct HttpLifetime {
922947
}
923948

924949
struct HttpJoinHandle {
925-
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
950+
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>,
926951
connection_cancel_handle: Rc<CancelHandle>,
927952
listen_cancel_handle: Rc<CancelHandle>,
928953
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
@@ -982,12 +1007,13 @@ impl Drop for HttpJoinHandle {
9821007
pub fn op_http_serve<HTTP>(
9831008
state: Rc<RefCell<OpState>>,
9841009
#[smi] listener_rid: ResourceId,
985-
) -> Result<(ResourceId, &'static str, String), AnyError>
1010+
) -> Result<(ResourceId, &'static str, String), HttpNextError>
9861011
where
9871012
HTTP: HttpPropertyExtractor,
9881013
{
9891014
let listener =
990-
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
1015+
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)
1016+
.map_err(HttpNextError::Resource)?;
9911017

9921018
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
9931019

@@ -1002,7 +1028,8 @@ where
10021028
loop {
10031029
let conn = HTTP::accept_connection_from_listener(&listener)
10041030
.try_or_cancel(listen_cancel_clone.clone())
1005-
.await?;
1031+
.await
1032+
.map_err(HttpNextError::HttpPropertyExtractor)?;
10061033
serve_http_on::<HTTP>(
10071034
conn,
10081035
&listen_properties_clone,
@@ -1011,7 +1038,7 @@ where
10111038
);
10121039
}
10131040
#[allow(unreachable_code)]
1014-
Ok::<_, AnyError>(())
1041+
Ok::<_, HttpNextError>(())
10151042
});
10161043

10171044
// Set the handle after we start the future
@@ -1031,25 +1058,25 @@ where
10311058
pub fn op_http_serve_on<HTTP>(
10321059
state: Rc<RefCell<OpState>>,
10331060
#[smi] connection_rid: ResourceId,
1034-
) -> Result<(ResourceId, &'static str, String), AnyError>
1061+
) -> Result<(ResourceId, &'static str, String), HttpNextError>
10351062
where
10361063
HTTP: HttpPropertyExtractor,
10371064
{
10381065
let connection =
1039-
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
1066+
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)
1067+
.map_err(HttpNextError::Resource)?;
10401068

10411069
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
10421070

10431071
let (tx, rx) = tokio::sync::mpsc::channel(10);
10441072
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
10451073

1046-
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
1047-
serve_http_on::<HTTP>(
1048-
connection,
1049-
&listen_properties,
1050-
resource.lifetime(),
1051-
tx,
1052-
);
1074+
let handle = serve_http_on::<HTTP>(
1075+
connection,
1076+
&listen_properties,
1077+
resource.lifetime(),
1078+
tx,
1079+
);
10531080

10541081
// Set the handle after we start the future
10551082
*RcRef::map(&resource, |this| &this.join_handle)
@@ -1095,12 +1122,13 @@ pub fn op_http_try_wait(
10951122
pub async fn op_http_wait(
10961123
state: Rc<RefCell<OpState>>,
10971124
#[smi] rid: ResourceId,
1098-
) -> Result<*const c_void, AnyError> {
1125+
) -> Result<*const c_void, HttpNextError> {
10991126
// We will get the join handle initially, as we might be consuming requests still
11001127
let join_handle = state
11011128
.borrow_mut()
11021129
.resource_table
1103-
.get::<HttpJoinHandle>(rid)?;
1130+
.get::<HttpJoinHandle>(rid)
1131+
.map_err(HttpNextError::Resource)?;
11041132

11051133
let cancel = join_handle.listen_cancel_handle();
11061134
let next = async {
@@ -1127,13 +1155,12 @@ pub async fn op_http_wait(
11271155

11281156
// Filter out shutdown (ENOTCONN) errors
11291157
if let Err(err) = res {
1130-
if let Some(err) = err.source() {
1131-
if let Some(err) = err.downcast_ref::<io::Error>() {
1132-
if err.kind() == io::ErrorKind::NotConnected {
1133-
return Ok(null());
1134-
}
1158+
if let HttpNextError::Io(err) = &err {
1159+
if err.kind() == io::ErrorKind::NotConnected {
1160+
return Ok(null());
11351161
}
11361162
}
1163+
11371164
return Err(err);
11381165
}
11391166

@@ -1146,7 +1173,7 @@ pub fn op_http_cancel(
11461173
state: &mut OpState,
11471174
#[smi] rid: ResourceId,
11481175
graceful: bool,
1149-
) -> Result<(), AnyError> {
1176+
) -> Result<(), deno_core::error::AnyError> {
11501177
let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
11511178

11521179
if graceful {
@@ -1166,11 +1193,12 @@ pub async fn op_http_close(
11661193
state: Rc<RefCell<OpState>>,
11671194
#[smi] rid: ResourceId,
11681195
graceful: bool,
1169-
) -> Result<(), AnyError> {
1196+
) -> Result<(), HttpNextError> {
11701197
let join_handle = state
11711198
.borrow_mut()
11721199
.resource_table
1173-
.take::<HttpJoinHandle>(rid)?;
1200+
.take::<HttpJoinHandle>(rid)
1201+
.map_err(HttpNextError::Resource)?;
11741202

11751203
if graceful {
11761204
http_general_trace!("graceful shutdown");
@@ -1216,23 +1244,26 @@ impl UpgradeStream {
12161244
}
12171245
}
12181246

1219-
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
1247+
async fn read(
1248+
self: Rc<Self>,
1249+
buf: &mut [u8],
1250+
) -> Result<usize, std::io::Error> {
12201251
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
12211252
async {
12221253
let read = RcRef::map(self, |this| &this.read);
12231254
let mut read = read.borrow_mut().await;
1224-
Ok(Pin::new(&mut *read).read(buf).await?)
1255+
Pin::new(&mut *read).read(buf).await
12251256
}
12261257
.try_or_cancel(cancel_handle)
12271258
.await
12281259
}
12291260

1230-
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
1261+
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
12311262
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
12321263
async {
12331264
let write = RcRef::map(self, |this| &this.write);
12341265
let mut write = write.borrow_mut().await;
1235-
Ok(Pin::new(&mut *write).write(buf).await?)
1266+
Pin::new(&mut *write).write(buf).await
12361267
}
12371268
.try_or_cancel(cancel_handle)
12381269
.await
@@ -1242,7 +1273,7 @@ impl UpgradeStream {
12421273
self: Rc<Self>,
12431274
buf1: &[u8],
12441275
buf2: &[u8],
1245-
) -> Result<usize, AnyError> {
1276+
) -> Result<usize, std::io::Error> {
12461277
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
12471278

12481279
let total = buf1.len() + buf2.len();
@@ -1295,9 +1326,12 @@ pub async fn op_raw_write_vectored(
12951326
#[smi] rid: ResourceId,
12961327
#[buffer] buf1: JsBuffer,
12971328
#[buffer] buf2: JsBuffer,
1298-
) -> Result<usize, AnyError> {
1299-
let resource: Rc<UpgradeStream> =
1300-
state.borrow().resource_table.get::<UpgradeStream>(rid)?;
1329+
) -> Result<usize, HttpNextError> {
1330+
let resource: Rc<UpgradeStream> = state
1331+
.borrow()
1332+
.resource_table
1333+
.get::<UpgradeStream>(rid)
1334+
.map_err(HttpNextError::Resource)?;
13011335
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
13021336
Ok(nwritten)
13031337
}

0 commit comments

Comments
 (0)