Skip to content

feat: add buffer config #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/check-go-task.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,35 @@ jobs:
- name: Check formatting
run: git diff --color --exit-code

run-tests:
name: run-tests
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
module:
- path: ./

steps:
- name: Checkout repository
uses: actions/checkout@v5

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}

- name: Install Task
uses: arduino/setup-task@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
version: 3.x

- name: Run tests
working-directory: ${{ matrix.module.path }}
run: task go:test

check-config:
name: check-config (${{ matrix.module.path }})
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
dist/

coverage.out
5 changes: 5 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,8 @@ tasks:
desc: Format Go code
cmds:
- go fmt {{default .DEFAULT_GO_PACKAGES .GO_PACKAGES}}

go:test:
desc: Test Go code
cmds:
- go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
191 changes: 191 additions & 0 deletions buffering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//
// This file is part of pluggable-monitor-protocol-handler.
//
// Copyright 2025 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to modify or
// otherwise use the software for commercial activities involving the Arduino
// software without disclosing the source code of your own applications. To purchase
// a commercial license, send an email to [email protected].
//

package monitor

import (
"bytes"
"io"
"sync"
"time"
)

// BufferConfig controls how bytes coming from the board (port) are aggregated
// into larger chunks before being written to the TCP client. All fields are
// optional; invalid or zero values are normalized to sensible defaults via
// normalized().
type BufferConfig struct {
HighWaterMark int // >= 1
FlushInterval time.Duration // 0 to disable time-based flush
LineBuffering bool
// FlushQueueCapacity controls how many aggregated payloads can be queued for the TCP writer.
// If <= 0 a sensible default is applied.
FlushQueueCapacity int
// OverflowStrategy controls what to do if the queue is full when a flush happens.
// Supported values:
// "drop" (default) -> drop the payload rather than blocking the port reader.
// "wait" -> wait up to OverflowWait duration for space, then drop if still full.
OverflowStrategy string
// OverflowWait is only used when OverflowStrategy == "wait". If <= 0, the behavior falls back to "drop".
OverflowWait time.Duration
}

func (c BufferConfig) normalized() BufferConfig {
cfg := c
if cfg.HighWaterMark < 1 {
cfg.HighWaterMark = 1
}
if cfg.FlushQueueCapacity <= 0 {
cfg.FlushQueueCapacity = 32
}
if cfg.OverflowStrategy == "" {
cfg.OverflowStrategy = "drop"
}
if cfg.OverflowWait < 0 {
cfg.OverflowWait = 0
}
return cfg
}

// Option is a functional option used to configure a Server at construction time.
type Option func(*Server)

// WithBufferConfig sets the Server's buffering behavior. The provided cfg is
// normalized (e.g., min values enforced) before being stored on the Server.
func WithBufferConfig(cfg BufferConfig) Option {
return func(s *Server) {
s.bufCfg = cfg.normalized()
}
}

// internal aggregator used in port->conn pump
type aggregator struct {
cfg BufferConfig
mu sync.Mutex
buf bytes.Buffer
timer *time.Timer
flushC chan []byte
closed bool
}

func newAggregator(cfg BufferConfig) *aggregator {
cfg = cfg.normalized()
return &aggregator{
cfg: cfg,
flushC: make(chan []byte, cfg.FlushQueueCapacity),
}
}

func (a *aggregator) addChunk(p []byte) {
a.mu.Lock()
defer a.mu.Unlock()

if a.closed {
return
}

if len(p) == 0 {
return
}
// append
a.buf.Write(p)

// schedule timer if needed and not running
if a.cfg.FlushInterval > 0 && a.timer == nil {
a.timer = time.AfterFunc(a.cfg.FlushInterval, a.onTimer)
}

// flush conditions
if a.buf.Len() >= a.cfg.HighWaterMark || (a.cfg.LineBuffering && bytes.Contains(p, []byte{'\n'})) {
a.flushLocked()
}
}

func (a *aggregator) onTimer() {
a.mu.Lock()
defer a.mu.Unlock()
a.flushLocked()
}

