Skip to content

Commit fddde90

Browse files
committed
Add deadline fallback for readers requiring additional handling
1 parent 93986ef commit fddde90

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

common/bufio/packet_reactor.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,23 @@ const (
2424
stateClosed int32 = 2
2525
)
2626

27+
type withoutReadDeadline interface {
28+
NeedAdditionalReadDeadline() bool
29+
}
30+
31+
func needAdditionalReadDeadline(rawReader any) bool {
32+
if deadlineReader, loaded := rawReader.(withoutReadDeadline); loaded {
33+
return deadlineReader.NeedAdditionalReadDeadline()
34+
}
35+
if upstream, hasUpstream := rawReader.(N.WithUpstreamReader); hasUpstream {
36+
return needAdditionalReadDeadline(upstream.UpstreamReader())
37+
}
38+
if upstream, hasUpstream := rawReader.(common.WithUpstream); hasUpstream {
39+
return needAdditionalReadDeadline(upstream.Upstream())
40+
}
41+
return false
42+
}
43+
2744
func CreatePacketPushable(reader N.PacketReader) (N.PacketPushable, bool) {
2845
if pushable, ok := reader.(N.PacketPushable); ok {
2946
return pushable, true
@@ -222,6 +239,12 @@ func (r *PacketReactor) prepareStream(conn *reactorConnection, source N.PacketRe
222239
}
223240

224241
func (r *PacketReactor) registerStream(stream *reactorStream) {
242+
if needAdditionalReadDeadline(stream.source) {
243+
r.logger.Trace("packet stream: needs additional deadline handling, using legacy copy")
244+
go stream.runLegacyCopy()
245+
return
246+
}
247+
225248
if stream.pushable != nil {
226249
r.logger.Trace("packet stream: using pushable mode")
227250
stream.pushable.SetOnDataReady(func() {

common/bufio/stream_reactor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ func (r *StreamReactor) prepareDirection(conn *streamConnection, source io.Reade
215215
}
216216

217217
func (r *StreamReactor) registerDirection(direction *streamDirection) {
218+
if needAdditionalReadDeadline(direction.source) {
219+
r.logger.Trace("stream direction: needs additional deadline handling, using legacy copy")
220+
go direction.runLegacyCopy()
221+
return
222+
}
223+
218224
// Check if there's buffered data that needs processing first
219225
if direction.pollable != nil && direction.pollable.Buffered() > 0 {
220226
r.logger.Trace("stream direction: has buffered data, starting active loop")

0 commit comments

Comments
 (0)