Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit 9c72ad3

Browse files
authored
Don't write data after write side closed. (#60)
Stream could be invalid when read side and write side are closed, or connection is closed.
1 parent 6d9275e commit 9c72ad3

File tree

5 files changed

+129
-75
lines changed

5 files changed

+129
-75
lines changed

web_transport/docs/threading_model.md

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,27 @@
22

33
OWT QUIC SDK provides thread safe APIs, but object's creation and deletion must be called on the same thread. Objects are usually created by `WebTransportFactory`.
44

5+
All calls to QUIC SDK are proxied to an internal IO thread as described below. All callbacks from QUIC SDK are called from this internal IO thread as well. It is recommended to do lightweight tasks only in callbacks.
6+
7+
## Deadlock
8+
9+
Because your thread is blocked when it calls QUIC SDK, please be cautious for deadlocks. An example to cause deadlock is:
10+
11+
1. An external thread calls an API of QUIC SDK.
12+
1. QUIC SDK fires an event while the origin call is not finished.
13+
1. Event handler calls the same external thread.
14+
15+
Deadlock happens because the external thread is waiting for the internal IO thread, and internal IO thread is waiting for the external thread.
16+
517
## Internals
618

7-
There are two threads maintained internally:
19+
There is a thread maintained internally:
820

921
- `io_thread`: Calls to Chromium's QUIC implementation are delegated to this thread.
10-
- `event_thread`: Callbacks and events are fired on this thread. We may move to sequence later.
1122

12-
These two threads are owned by `WebTransportFactory`. Basically, all objects created by `WebTransportFactory` re-use the same task queues based on the two threads above.
23+
This thread are owned by `WebTransportFactory`. Basically, all objects created by `WebTransportFactory` re-use the same task queue based on the thread above.
24+
25+
Another thread `event_thread` was used for callbacks. But it will be removed.
1326

1427
## Reference
1528