func (a *aggregator) flushLocked() {
if a.timer != nil {
a.timer.Stop()
a.timer = nil
}
if a.buf.Len() == 0 {
return
}
out := make([]byte, a.buf.Len())
copy(out, a.buf.Bytes())
a.buf.Reset()
switch a.cfg.OverflowStrategy {
case "wait":
// To avoid deadlocks, do not wait indefinitely while holding the mutex.
// We allow a bounded wait; after that, drop.
if a.cfg.OverflowWait <= 0 {
select {
case a.flushC <- out:
default:
// drop
}
return
}
deadline := time.NewTimer(a.cfg.OverflowWait)
defer deadline.Stop()
select {
case a.flushC <- out:
case <-deadline.C:
// drop after bounded wait
}
default: // "drop"
select {
case a.flushC <- out:
default:
// drop if consumer is slow
}
}
}

func (a *aggregator) drainTo(w io.Writer) error {
for payload := range a.flushC {
if len(payload) == 0 {
continue
}
if _, err := w.Write(payload); err != nil {
return err
}
}
return nil
}

func (a *aggregator) close() {
a.mu.Lock()
defer a.mu.Unlock()
if a.closed {
return
}
// Stop timer first to prevent future callbacks
if a.timer != nil {
a.timer.Stop()
a.timer = nil
}
// Flush any remaining data while the channel is still open
a.flushLocked()
// Now mark closed and close the channel
a.closed = true
close(a.flushC)
}
119 changes: 119 additions & 0 deletions buffering_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
//
// This file is part of pluggable-monitor-protocol-handler.
//
// Copyright 2025 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to modify or
// otherwise use the software for commercial activities involving the Arduino
// software without disclosing the source code of your own applications. To purchase
// a commercial license, send an email to [email protected].
//

package monitor

import (
"testing"
"time"
)

// These tests exercise the aggregator's overflow behavior directly, without
// going through the TCP server. We intentionally do NOT start drainTo(), so
// the flush queue can fill up and trigger the configured strategy.

func Test_Aggregator_Overflow_Drop(t *testing.T) {
cfg := BufferConfig{
HighWaterMark: 1, // flush every byte
FlushInterval: 0,
LineBuffering: false,
FlushQueueCapacity: 1, // tiny queue to force overflow
OverflowStrategy: "drop",
}
a := newAggregator(cfg)

// Push a burst of bytes quickly; with capacity=1 and no consumer, only the
// first payload can be queued, the rest should be dropped.
for i := 0; i < 50; i++ {
a.addChunk([]byte{'a' + byte(i%26)})
}
// close will try a final flush; with a full queue and strategy=drop, it drops too.
a.close()

// Drain whatever remained in the queue and count.
total := 0
count := 0
for p := range a.flushC {
count++
total += len(p)
}

if count > 1 { // capacity is 1, so at most 1 payload can remain queued
t.Fatalf("expected at most 1 queued payload, got %d", count)
}
if total >= 50 { // we queued 50 bytes, but nearly all should be dropped
t.Fatalf("expected drops with strategy=drop; total delivered=%d", total)
}
}

func Test_Aggregator_Overflow_Wait(t *testing.T) {
cfg := BufferConfig{
HighWaterMark: 1,
FlushInterval: 0,
LineBuffering: false,
FlushQueueCapacity: 1,
OverflowStrategy: "wait",
OverflowWait: 25 * time.Millisecond,
}
a := newAggregator(cfg)

start := time.Now()
for i := 0; i < 3; i++ {
a.addChunk([]byte{'x'}) // first enqueues; next two wait ~OverflowWait then drop
}
a.close()
elapsed := time.Since(start)

// Drain queue and count payloads
count := 0
for range a.flushC {
count++
}
if count != 1 {
t.Fatalf("expected exactly 1 queued payload with capacity=1 and no consumer; got %d", count)
}

// We called addChunk 3 times; with OverflowWait=25ms, we expect ~50ms waiting.
// Allow slack for CI.
if elapsed < 40*time.Millisecond {
t.Fatalf("expected bounded waiting to take noticeable time; elapsed=%v (<40ms)", elapsed)
}
}

func Test_Aggregator_QueueCapacity(t *testing.T) {
cfg := BufferConfig{
HighWaterMark: 1,
FlushInterval: 0,
LineBuffering: false,
FlushQueueCapacity: 3, // allow a few in-flight payloads
OverflowStrategy: "drop",
}
a := newAggregator(cfg)

for i := 0; i < 10; i++ {
a.addChunk([]byte{'A' + byte(i%26)})
}
a.close()

count := 0
for range a.flushC {
count++
}
if count > 3 {
t.Fatalf("expected at most queue capacity (3) payloads, got %d", count)
}
}
Loading
Loading