@@ -175,6 +175,61 @@ class IgnoreAllRead : public ProgressiveReader {
175175 void OnEndOfMessage (const butil::Status&) {}
176176};
177177
178+ class ProgressiveTimeoutRead : public ProgressiveReader {
179+ public:
180+ explicit ProgressiveTimeoutRead (Controller* cntl, ProgressiveReader* reader):
181+ _cntl(cntl), _reader(reader), _timeout_id(0 ), _latest_add_timer_ms(0 ), _add_timer_delay_ms(1000 ) {
182+ AddIdleReadTimeoutMonitor ();
183+ }
184+ butil::Status OnReadOnePart (const void * data, size_t length) {
185+ auto status = _reader->OnReadOnePart (data, length);
186+ AddIdleReadTimeoutMonitor ();
187+ return status;
188+ }
189+ void OnEndOfMessage (const butil::Status& status) {
190+ _reader->OnEndOfMessage (status);
191+ if (_timeout_id > 0 ) {
192+ bthread_timer_del (_timeout_id);
193+ }
194+ }
195+ private:
196+ void AddToTimer () {
197+ if (_timeout_id > 0 ) {
198+ bthread_timer_del (_timeout_id);
199+ }
200+ bthread_timer_add (&_timeout_id,
201+ butil::milliseconds_from_now (_cntl->progressive_read_timeout_ms ()),
202+ Controller::HandleIdleProgressiveReader,
203+ _cntl
204+ );
205+ }
206+
207+ void AddIdleReadTimeoutMonitor () {
208+ if (_cntl->progressive_read_timeout_ms () <= 0 ) {
209+ return ;
210+ }
211+ if (_cntl->progressive_read_timeout_ms () < _add_timer_delay_ms) {
212+ AddToTimer ();
213+ return ;
214+ }
215+ auto current_ms = butil::cpuwide_time_ms ();
216+ if (current_ms - _latest_add_timer_ms < _add_timer_delay_ms) {
217+ return ;
218+ }
219+ _latest_add_timer_ms = current_ms;
220+ AddToTimer ();
221+ }
222+
223+ private:
224+ Controller* _cntl;
225+ ProgressiveReader* _reader;
226+ // Timer registered to trigger progressive timeout event
227+ bthread_timer_t _timeout_id;
228+ int64_t _latest_add_timer_ms;
229+ // avoid frequently add timer for idle handler
230+ int32_t _add_timer_delay_ms;
231+ };
232+
178233static IgnoreAllRead* s_ignore_all_read = NULL ;
179234static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT;
180235static void CreateIgnoreAllRead () { s_ignore_all_read = new IgnoreAllRead; }
@@ -1037,41 +1092,52 @@ void Controller::SubmitSpan() {
10371092 _span = NULL ;
10381093}
10391094
1040- void * Controller::HandleIdleProgressiveReader (void * arg) {
1095+ void Controller::HandleIdleProgressiveReader (void * arg) {
1096+ if (arg == nullptr ){
1097+ LOG (ERROR) << " Controller::HandleIdleProgressiveReader arg is null." ;
1098+ return ;
1099+ }
10411100 auto * cntl = static_cast <Controller*>(arg);
1042- const uint64_t CHECK_INTERVAL_US = 1000000UL ;
10431101 auto log_idle = FLAGS_log_idle_progressive_read_close;
1102+ int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000 ;
10441103 std::vector<SocketId> remove_socket_ids;
1045- while (bthread_usleep (CHECK_INTERVAL_US) == 0 ) {
1046- // TODO: this is not efficient for a lot of connections(>100K)
1047- auto socketIds = cntl->_checking_progressive_read_fds ;
1048- int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000 ;
1049- for (auto socket_id : socketIds){
1050- SocketUniquePtr s;
1051- if (Socket::Address (socket_id, &s) == 0 ) {
1052- auto cpuwide_time_us = butil::cpuwide_time_us ();
1053- const int64_t last_active_us = s->last_active_time_us ();
1054- if (cpuwide_time_us - last_active_us <= progressive_read_timeout_us) {
1055- continue ;
1056- }
1057- LOG_IF (INFO, log_idle) << " progressive read timeout socket id : " << socket_id
1058- << " progressive read timeout us : " << progressive_read_timeout_us
1059- << " progressive read idle duration : " << cpuwide_time_us - last_active_us;
1060- if (s->parsing_context () != NULL ) {
1061- s->parsing_context ()->Destroy ();
1062- }
1063- s->ReleaseReferenceIfIdle (0xffffffff );
1064- remove_socket_ids.push_back (socket_id);
1065- } else {
1066- LOG (ERROR) << " not found the socket id : " << socket_id;
1067- remove_socket_ids.push_back (socket_id);
1104+ butil::AutoLock guard (cntl->_progressive_read_lock );
1105+ auto socketIds = cntl->_checking_progressive_read_fds ;
1106+ for (auto socket_id : socketIds){
1107+ SocketUniquePtr s;
1108+ if (Socket::Address (socket_id, &s) == 0 ) {
1109+ int64_t pre_idle_duration_us = 0 ;
1110+ int64_t idle_duration_us = butil::cpuwide_time_us () - s->last_active_time_us ();
1111+ while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) {
1112+ auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000 ;
1113+ bthread_usleep (sleep_ms > 0 ? sleep_ms : 1 );
1114+ pre_idle_duration_us = idle_duration_us;
1115+ idle_duration_us = butil::cpuwide_time_us () - s->last_active_time_us ();
10681116 }
1069- }
1070- for (auto remove_socket_id : remove_socket_ids) {
1071- socketIds.erase (remove_socket_id);
1117+ if (idle_duration_us <= pre_idle_duration_us) {
1118+ LOG_IF (INFO, log_idle) << " stop pgressive read timeout checking process!"
1119+ << " progressive_read_timeout_us : " << progressive_read_timeout_us
1120+ << " idle_duration_us : " << idle_duration_us
1121+ << " pre_idle_duration_us : " << pre_idle_duration_us;
1122+ return ;
1123+ }
1124+ LOG_IF (INFO, log_idle) << " progressive read timeout socket id : " << socket_id
1125+ << " progressive read timeout us : " << progressive_read_timeout_us
1126+ << " progressive read idle duration : " << idle_duration_us;
1127+ if (s->parsing_context () != NULL ) {
1128+ s->parsing_context ()->Destroy ();
1129+ }
1130+ s->ReleaseReferenceIfIdle (0 );
1131+ cntl->CloseConnection (" progressive read timeout" );
1132+ remove_socket_ids.push_back (socket_id);
1133+ } else {
1134+ LOG (ERROR) << " not found the socket id : " << socket_id;
1135+ remove_socket_ids.push_back (socket_id);
10721136 }
10731137 }
1074- return NULL ;
1138+ for (auto remove_socket_id : remove_socket_ids) {
1139+ socketIds.erase (remove_socket_id);
1140+ }
10751141}
10761142
10771143void Controller::HandleSendFailed () {
@@ -1227,9 +1293,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
12271293 // stop before reading all body.
12281294 _current_call.sending_sock ->read_will_be_progressive (_connection_type);
12291295 auto socket_id = _current_call.sending_sock ->id ();
1296+ butil::AutoLock guard (_progressive_read_lock);
12301297 if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek (socket_id) == NULL ) {
12311298 _checking_progressive_read_fds.insert (socket_id);
1232- LOG (INFO) << " insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size ();
12331299 }
12341300 }
12351301
@@ -1594,6 +1660,9 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) {
15941660 __FUNCTION__));
15951661 }
15961662 add_flag (FLAGS_PROGRESSIVE_READER);
1663+ if (progressive_read_timeout_ms () > 0 ) {
1664+ return _rpa->ReadProgressiveAttachmentBy (new ProgressiveTimeoutRead (this , r));
1665+ }
15971666 return _rpa->ReadProgressiveAttachmentBy (r);
15981667}
15991668
0 commit comments