Skip to content

Commit 1951fea

Browse files
committed
feat: add buffer config
Ref: #33 Signed-off-by: dankeboy36 <[email protected]>
1 parent 1172c7b commit 1951fea

File tree

7 files changed

+1130
-16
lines changed

7 files changed

+1130
-16
lines changed

.github/workflows/check-go-task.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,25 @@ jobs:
118118
- name: Check formatting
119119
run: git diff --color --exit-code
120120

121+
run-tests:
122+
name: run-tests
123+
runs-on: ubuntu-latest
124+
125+
strategy:
126+
fail-fast: false
127+
128+
steps:
129+
- name: Checkout repository
130+
uses: actions/checkout@v5
131+
132+
- name: Install Go
133+
uses: actions/setup-go@v5
134+
with:
135+
go-version: ${{ env.GO_VERSION }}
136+
137+
- name: Run tests
138+
run: task go:test
139+
121140
check-config:
122141
name: check-config (${{ matrix.module.path }})
123142
runs-on: ubuntu-latest

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
dist/
2-
2+
coverage.out

Taskfile.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,8 @@ tasks:
147147
desc: Format Go code
148148
cmds:
149149
- go fmt {{default .DEFAULT_GO_PACKAGES .GO_PACKAGES}}
150+
151+
go:test:
152+
desc: Test Go code
153+
cmds:
154+
- go test -v -race -covermode=atomic -coverprofile=coverage.out ./...