web_transport/sdk/impl/web_transport_server_session.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ WebTransportStreamInterface*
132132
WebTransportServerSession::CreateBidirectionalStreamOnCurrentThread() {
133133
::quic::WebTransportStream* wt_stream =
134134
session_->OpenOutgoingBidirectionalStream();
135-
std::unique_ptr<WebTransportStreamInterface> stream =
135+
std::unique_ptr<WebTransportStreamImpl> stream =
136136
std::make_unique<WebTransportStreamImpl>(
137137
wt_stream,
138138
http3_session_->GetOrCreateStream(wt_stream->GetStreamId()),
@@ -205,15 +205,17 @@ void WebTransportServerSession::OnIncomingUnidirectionalStreamAvailable() {
205205
void WebTransportServerSession::OnSessionClosed(
206206
::quic::WebTransportSessionError error_code,
207207
const std::string& error_message) {
208+
for (auto& stream : streams_) {
209+
stream->OnSessionClosed();
210+
}
208211
if (visitor_) {
209212
visitor_->OnConnectionClosed();
210213
}
211214
}
212215

213216
void WebTransportServerSession::AcceptIncomingStream(
214217
::quic::WebTransportStream* stream) {
215-
LOG(INFO) << "Accept incoming stream.";
216-
std::unique_ptr<WebTransportStreamInterface> wt_stream =
218+
std::unique_ptr<WebTransportStreamImpl> wt_stream =
217219
std::make_unique<WebTransportStreamImpl>(
218220
stream, http3_session_->GetOrCreateStream(stream->GetStreamId()),
219221
io_runner_, event_runner_);

web_transport/sdk/impl/web_transport_server_session.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
namespace owt {
2020
namespace quic {
2121

22+
class WebTransportStreamImpl;
23+
2224
// A proxy of ::quic::WebTransportHttp3. WebTransport over HTTP/2 is not
2325
// supported.
2426
class WebTransportServerSession : public WebTransportSessionInterface,
@@ -66,7 +68,7 @@ class WebTransportServerSession : public WebTransportSessionInterface,
6668
::quic::QuicSpdySession* http3_session_;
6769
base::SingleThreadTaskRunner* io_runner_;
6870
base::SingleThreadTaskRunner* event_runner_;
69-
std::vector<std::unique_ptr<WebTransportStreamInterface>> streams_;
71+
std::vector<std::unique_ptr<WebTransportStreamImpl>> streams_;
7072
WebTransportSessionInterface::Visitor* visitor_;
7173
ConnectionStats stats_;
7274
};

web_transport/sdk/impl/web_transport_stream_impl.cc

Lines changed: 100 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,17 @@ class WebTransportStreamVisitorAdapter
2828
: visitor_(visitor) {}
2929
void OnCanRead() override { visitor_->OnCanRead(); }
3030
void OnCanWrite() override { visitor_->OnCanWrite(); }
31-
void OnResetStreamReceived(::quic::WebTransportStreamError error) override {}
32-
void OnStopSendingReceived(::quic::WebTransportStreamError error) override {}
31+
void OnResetStreamReceived(::quic::WebTransportStreamError error) override {
32+
LOG(INFO)<<"OnResetStream received.";
33+
if (visitor_) {
34+
visitor_->OnResetStreamReceived(error);
35+
}
36+
}
37+
void OnStopSendingReceived(::quic::WebTransportStreamError error) override {
38+
if (visitor_) {
39+
visitor_->OnStopSendingReceived(error);
40+
}
41+
}
3342
void OnWriteSideInDataRecvdState() override {}
3443

3544
private:
@@ -45,7 +54,8 @@ WebTransportStreamImpl::WebTransportStreamImpl(
4554
quic_stream_(quic_stream),
4655
io_runner_(io_runner),
4756
event_runner_(event_runner),
48-
visitor_(nullptr) {
57+
visitor_(nullptr),
58+
write_side_closed_(false) {
4959
CHECK(stream_);
5060
CHECK(quic_stream_);
5161
CHECK(io_runner_);
@@ -63,17 +73,24 @@ size_t WebTransportStreamImpl::Write(const uint8_t* data, size_t length) {
6373
DCHECK_EQ(sizeof(uint8_t), sizeof(char));
6474
CHECK(io_runner_);
6575
if (io_runner_->BelongsToCurrentThread()) {
76+
if (write_side_closed_) {
77+
return 0;
78+
}
6679
return stream_->Write(
6780
absl::string_view(reinterpret_cast<const char*>(data), length));
6881
}
69-
bool result;
82+
bool result = false;
7083
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
7184
base::WaitableEvent::InitialState::NOT_SIGNALED);
7285
io_runner_->PostTask(
7386
FROM_HERE,
7487
base::BindOnce(
75-
[](WebTransportStreamImpl* stream, const uint8_t* data, size_t& length,
76-
bool& result, base::WaitableEvent* event) {
88+
[](base::WeakPtr<WebTransportStreamImpl> stream, const uint8_t* data,
89+
size_t& length, bool& result, base::WaitableEvent* event) {
90+
if (!stream || stream->write_side_closed_) {
91+
event->Signal();
92+
return;
93+
}
7794
if (stream->stream_->CanWrite()) {
7895
result = stream->stream_->Write(absl::string_view(
7996
reinterpret_cast<const char*>(data), length));
@@ -82,7 +99,7 @@ size_t WebTransportStreamImpl::Write(const uint8_t* data, size_t length) {
8299
}
83100
event->Signal();
84101
},
85-
base::Unretained(this), base::Unretained(data), std::ref(length),
102+
weak_factory_.GetWeakPtr(), base::Unretained(data), std::ref(length),
86103
std::ref(result), base::Unretained(&done)));
87104
done.Wait();
88105
return result ? length : 0;
@@ -100,21 +117,25 @@ size_t WebTransportStreamImpl::Read(uint8_t* data, size_t length) {
100117
// TODO: FIN is not handled.
101118
return read_result.bytes_read;
102119
}
103-
size_t result;
120+
size_t result = 0;
104121
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
105122
base::WaitableEvent::InitialState::NOT_SIGNALED);
106123
io_runner_->PostTask(
107124
FROM_HERE,
108125
base::BindOnce(
109-
[](WebTransportStreamImpl* stream, uint8_t* data, size_t& length,
110-
size_t& result, base::WaitableEvent* event) {
126+
[](base::WeakPtr<WebTransportStreamImpl> stream, uint8_t* data,
127+
size_t& length, size_t& result, base::WaitableEvent* event) {
128+
if (!stream) {
129+
event->Signal();
130+
return;
131+
}
111132
auto read_result =
112133
stream->stream_->Read(reinterpret_cast<char*>(data), length);
113134
// TODO: FIN is not handled.
114135
result = read_result.bytes_read;
115136
event->Signal();
116137
},
117-
base::Unretained(this), base::Unretained(data), std::ref(length),
138+
weak_factory_.GetWeakPtr(), base::Unretained(data), std::ref(length),
118139
std::ref(result), base::Unretained(&done)));
119140
done.Wait();
120141
return result;
@@ -124,18 +145,22 @@ size_t WebTransportStreamImpl::ReadableBytes() const {
124145
if (io_runner_->BelongsToCurrentThread()) {
125146
return stream_->ReadableBytes();
126147
}
127-
size_t result;
148+
size_t result = 0;
128149
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
129150
base::WaitableEvent::InitialState::NOT_SIGNALED);
130-
io_runner_->PostTask(
131-
FROM_HERE,
132-
base::BindOnce(
133-
[](WebTransportStreamImpl const* stream, size_t& result,
134-
base::WaitableEvent* event) {
135-
result = stream->stream_->ReadableBytes();
136-
event->Signal();
137-
},
138-
base::Unretained(this), std::ref(result), base::Unretained(&done)));
151+
io_runner_->PostTask(FROM_HERE,
152+
base::BindOnce(
153+
[](base::WeakPtr<WebTransportStreamImpl> stream,
154+
size_t& result, base::WaitableEvent* event) {
155+
if (!stream) {
156+
event->Signal();
157+
return;
158+
}
159+
result = stream->stream_->ReadableBytes();
160+
event->Signal();
161+
},
162+
weak_factory_.GetWeakPtr(), std::ref(result),
163+
base::Unretained(&done)));
139164
done.Wait();
140165
return result;
141166
}
@@ -150,34 +175,42 @@ void WebTransportStreamImpl::Close() {
150175
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
151176
base::WaitableEvent::InitialState::NOT_SIGNALED);
152177
io_runner_->PostTask(
153-
FROM_HERE,
154-
base::BindOnce(
155-
[](WebTransportStreamImpl* stream, base::WaitableEvent* event) {
156-
if (!stream->stream_->SendFin()) {
157-
LOG(ERROR) << "Failed to send FIN.";
158-
}
159-
event->Signal();
160-
},
161-
base::Unretained(this), base::Unretained(&done)));
178+
FROM_HERE, base::BindOnce(
179+
[](base::WeakPtr<WebTransportStreamImpl> stream,
180+
base::WaitableEvent* event) {
181+
if (!stream) {
182+
event->Signal();
183+
return;
184+
}
185+
if (!stream->stream_->SendFin()) {
186+
LOG(ERROR) << "Failed to send FIN.";
187+
}
188+
event->Signal();
189+
},
190+
weak_factory_.GetWeakPtr(), base::Unretained(&done)));
162191
done.Wait();
163192
}
164193

165194
uint64_t WebTransportStreamImpl::BufferedDataBytes() const {
166195
if (io_runner_->BelongsToCurrentThread()) {
167196
return quic_stream_->BufferedDataBytes();
168197
}
169-
uint64_t result;
198+
uint64_t result = 0;
170199
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
171200
base::WaitableEvent::InitialState::NOT_SIGNALED);
172-
io_runner_->PostTask(
173-
FROM_HERE,
174-
base::BindOnce(
175-
[](WebTransportStreamImpl const* stream, uint64_t& result,
176-
base::WaitableEvent* event) {
177-
result = stream->quic_stream_->BufferedDataBytes();
178-
event->Signal();
179-
},
180-
base::Unretained(this), std::ref(result), base::Unretained(&done)));
201+
io_runner_->PostTask(FROM_HERE,
202+
base::BindOnce(
203+
[](base::WeakPtr<WebTransportStreamImpl> stream,
204+
uint64_t& result, base::WaitableEvent* event) {
205+
if (!stream) {
206+
event->Signal();
207+
return;
208+
}
209+
result = stream->quic_stream_->BufferedDataBytes();
210+
event->Signal();
211+
},
212+
weak_factory_.GetWeakPtr(), std::ref(result),
213+
base::Unretained(&done)));
181214
done.Wait();
182215
return result;
183216
}
@@ -186,47 +219,49 @@ bool WebTransportStreamImpl::CanWrite() const {
186219
if (io_runner_->BelongsToCurrentThread()) {
187220
return stream_->CanWrite();
188221
}
189-
bool result;
222+
bool result = false;
190223
base::WaitableEvent done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
191224
base::WaitableEvent::InitialState::NOT_SIGNALED);
192-
io_runner_->PostTask(
193-
FROM_HERE,
194-
base::BindOnce(
195-
[](WebTransportStreamImpl const* stream, bool& result,
196-
base::WaitableEvent* event) {
197-
result = stream->stream_->CanWrite();
198-
event->Signal();
199-
},
200-
base::Unretained(this), std::ref(result), base::Unretained(&done)));
225+
io_runner_->PostTask(FROM_HERE,
226+
base::BindOnce(
227+
[](base::WeakPtr<WebTransportStreamImpl> stream,
228+
bool& result, base::WaitableEvent* event) {
229+
if (!stream) {
230+
event->Signal();
231+
return;
232+
}
233+
result = stream->stream_->CanWrite();
234+
event->Signal();
235+
},
236+
weak_factory_.GetWeakPtr(), std::ref(result),
237+
base::Unretained(&done)));
201238
done.Wait();
202239
return result;
203240
}
204241

205242
void WebTransportStreamImpl::OnCanRead() {
206-
event_runner_->PostTask(
207-
FROM_HERE,
208-
base::BindOnce(&WebTransportStreamImpl::OnCanReadOnCurrentThread,
209-
weak_factory_.GetWeakPtr()));
210-
}
211-
212-
void WebTransportStreamImpl::OnCanWrite() {
213-
event_runner_->PostTask(
214-
FROM_HERE,
215-
base::BindOnce(&WebTransportStreamImpl::OnCanWriteOnCurrentThread,
216-
weak_factory_.GetWeakPtr()));
217-
}
218-
219-
void WebTransportStreamImpl::OnCanReadOnCurrentThread() {
220243
if (visitor_) {
221244
visitor_->OnCanRead();
222245
}
223246
}
224247

225-
void WebTransportStreamImpl::OnCanWriteOnCurrentThread() {
248+
void WebTransportStreamImpl::OnCanWrite() {
226249
if (visitor_) {
227250
visitor_->OnCanWrite();
228251
}
229252
}
230253

254+
void WebTransportStreamImpl::OnResetStreamReceived(
255+
::quic::WebTransportStreamError error) {
256+
write_side_closed_ = true;
257+
}
258+
259+
void WebTransportStreamImpl::OnStopSendingReceived(
260+
::quic::WebTransportStreamError error) {}
261+
262+
void WebTransportStreamImpl::OnSessionClosed() {
263+
write_side_closed_ = true;
264+
}
265+
231266
} // namespace quic
232267
} // namespace owt

web_transport/sdk/impl/web_transport_stream_impl.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,17 @@ class WebTransportStreamImpl : public WebTransportStreamInterface,
4848
uint64_t BufferedDataBytes() const override;
4949
bool CanWrite() const override;
5050

51+
void OnSessionClosed();
52+
5153
// Overrides ::quic::WebTransportStreamVisitor.
5254
void OnCanRead() override;
5355
void OnCanWrite() override;
54-
void OnResetStreamReceived(::quic::WebTransportStreamError error) override {}
55-
void OnStopSendingReceived(::quic::WebTransportStreamError error) override {}
56+
void OnResetStreamReceived(::quic::WebTransportStreamError error) override;
57+
void OnStopSendingReceived(::quic::WebTransportStreamError error) override;
5658
void OnWriteSideInDataRecvdState() override {}
5759

5860
private:
5961
void OnCanReadOnCurrentThread();
60-
void OnFinReadOnCurrentThread();
6162
void OnCanWriteOnCurrentThread();
6263

6364
::quic::WebTransportStream* stream_;
@@ -68,6 +69,7 @@ class WebTransportStreamImpl : public WebTransportStreamInterface,
6869
base::SingleThreadTaskRunner* io_runner_;
6970
base::SingleThreadTaskRunner* event_runner_;
7071
owt::quic::WebTransportStreamInterface::Visitor* visitor_;
72+
bool write_side_closed_;
7173
base::WeakPtrFactory<WebTransportStreamImpl> weak_factory_{this};
7274
};
7375
} // namespace quic

0 commit comments

Comments
 (0)