Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit 0448298

Browse files
TommiCommit Bot
authored andcommitted
Revert "[Sheriff] Revert "Remove MessageHandler[AutoCleanup] dependency from StreamInterface.""
This reverts commit af05c83. Reason for revert: The failure in remoting_unittests has been addressed. Original change's description: > [Sheriff] Revert "Remove MessageHandler[AutoCleanup] dependency from StreamInterface." > > This reverts commit eb79dd9. > > Reason for revert: breaks WebRTC roll into Chrome: > https://crrev.com/c/2445696 > > Sample failure: > https://ci.chromium.org/p/chromium/builders/try/linux-rel/506049 > [ RUN ] PseudoTcpAdapterTest.DeleteOnConnected > > Original change's description: > > Remove MessageHandler[AutoCleanup] dependency from StreamInterface. > > > > This includes relying on related types such as MessageData and > > PostEvent functionality inside the StreamInterface itself. > > > > This affects mostly tests but OpenSSLStreamAdapter > > requires special attention. > > > > Bug: webrtc:11988 > > Change-Id: Ib5c895f1bdf77bb49e3162bd49718f8a98812d91 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185505 > > Commit-Queue: Tommi <[email protected]> > > Reviewed-by: Karl Wiberg <[email protected]> > > Cr-Commit-Position: refs/heads/master@{#32290} > > [email protected],[email protected] > > Change-Id: I23d7a311a73c739eba872a21e6123235465c28cc > No-Presubmit: true > No-Tree-Checks: true > No-Try: true > Bug: webrtc:11988 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186564 > Commit-Queue: Marina Ciocea <[email protected]> > Reviewed-by: Marina Ciocea <[email protected]> > Cr-Commit-Position: refs/heads/master@{#32299} [email protected],[email protected],[email protected] # Not skipping CQ checks because original CL landed > 1 day ago. Bug: webrtc:11988 Change-Id: Iff07e0943fc5dded9eeed5c2626798691594300d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186700 Reviewed-by: Tommi <[email protected]> Commit-Queue: Tommi <[email protected]> Cr-Commit-Position: refs/heads/master@{#32314}
1 parent 6556ed2 commit 0448298

File tree

9 files changed

+86
-66
lines changed

9 files changed

+86
-66
lines changed

rtc_base/BUILD.gn

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,7 @@ rtc_library("rtc_base") {
835835
"system:no_unique_address",
836836
"system:rtc_export",
837837
"task_utils:pending_task_safety_flag",
838+
"task_utils:repeating_task",
838839
"task_utils:to_queued_task",
839840
"third_party/base64",
840841
"third_party/sigslot",
@@ -1425,6 +1426,7 @@ if (rtc_include_tests) {
14251426
"memory:fifo_buffer",
14261427
"synchronization:mutex",
14271428
"synchronization:synchronization_unittests",
1429+
"task_utils:pending_task_safety_flag",
14281430
"task_utils:to_queued_task",
14291431
"third_party/sigslot",
14301432
]

rtc_base/memory/BUILD.gn

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ rtc_library("aligned_malloc") {
2020
deps = [ "..:checks" ]
2121
}
2222

23+
# Test only utility.
24+
# TODO: Tag with `testonly = true` once all depending targets are correctly
25+
# tagged.
2326
rtc_library("fifo_buffer") {
2427
visibility = [
25-
"../../p2p:rtc_p2p",
28+
":unittests",
2629
"..:rtc_base_tests_utils",
2730
"..:rtc_base_unittests",
28-
":unittests",
31+
"../../p2p:rtc_p2p", # This needs to be fixed.
2932
]
3033
sources = [
3134
"fifo_buffer.cc",
@@ -34,6 +37,8 @@ rtc_library("fifo_buffer") {
3437
deps = [
3538
"..:rtc_base",
3639
"../synchronization:mutex",
40+
"../task_utils:pending_task_safety_flag",
41+
"../task_utils:to_queued_task",
3742
]
3843
}
3944

rtc_base/memory/fifo_buffer.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ StreamResult FifoBuffer::Read(void* buffer,
104104

105105
// if we were full before, and now we're not, post an event
106106
if (!was_writable && copy > 0) {
107-
PostEvent(owner_, SE_WRITE, 0);
107+
PostEvent(SE_WRITE, 0);
108108
}
109109
}
110110
return result;
@@ -129,7 +129,7 @@ StreamResult FifoBuffer::Write(const void* buffer,
129129

130130
// if we didn't have any data to read before, and now we do, post an event
131131
if (!was_readable && copy > 0) {
132-
PostEvent(owner_, SE_READ, 0);
132+
PostEvent(SE_READ, 0);
133133
}
134134
}
135135
return result;
@@ -155,7 +155,7 @@ void FifoBuffer::ConsumeReadData(size_t size) {
155155
read_position_ = (read_position_ + size) % buffer_length_;
156156
data_length_ -= size;
157157
if (!was_writable && size > 0) {
158-
PostEvent(owner_, SE_WRITE, 0);
158+
PostEvent(SE_WRITE, 0);
159159
}
160160
}
161161

@@ -185,7 +185,7 @@ void FifoBuffer::ConsumeWriteBuffer(size_t size) {
185185
const bool was_readable = (data_length_ > 0);
186186
data_length_ += size;
187187
if (!was_readable && size > 0) {
188-
PostEvent(owner_, SE_READ, 0);
188+
PostEvent(SE_READ, 0);
189189
}
190190
}
191191

rtc_base/memory/fifo_buffer.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
#include "rtc_base/stream.h"
1717
#include "rtc_base/synchronization/mutex.h"
18+
#include "rtc_base/task_utils/pending_task_safety_flag.h"
19+
#include "rtc_base/task_utils/to_queued_task.h"
1820

1921
namespace rtc {
2022

@@ -98,6 +100,12 @@ class FifoBuffer final : public StreamInterface {
98100
bool GetWriteRemaining(size_t* size) const;
99101

100102
private:
103+
void PostEvent(int events, int err) {
104+
owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
105+
SignalEvent(this, events, err);
106+
}));
107+
}
108+
101109
// Helper method that implements ReadOffset. Caller must acquire a lock
102110
// when calling this method.
103111
StreamResult ReadOffsetLocked(void* buffer,
@@ -114,6 +122,8 @@ class FifoBuffer final : public StreamInterface {
114122
size_t* bytes_written)
115123
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
116124

125+
webrtc::ScopedTaskSafety task_safety_;
126+
117127
// keeps the opened/closed state of the stream
118128
StreamState state_ RTC_GUARDED_BY(mutex_);
119129
// the allocated buffer
@@ -125,7 +135,7 @@ class FifoBuffer final : public StreamInterface {
125135
// offset to the readable data
126136
size_t read_position_ RTC_GUARDED_BY(mutex_);
127137
// stream callbacks are dispatched on this thread
128-
Thread* owner_;
138+
Thread* const owner_;
129139
// object lock
130140
mutable webrtc::Mutex mutex_;
131141
RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);

rtc_base/openssl_stream_adapter.cc

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "rtc_base/openssl_identity.h"
3636
#include "rtc_base/ssl_certificate.h"
3737
#include "rtc_base/stream.h"
38+
#include "rtc_base/task_utils/to_queued_task.h"
3839
#include "rtc_base/thread.h"
3940
#include "rtc_base/time_utils.h"
4041
#include "system_wrappers/include/field_trial.h"
@@ -283,6 +284,7 @@ bool ShouldAllowLegacyTLSProtocols() {
283284
OpenSSLStreamAdapter::OpenSSLStreamAdapter(
284285
std::unique_ptr<StreamInterface> stream)
285286
: SSLStreamAdapter(std::move(stream)),
287+
owner_(rtc::Thread::Current()),
286288
state_(SSL_NONE),
287289
role_(SSL_CLIENT),
288290
ssl_read_needs_write_(false),
@@ -296,6 +298,7 @@ OpenSSLStreamAdapter::OpenSSLStreamAdapter(
296298
support_legacy_tls_protocols_flag_(ShouldAllowLegacyTLSProtocols()) {}
297299

298300
OpenSSLStreamAdapter::~OpenSSLStreamAdapter() {
301+
timeout_task_.Stop();
299302
Cleanup(0);
300303
}
301304

@@ -801,6 +804,33 @@ void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
801804
}
802805
}
803806

807+
void OpenSSLStreamAdapter::PostEvent(int events, int err) {
808+
owner_->PostTask(webrtc::ToQueuedTask(
809+
task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
810+
}
811+
812+
void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {
813+
// We need to accept 0 delay here as well as >0 delay, because
814+
// DTLSv1_get_timeout seems to frequently return 0 ms.
815+
RTC_DCHECK_GE(delay_ms, 0);
816+
RTC_DCHECK(!timeout_task_.Running());
817+
818+
timeout_task_ = webrtc::RepeatingTaskHandle::DelayedStart(
819+
owner_, webrtc::TimeDelta::Millis(delay_ms),
820+
[flag = task_safety_.flag(), this]() {
821+
if (flag->alive()) {
822+
RTC_DLOG(LS_INFO) << "DTLS timeout expired";
823+
timeout_task_.Stop();
824+
DTLSv1_handle_timeout(ssl_);
825+
ContinueSSL();
826+
} else {
827+
RTC_NOTREACHED();
828+
}
829+
// This callback will never run again (stopped above).
830+
return webrtc::TimeDelta::PlusInfinity();
831+
});
832+
}
833+
804834
int OpenSSLStreamAdapter::BeginSSL() {
805835
RTC_DCHECK(state_ == SSL_CONNECTING);
806836
// The underlying stream has opened.
@@ -851,7 +881,7 @@ int OpenSSLStreamAdapter::ContinueSSL() {
851881
RTC_DCHECK(state_ == SSL_CONNECTING);
852882

853883
// Clear the DTLS timer
854-
Thread::Current()->Clear(this, MSG_TIMEOUT);
884+
timeout_task_.Stop();
855885

856886
const int code = (role_ == SSL_CLIENT) ? SSL_connect(ssl_) : SSL_accept(ssl_);
857887
const int ssl_error = SSL_get_error(ssl_, code);
@@ -883,9 +913,7 @@ int OpenSSLStreamAdapter::ContinueSSL() {
883913
struct timeval timeout;
884914
if (DTLSv1_get_timeout(ssl_, &timeout)) {
885915
int delay = timeout.tv_sec * 1000 + timeout.tv_usec / 1000;
886-
887-
Thread::Current()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_TIMEOUT,
888-
0);
916+
SetTimeout(delay);
889917
}
890918
} break;
891919

@@ -962,18 +990,7 @@ void OpenSSLStreamAdapter::Cleanup(uint8_t alert) {
962990
peer_cert_chain_.reset();
963991

964992
// Clear the DTLS timer
965-
Thread::Current()->Clear(this, MSG_TIMEOUT);
966-
}
967-
968-
void OpenSSLStreamAdapter::OnMessage(Message* msg) {
969-
// Process our own messages and then pass others to the superclass
970-
if (MSG_TIMEOUT == msg->message_id) {
971-
RTC_DLOG(LS_INFO) << "DTLS timeout expired";
972-
DTLSv1_handle_timeout(ssl_);
973-
ContinueSSL();
974-
} else {
975-
StreamInterface::OnMessage(msg);
976-
}
993+
timeout_task_.Stop();
977994
}
978995

979996
SSL_CTX* OpenSSLStreamAdapter::SetupSSLContext() {

rtc_base/openssl_stream_adapter.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include "rtc_base/ssl_stream_adapter.h"
2727
#include "rtc_base/stream.h"
2828
#include "rtc_base/system/rtc_export.h"
29+
#include "rtc_base/task_utils/pending_task_safety_flag.h"
30+
#include "rtc_base/task_utils/repeating_task.h"
2931

3032
namespace rtc {
3133

@@ -145,7 +147,8 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
145147
SSL_CLOSED // Clean close
146148
};
147149

148-
enum { MSG_TIMEOUT = MSG_MAX + 1 };
150+
void PostEvent(int events, int err);
151+
void SetTimeout(int delay_ms);
149152

150153
// The following three methods return 0 on success and a negative
151154
// error code on failure. The error code may be from OpenSSL or -1
@@ -169,9 +172,6 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
169172
void Error(const char* context, int err, uint8_t alert, bool signal);
170173
void Cleanup(uint8_t alert);
171174

172-
// Override MessageHandler
173-
void OnMessage(Message* msg) override;
174-
175175
// Flush the input buffers by reading left bytes (for DTLS)
176176
void FlushInput(unsigned int left);
177177

@@ -192,6 +192,10 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
192192
!peer_certificate_digest_value_.empty();
193193
}
194194

195+
rtc::Thread* const owner_;
196+
webrtc::ScopedTaskSafety task_safety_;
197+
webrtc::RepeatingTaskHandle timeout_task_;
198+
195199
SSLState state_;
196200
SSLRole role_;
197201
int ssl_error_code_; // valid when state_ == SSL_ERROR or SSL_CLOSED

rtc_base/ssl_stream_adapter_unittest.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include "rtc_base/ssl_identity.h"
2727
#include "rtc_base/ssl_stream_adapter.h"
2828
#include "rtc_base/stream.h"
29+
#include "rtc_base/task_utils/pending_task_safety_flag.h"
30+
#include "rtc_base/task_utils/to_queued_task.h"
2931
#include "test/field_trial.h"
3032

3133
using ::testing::Combine;
@@ -214,7 +216,15 @@ class SSLDummyStreamBase : public rtc::StreamInterface,
214216
out_->Close();
215217
}
216218

217-
protected:
219+
private:
220+
void PostEvent(int events, int err) {
221+
thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
222+
SignalEvent(this, events, err);
223+
}));
224+
}
225+
226+
webrtc::ScopedTaskSafety task_safety_;
227+
rtc::Thread* const thread_ = rtc::Thread::Current();
218228
SSLStreamAdapterTestBase* test_base_;
219229
const std::string side_;
220230
rtc::StreamInterface* in_;
@@ -276,10 +286,17 @@ class BufferQueueStream : public rtc::StreamInterface {
276286

277287
protected:
278288
void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); }
279-
280289
void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); }
281290