buffering.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
//
2+
// This file is part of pluggable-monitor-protocol-handler.
3+
//
4+
// Copyright 2025 ARDUINO SA (http://www.arduino.cc/)
5+
//
6+
// This software is released under the GNU General Public License version 3,
7+
// which covers the main part of arduino-cli.
8+
// The terms of this license can be found at:
9+
// https://www.gnu.org/licenses/gpl-3.0.en.html
10+
//
11+
// You can be released from the requirements of the above licenses by purchasing
12+
// a commercial license. Buying such a license is mandatory if you want to modify or
13+
// otherwise use the software for commercial activities involving the Arduino
14+
// software without disclosing the source code of your own applications. To purchase
15+
// a commercial license, send an email to [email protected].
16+
//
17+
18+
package monitor
19+
20+
import (
21+
"bytes"
22+
"io"
23+
"sync"
24+
"time"
25+
)
26+
27+
type BufferConfig struct {
28+
HighWaterMark int // >= 1
29+
FlushInterval time.Duration // 0 to disable time-based flush
30+
LineBuffering bool
31+
// FlushQueueCapacity controls how many aggregated payloads can be queued for the TCP writer.
32+
// If <= 0 a sensible default is applied.
33+
FlushQueueCapacity int
34+
// OverflowStrategy controls what to do if the queue is full when a flush happens.
35+
// Supported values:
36+
// "drop" (default) -> drop the payload rather than blocking the port reader.
37+
// "wait" -> wait up to OverflowWait duration for space, then drop if still full.
38+
OverflowStrategy string
39+
// OverflowWait is only used when OverflowStrategy == "wait". If <= 0, the behavior falls back to "drop".
40+
OverflowWait time.Duration
41+
}
42+
43+
func (c BufferConfig) normalized() BufferConfig {
44+
cfg := c
45+
if cfg.HighWaterMark < 1 {
46+
cfg.HighWaterMark = 1
47+
}
48+
if cfg.FlushQueueCapacity <= 0 {
49+
cfg.FlushQueueCapacity = 32
50+
}
51+
if cfg.OverflowStrategy == "" {
52+
cfg.OverflowStrategy = "drop"
53+
}
54+
if cfg.OverflowWait < 0 {
55+
cfg.OverflowWait = 0
56+
}
57+
return cfg
58+
}
59+
60+
type Option func(*Server)
61+
62+
func WithBufferConfig(cfg BufferConfig) Option {
63+
return func(s *Server) {
64+
s.bufCfg = cfg.normalized()
65+
}
66+
}
67+
68+
// internal aggregator used in port->conn pump
69+
type aggregator struct {
70+
cfg BufferConfig
71+
mu sync.Mutex
72+
buf bytes.Buffer
73+
timer *time.Timer
74+
flushC chan []byte
75+
closed bool
76+
}
77+
78+
func newAggregator(cfg BufferConfig) *aggregator {
79+
cfg = cfg.normalized()
80+
return &aggregator{
81+
cfg: cfg,
82+
flushC: make(chan []byte, cfg.FlushQueueCapacity),
83+
}
84+
}
85+
86+
func (a *aggregator) addChunk(p []byte) {
87+
a.mu.Lock()
88+
defer a.mu.Unlock()
89+
90+
if a.closed {
91+
return
92+
}
93+
94+
if len(p) == 0 {
95+
return
96+
}
97+
// append
98+
a.buf.Write(p)
99+
100+
// schedule timer if needed and not running
101+
if a.cfg.FlushInterval > 0 && a.timer == nil {
102+
a.timer = time.AfterFunc(a.cfg.FlushInterval, a.onTimer)
103+
}
104+
105+
// flush conditions
106+
if a.buf.Len() >= a.cfg.HighWaterMark || (a.cfg.LineBuffering && bytes.Contains(p, []byte{'\n'})) {
107+
a.flushLocked()
108+
}
109+
}
110+
111+
func (a *aggregator) onTimer() {
112+
a.mu.Lock()
113+
defer a.mu.Unlock()
114+
a.flushLocked()
115+
}
116+
117+
func (a *aggregator) flushLocked() {
118+
if a.timer != nil {
119+
a.timer.Stop()
120+
a.timer = nil
121+
}
122+
if a.buf.Len() == 0 {
123+
return
124+
}
125+
out := make([]byte, a.buf.Len())
126+
copy(out, a.buf.Bytes())
127+
a.buf.Reset()
128+
switch a.cfg.OverflowStrategy {
129+
case "wait":
130+
// To avoid deadlocks, do not wait indefinitely while holding the mutex.
131+
// We allow a bounded wait; after that, drop.
132+
if a.cfg.OverflowWait <= 0 {
133+
select {
134+
case a.flushC <- out:
135+
default:
136+
// drop
137+
}
138+
return
139+
}
140+
deadline := time.NewTimer(a.cfg.OverflowWait)
141+
defer deadline.Stop()
142+
select {
143+
case a.flushC <- out:
144+
case <-deadline.C:
145+
// drop after bounded wait
146+
}
147+
default: // "drop"
148+
select {
149+
case a.flushC <- out:
150+
default:
151+
// drop if consumer is slow
152+
}
153+
}
154+
}
155+
156+
func (a *aggregator) drainTo(w io.Writer) error {
157+
for payload := range a.flushC {
158+
if len(payload) == 0 {
159+
continue
160+
}
161+
if _, err := w.Write(payload); err != nil {
162+
return err
163+
}
164+
}
165+
return nil
166+
}
167+
168+
func (a *aggregator) close() {
169+
a.mu.Lock()
170+
defer a.mu.Unlock()
171+
if a.closed {
172+
return
173+
}
174+
// Stop timer first to prevent future callbacks
175+
if a.timer != nil {
176+
a.timer.Stop()
177+
a.timer = nil
178+
}
179+
// Flush any remaining data while the channel is still open
180+
a.flushLocked()
181+
// Now mark closed and close the channel
182+
a.closed = true
183+
close(a.flushC)
184+
}

