Skip to content

Commit 0b67591

Browse files
authored
Merge pull request #766 from openziti/xgress-independent-close
Rework xgress to allow rx and tx sides to be closed independently. Fixes #765
2 parents e38400d + 25f4360 commit 0b67591

File tree

16 files changed

+747
-240
lines changed

16 files changed

+747
-240
lines changed

CHANGELOG.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
1-
# Release notes 1.1.3
1+
# Release notes 1.2.0
2+
3+
## What's New
4+
5+
This release contains substantial revisions to the SDK flow control feature first released in v1.1.0.
6+
See the v1.1.0 release notes for more details.
7+
8+
It has now received a substantial amount of testing including long running tests and backwards compability testing.
9+
10+
These features should be used with version 1.6.6 or newer of OpenZiti.
11+
12+
It is still considered experimental, and the feature and APIs may still change, however Go SDK
13+
users who are multi-plexing connections, are encouraged to try it out.
14+
15+
Once it has undergone sufficient soak time in a production environment, it will marked as stable.
216

317
## Issues Fixed and Dependency Updates
418

5-
* github.com/openziti/sdk-golang: [v1.1.2 -> v1.1.3](https://github.com/openziti/sdk-golang/compare/v1.1.2...v1.1.3)
19+
* github.com/openziti/sdk-golang: [v1.1.2 -> v1.2.0](https://github.com/openziti/sdk-golang/compare/v1.1.2...v1.2.0)
20+
* [Issue #765](https://github.com/openziti/sdk-golang/issues/765) - Allow independent close of xgress send and receive
621
* [Issue #763](https://github.com/openziti/sdk-golang/issues/763) - Use a go-routine pool for payload ingest
722
* [Issue #761](https://github.com/openziti/sdk-golang/issues/761) - Use cmap.ConcurrentMap for message multiplexer
823
* [Issue #754](https://github.com/openziti/sdk-golang/issues/754) - panic: unaligned 64-bit atomic operation when running on 32-bit raspberry pi

version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1
1+
1.2

xgress/link_send_buffer.go

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
package xgress
1818

1919
import (
20+
"context"
2021
"github.com/michaelquigley/pfxlog"
21-
"github.com/pkg/errors"
2222
"github.com/sirupsen/logrus"
2323
"math"
24+
"os"
2425
"slices"
2526
"sync/atomic"
2627
"time"
@@ -52,6 +53,7 @@ type LinkSendBuffer struct {
5253
closeWhenEmpty atomic.Bool
5354
inspectRequests chan *sendBufferInspectEvent
5455
blockedSince time.Time
56+
closeStart time.Time
5557
}
5658

5759
type txPayload struct {
@@ -111,22 +113,39 @@ func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer {
111113
inspectRequests: make(chan *sendBufferInspectEvent, 1),
112114
}
113115

114-
go buffer.run()
115116
return buffer
116117
}
117118

118119
func (buffer *LinkSendBuffer) CloseWhenEmpty() bool {
120+
pfxlog.ContextLogger(buffer.x.Label()).Debug("close when empty")
119121
return buffer.closeWhenEmpty.CompareAndSwap(false, true)
120122
}
121123

122124
func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error) {
123125
txPayload := &txPayload{payload: payload, age: math.MaxInt64, x: buffer.x}
126+
124127
select {
125128
case buffer.newlyBuffered <- txPayload:
126129
pfxlog.ContextLogger(buffer.x.Label()).Debugf("buffered [%d]", payload.GetSequence())
127130
return txPayload.markSent, nil
128131
case <-buffer.closeNotify:
129-
return nil, errors.Errorf("payload buffer closed")
132+
return nil, ErrWriteClosed
133+
}
134+
}
135+
136+
func (buffer *LinkSendBuffer) BufferPayloadWithDeadline(payload *Payload, ctx context.Context) (func(), error) {
137+
txPayload := &txPayload{payload: payload, age: math.MaxInt64, x: buffer.x}
138+
139+
for {
140+
select {
141+
case <-ctx.Done():
142+
return nil, os.ErrDeadlineExceeded
143+
case buffer.newlyBuffered <- txPayload:
144+
pfxlog.ContextLogger(buffer.x.Label()).Debugf("buffered [%d]", payload.GetSequence())
145+
return txPayload.markSent, nil
146+
case <-buffer.closeNotify:
147+
return nil, ErrWriteClosed
148+
}
130149
}
131150
}
132151

@@ -151,10 +170,15 @@ func (buffer *LinkSendBuffer) metrics() Metrics {
151170
}
152171

153172
func (buffer *LinkSendBuffer) Close() {
154-
pfxlog.ContextLogger(buffer.x.Label()).Debugf("[%p] closing", buffer)
155173
if buffer.closed.CompareAndSwap(false, true) {
174+
pfxlog.ContextLogger(buffer.x.Label()).Debugf("[%p] closing", buffer)
156175
close(buffer.closeNotify)
157176
}
177+
buffer.x.closeIfRxAndTxDone()
178+
}
179+
180+
func (buffer *LinkSendBuffer) IsClosed() bool {
181+
return buffer.closed.Load()
158182
}
159183

160184
func (buffer *LinkSendBuffer) isBlocked() bool {
@@ -211,7 +235,7 @@ func (buffer *LinkSendBuffer) run() {
211235
case ack := <-buffer.newlyReceivedAcks:
212236
buffer.receiveAcknowledgement(ack)
213237
case <-buffer.closeNotify:
214-
buffer.close()
238+
buffer.cleanupMetrics()
215239
return
216240
default:
217241
}
@@ -232,7 +256,7 @@ func (buffer *LinkSendBuffer) run() {
232256
log.Tracef("buffering payload %v with size %v. payload buffer size: %v",
233257
txPayload.payload.Sequence, len(txPayload.payload.Data), buffer.linkSendBufferSize)
234258
case <-buffer.closeNotify:
235-
buffer.close()
259+
buffer.cleanupMetrics()
236260
return
237261
default:
238262
}
@@ -245,9 +269,7 @@ func (buffer *LinkSendBuffer) run() {
245269
case ack := <-buffer.newlyReceivedAcks:
246270
buffer.receiveAcknowledgement(ack)
247271
buffer.retransmit()
248-
if buffer.closeWhenEmpty.Load() && len(buffer.buffer) == 0 && !buffer.x.Closed() && buffer.x.IsEndOfCircuitSent() {
249-
go buffer.x.Close()
250-
}
272+
buffer.checkForClose()
251273

252274
case txPayload := <-buffered:
253275
buffer.buffer[txPayload.payload.GetSequence()] = txPayload
@@ -259,15 +281,46 @@ func (buffer *LinkSendBuffer) run() {
259281

260282
case <-retransmitTicker.C:
261283
buffer.retransmit()
284+
buffer.checkForClose()
262285

263286
case <-buffer.closeNotify:
264-
buffer.close()
287+
buffer.cleanupMetrics()
288+
if len(buffer.buffer) > 0 {
289+
isCircuitEnd := false
290+
if len(buffer.buffer) == 1 {
291+
for _, p := range buffer.buffer {
292+
isCircuitEnd = p.payload.IsCircuitEndFlagSet() || p.payload.IsFlagEOFSet()
293+
}
294+
}
295+
if !isCircuitEnd {
296+
log.WithField("payloadCount", len(buffer.buffer)).Warn("closing while buffer contains unacked payloads")
297+
}
298+
}
265299
return
266300
}
267301
}
268302
}
269303

270-
func (buffer *LinkSendBuffer) close() {
304+
func (buffer *LinkSendBuffer) checkForClose() {
305+
if buffer.closeWhenEmpty.Load() {
306+
if buffer.closeStart.IsZero() {
307+
buffer.closeStart = time.Now()
308+
}
309+
closeDuration := time.Since(buffer.closeStart)
310+
311+
if (len(buffer.buffer) == 0 && closeDuration > 5*time.Second) || closeDuration > buffer.x.Options.MaxCloseWait {
312+
buffer.Close()
313+
} else if len(buffer.buffer) == 1 && closeDuration > 5*time.Second {
314+
for _, p := range buffer.buffer {
315+
if p.payload.IsCircuitEndFlagSet() || p.payload.IsFlagEOFSet() {
316+
buffer.Close()
317+
}
318+
}
319+
}
320+
}
321+
}
322+
323+
func (buffer *LinkSendBuffer) cleanupMetrics() {
271324
if buffer.blockedByLocalWindow {
272325
buffer.metrics().BufferUnblockedByLocalWindow()
273326
}
@@ -358,7 +411,7 @@ func (buffer *LinkSendBuffer) retransmit() {
358411
}
359412

360413
if retransmitted > 0 {
361-
log.Debugf("retransmitted [%d] payloads, [%d] buffered, linkSendBufferSize: %d", retransmitted, len(buffer.buffer), buffer.linkSendBufferSize)
414+
log.WithField("circuitId", buffer.x.circuitId).Debugf("retransmitted [%d] payloads, [%d] buffered, linkSendBufferSize: %d", retransmitted, len(buffer.buffer), buffer.linkSendBufferSize)
362415
}
363416
buffer.lastRetransmitTime = now
364417
}
@@ -379,6 +432,7 @@ func (buffer *LinkSendBuffer) inspect() *SendBufferDetail {
379432
timeSinceLastRetransmit := time.Duration(time.Now().UnixMilli()-buffer.lastRetransmitTime) * time.Millisecond
380433
result := &SendBufferDetail{
381434
WindowSize: buffer.windowsSize,
435+
QueuedPayloadCount: len(buffer.buffer),
382436
LinkSendBufferSize: buffer.linkSendBufferSize,
383437
LinkRecvBufferSize: buffer.linkRecvBufferSize,
384438
Accumulator: buffer.accumulator,

xgress/messages.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ const (
7878
PayloadFlagCircuitStart Flag = 4
7979
PayloadFlagChunk Flag = 8
8080
PayloadFlagRetransmit Flag = 16
81+
PayloadFlagEOF Flag = 32
82+
PayloadFlagWriteFailed Flag = 64
8183
)
8284

8385
func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
@@ -308,6 +310,14 @@ func (payload *Payload) IsCircuitEndFlagSet() bool {
308310
return isFlagSet(payload.Flags, PayloadFlagCircuitEnd)
309311
}
310312

313+
func (payload *Payload) IsFlagEOFSet() bool {
314+
return isFlagSet(payload.Flags, PayloadFlagEOF)
315+
}
316+
317+
func (payload *Payload) IsFlagWriteFailedSet() bool {
318+
return isFlagSet(payload.Flags, PayloadFlagWriteFailed)
319+
}
320+
311321
func (payload *Payload) IsCircuitStartFlagSet() bool {
312322
return isFlagSet(payload.Flags, PayloadFlagCircuitStart)
313323
}

xgress/minimal_payload_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package xgress
22

33
import (
4+
"context"
45
"encoding/binary"
56
"errors"
67
"fmt"
@@ -177,7 +178,7 @@ func (self *testIntermediary) ForwardAcknowledgement(ack *Acknowledgement, addre
177178
self.acker.SendAck(ack, address)
178179
}
179180

180-
func (self *testIntermediary) ForwardPayload(payload *Payload, x *Xgress) {
181+
func (self *testIntermediary) ForwardPayload(payload *Payload, x *Xgress, ctx context.Context) {
181182
m := payload.Marshall()
182183
self.payloadTransformer.Tx(m, nil)
183184
b, err := self.msgs.GetMarshaller()(m)

xgress/ordering_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package xgress
22

33
import (
4+
"context"
45
"encoding/binary"
56
"github.com/openziti/channel/v4"
67
"github.com/stretchr/testify/require"
@@ -64,7 +65,7 @@ func (n noopReceiveHandler) GetPayloadIngester() *PayloadIngester {
6465

6566
func (n noopReceiveHandler) ForwardAcknowledgement(*Acknowledgement, Address) {}
6667

67-
func (n noopReceiveHandler) ForwardPayload(*Payload, *Xgress) {}
68+
func (n noopReceiveHandler) ForwardPayload(*Payload, *Xgress, context.Context) {}
6869

6970
func (n noopReceiveHandler) ForwardControlMessage(*Control, *Xgress) {}
7071

0 commit comments

Comments
 (0)