282291
private:
292+
void PostEvent(int events, int err) {
293+
thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
294+
SignalEvent(this, events, err);
295+
}));
296+
}
297+
298+
rtc::Thread* const thread_ = rtc::Thread::Current();
299+
webrtc::ScopedTaskSafety task_safety_;
283300
rtc::BufferQueue buffer_;
284301
};
285302

rtc_base/stream.cc

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ namespace rtc {
2424
///////////////////////////////////////////////////////////////////////////////
2525
// StreamInterface
2626
///////////////////////////////////////////////////////////////////////////////
27-
StreamInterface::~StreamInterface() {}
2827

2928
StreamResult StreamInterface::WriteAll(const void* data,
3029
size_t data_len,
@@ -44,29 +43,12 @@ StreamResult StreamInterface::WriteAll(const void* data,
4443
return result;
4544
}
4645

47-
void StreamInterface::PostEvent(Thread* t, int events, int err) {
48-
t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
49-
new StreamEventData(events, err));
50-
}
51-
52-
void StreamInterface::PostEvent(int events, int err) {
53-
PostEvent(Thread::Current(), events, err);
54-
}
55-
5646
bool StreamInterface::Flush() {
5747
return false;
5848
}
5949

6050
StreamInterface::StreamInterface() {}
6151

62-
void StreamInterface::OnMessage(Message* msg) {
63-
if (MSG_POST_EVENT == msg->message_id) {
64-
StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
65-
SignalEvent(this, pe->events, pe->error);
66-
delete msg->pdata;
67-
}
68-
}
69-
7052
///////////////////////////////////////////////////////////////////////////////
7153
// StreamAdapterInterface
7254
///////////////////////////////////////////////////////////////////////////////

rtc_base/stream.h

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,9 @@ enum StreamResult { SR_ERROR, SR_SUCCESS, SR_BLOCK, SR_EOS };
4848
// SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
4949
enum StreamEvent { SE_OPEN = 1, SE_READ = 2, SE_WRITE = 4, SE_CLOSE = 8 };
5050

51-
struct StreamEventData : public MessageData {
52-
int events, error;
53-
StreamEventData(int ev, int er) : events(ev), error(er) {}
54-
};
55-
56-
class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
51+
class RTC_EXPORT StreamInterface {
5752
public:
58-
enum { MSG_POST_EVENT = 0xF1F1, MSG_MAX = MSG_POST_EVENT };
59-
60-
~StreamInterface() override;
53+
virtual ~StreamInterface() {}
6154

6255
virtual StreamState GetState() const = 0;
6356

@@ -96,13 +89,6 @@ class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
9689
// certain events will be raised in the future.
9790
sigslot::signal3<StreamInterface*, int, int> SignalEvent;
9891

99-
// Like calling SignalEvent, but posts a message to the specified thread,
100-
// which will call SignalEvent. This helps unroll the stack and prevent
101-
// re-entrancy.
102-
void PostEvent(Thread* t, int events, int err);
103-
// Like the aforementioned method, but posts to the current thread.
104-
void PostEvent(int events, int err);
105-
10692
// Return true if flush is successful.
10793
virtual bool Flush();
10894

@@ -125,9 +111,6 @@ class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
125111
protected:
126112
StreamInterface();
127113

128-
// MessageHandler Interface
129-
void OnMessage(Message* msg) override;
130-
131114
private:
132115
RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterface);
133116
};

0 commit comments

Comments
 (0)