buffering_internal_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//
2+
// This file is part of pluggable-monitor-protocol-handler.
3+
//
4+
// Copyright 2025 ARDUINO SA (http://www.arduino.cc/)
5+
//
6+
// This software is released under the GNU General Public License version 3,
7+
// which covers the main part of arduino-cli.
8+
// The terms of this license can be found at:
9+
// https://www.gnu.org/licenses/gpl-3.0.en.html
10+
//
11+
// You can be released from the requirements of the above licenses by purchasing
12+
// a commercial license. Buying such a license is mandatory if you want to modify or
13+
// otherwise use the software for commercial activities involving the Arduino
14+
// software without disclosing the source code of your own applications. To purchase
15+
// a commercial license, send an email to [email protected].
16+
//
17+
18+
package monitor
19+
20+
import (
21+
"testing"
22+
"time"
23+
)
24+
25+
// These tests exercise the aggregator's overflow behavior directly, without
26+
// going through the TCP server. We intentionally do NOT start drainTo(), so
27+
// the flush queue can fill up and trigger the configured strategy.
28+
29+
func Test_Aggregator_Overflow_Drop(t *testing.T) {
30+
cfg := BufferConfig{
31+
HighWaterMark: 1, // flush every byte
32+
FlushInterval: 0,
33+
LineBuffering: false,
34+
FlushQueueCapacity: 1, // tiny queue to force overflow
35+
OverflowStrategy: "drop",
36+
}
37+
a := newAggregator(cfg)
38+
39+
// Push a burst of bytes quickly; with capacity=1 and no consumer, only the
40+
// first payload can be queued, the rest should be dropped.
41+
for i := 0; i < 50; i++ {
42+
a.addChunk([]byte{'a' + byte(i%26)})
43+
}
44+
// close will try a final flush; with a full queue and strategy=drop, it drops too.
45+
a.close()
46+
47+
// Drain whatever remained in the queue and count.
48+
total := 0
49+
count := 0
50+
for p := range a.flushC {
51+
count++
52+
total += len(p)
53+
}
54+
55+
if count > 1 { // capacity is 1, so at most 1 payload can remain queued
56+
t.Fatalf("expected at most 1 queued payload, got %d", count)
57+
}
58+
if total >= 50 { // we queued 50 bytes, but nearly all should be dropped
59+
t.Fatalf("expected drops with strategy=drop; total delivered=%d", total)
60+
}
61+
}
62+
63+
func Test_Aggregator_Overflow_Wait(t *testing.T) {
64+
cfg := BufferConfig{
65+
HighWaterMark: 1,
66+
FlushInterval: 0,
67+
LineBuffering: false,
68+
FlushQueueCapacity: 1,
69+
OverflowStrategy: "wait",
70+
OverflowWait: 25 * time.Millisecond,
71+
}
72+
a := newAggregator(cfg)
73+
74+
start := time.Now()
75+
for i := 0; i < 3; i++ {
76+
a.addChunk([]byte{'x'}) // first enqueues; next two wait ~OverflowWait then drop
77+
}
78+
a.close()
79+
elapsed := time.Since(start)
80+
81+
// Drain queue and count payloads
82+
count := 0
83+
for range a.flushC {
84+
count++
85+
}
86+
if count != 1 {
87+
t.Fatalf("expected exactly 1 queued payload with capacity=1 and no consumer; got %d", count)
88+
}
89+
90+
// We called addChunk 3 times; with OverflowWait=25ms, we expect ~50ms waiting.
91+
// Allow slack for CI.
92+
if elapsed < 40*time.Millisecond {
93+
t.Fatalf("expected bounded waiting to take noticeable time; elapsed=%v (<40ms)", elapsed)
94+
}
95+
}
96+
97+
func Test_Aggregator_QueueCapacity(t *testing.T) {
98+
cfg := BufferConfig{
99+
HighWaterMark: 1,
100+
FlushInterval: 0,
101+
LineBuffering: false,
102+
FlushQueueCapacity: 3, // allow a few in-flight payloads
103+
OverflowStrategy: "drop",
104+
}
105+
a := newAggregator(cfg)
106+
107+
for i := 0; i < 10; i++ {
108+
a.addChunk([]byte{'A' + byte(i%26)})
109+
}
110+
a.close()
111+
112+
count := 0
113+
for range a.flushC {
114+
count++
115+
}
116+
if count > 3 {
117+
t.Fatalf("expected at most queue capacity (3) payloads, got %d", count)
118+
}
119+
}

0 commit comments

Comments
 (0)