Skip to content

Commit a247c65

Browse files
committed
feat(agent,pkg): load tunnel configuration from authentication data
Updated the tunnel listening mechanism to utilize a configuration loaded from authentication data instead of the default configuration. This change enhances flexibility by allowing device-specific settings to be applied dynamically. The NewConfigFromMap function was added to create a new configuration from a map, ensuring that default values are used when specific keys are missing. Additionally, improved error handling during session creation to attempt a fallback to the default configuration if the custom one fails.
1 parent 57d681b commit a247c65

File tree

3 files changed

+105
-34
lines changed

3 files changed

+105
-34
lines changed

pkg/agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ func (a *Agent) Listen(ctx context.Context) error {
646646

647647
a.listening <- true
648648

649-
if err := a.tunnel.Listen(conn, &tunnel.DefaultConfig); err != nil {
649+
if err := a.tunnel.Listen(conn, tunnel.NewConfigFromMap(a.authData.Config)); err != nil {
650650
logger.WithError(err).Error("Tunnel listener exited with error")
651651
}
652652

pkg/agent/pkg/tunnel/tunnel.go

Lines changed: 98 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,71 +38,136 @@ func (t *Tunnel) Handle(protocol string, handler Handler) {
3838
var ErrTunnelDisconnect = errors.New("tunnel disconnected")
3939

4040
type Config struct {
41-
// AcceptBacklog is used to limit how many streams may be
41+
// YamuxAcceptBacklog is used to limit how many streams may be
4242
// waiting an accept.
43-
AcceptBacklog int `json:"accept_backlog"`
43+
YamuxAcceptBacklog int `json:"yamux_accept_backlog"`
4444

4545
// EnableKeepalive is used to do a period keep alive
4646
// messages using a ping.
47-
EnableKeepAlive bool `json:"enable_keep_alive"`
47+
YamuxEnableKeepAlive bool `json:"yamux_enable_keep_alive"`
4848

49-
// KeepAliveInterval is how often to perform the keep alive
50-
KeepAliveInterval time.Duration `json:"keep_alive_interval"`
49+
// YamuxKeepAliveInterval is how often to perform the keep alive
50+
YamuxKeepAliveInterval time.Duration `json:"yamux_keep_alive_interval"`
5151

52-
// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
52+
// YamuxConnectionWriteTimeout is meant to be a "safety valve" timeout after
5353
// we which will suspect a problem with the underlying connection and
5454
// close it. This is only applied to writes, where's there's generally
5555
// an expectation that things will move along quickly.
56-
ConnectionWriteTimeout time.Duration `json:"connection_write_timeout"`
56+
YamuxConnectionWriteTimeout time.Duration `json:"yamux_connection_write_timeout"`
5757

58-
// MaxStreamWindowSize is used to control the maximum
58+
// YamuxMaxStreamWindowSize is used to control the maximum
5959
// window size that we allow for a stream.
60-
MaxStreamWindowSize uint32 `json:"max_stream_window_size"`
60+
YamuxMaxStreamWindowSize uint32 `json:"yamux_max_stream_window_size"`
6161

62-
// StreamOpenTimeout is the maximum amount of time that a stream will
62+
// YamuxStreamOpenTimeout is the maximum amount of time that a stream will
6363
// be allowed to remain in pending state while waiting for an ack from the peer.
6464
// Once the timeout is reached the session will be gracefully closed.
65-
// A zero value disables the StreamOpenTimeout allowing unbounded
65+
// A zero value disables the YamuxStreamOpenTimeout allowing unbounded
6666
// blocking on OpenStream calls.
67-
StreamOpenTimeout time.Duration `json:"stream_open_timeout"`
67+
YamuxStreamOpenTimeout time.Duration `json:"yamux_stream_open_timeout"`
6868

69-
// StreamCloseTimeout is the maximum time that a stream will allowed to
69+
// YamuxStreamCloseTimeout is the maximum time that a stream will allowed to
7070
// be in a half-closed state when `Close` is called before forcibly
7171
// closing the connection. Forcibly closed connections will empty the
7272
// receive buffer, drop any future packets received for that stream,
7373
// and send a RST to the remote side.
74-
StreamCloseTimeout time.Duration `json:"stream_close_timeout"`
74+
YamuxStreamCloseTimeout time.Duration `json:"yamux_stream_close_timeout"`
75+
}
76+
77+
// NewConfigFromMap creates a new Config from a map[string]any received from auth data from the server
78+
// or returns the default config if the map is nil. If a key is missing, the default value is used.
79+
func NewConfigFromMap(m map[string]any) *Config {
80+
cfg := DefaultConfig
81+
82+
if v, ok := m["yamux_accept_backlog"].(int); ok {
83+
cfg.YamuxAcceptBacklog = v
84+
}
85+
86+
if v, ok := m["yamux_enable_keep_alive"].(bool); ok {
87+
cfg.YamuxEnableKeepAlive = v
88+
}
89+
90+
if v, ok := m["yamux_keep_alive_interval"].(time.Duration); ok {
91+
cfg.YamuxKeepAliveInterval = v
92+
}
93+
94+
if v, ok := m["yamux_connection_write_timeout"].(time.Duration); ok {
95+
cfg.YamuxConnectionWriteTimeout = v
96+
}
97+
98+
if v, ok := m["yamux_max_stream_window_size"].(uint32); ok {
99+
cfg.YamuxMaxStreamWindowSize = v
100+
}
101+
102+
if v, ok := m["yamux_stream_open_timeout"].(time.Duration); ok {
103+
cfg.YamuxStreamOpenTimeout = v
104+
}
105+
106+
if v, ok := m["yamux_stream_close_timeout"].(time.Duration); ok {
107+
cfg.YamuxStreamCloseTimeout = v
108+
}
109+
110+
return &cfg
111+
}
112+
113+
func YamuxConfigFromConfig(cfg *Config) *yamux.Config {
114+
if cfg == nil {
115+
cfg = &DefaultConfig
116+
}
117+
118+
return &yamux.Config{
119+
AcceptBacklog: cfg.YamuxAcceptBacklog,
120+
EnableKeepAlive: cfg.YamuxEnableKeepAlive,
121+
KeepAliveInterval: cfg.YamuxKeepAliveInterval,
122+
ConnectionWriteTimeout: cfg.YamuxConnectionWriteTimeout,
123+
MaxStreamWindowSize: cfg.YamuxMaxStreamWindowSize,
124+
StreamCloseTimeout: cfg.YamuxStreamCloseTimeout,
125+
StreamOpenTimeout: cfg.YamuxStreamOpenTimeout,
126+
LogOutput: os.Stderr,
127+
}
75128
}
76129

77130
var DefaultConfig = Config{
78-
AcceptBacklog: 256,
79-
EnableKeepAlive: true,
80-
KeepAliveInterval: 35 * time.Second,
81-
ConnectionWriteTimeout: 15 * time.Second,
82-
MaxStreamWindowSize: 256 * 1024,
83-
StreamCloseTimeout: 5 * time.Minute,
84-
StreamOpenTimeout: 75 * time.Second,
131+
YamuxAcceptBacklog: 256,
132+
YamuxEnableKeepAlive: true,
133+
YamuxKeepAliveInterval: 35 * time.Second,
134+
YamuxConnectionWriteTimeout: 15 * time.Second,
135+
YamuxMaxStreamWindowSize: 256 * 1024,
136+
YamuxStreamCloseTimeout: 5 * time.Minute,
137+
YamuxStreamOpenTimeout: 75 * time.Second,
85138
}
86139

87140
func (t *Tunnel) Listen(conn net.Conn, cfg *Config) error {
88141
if cfg == nil {
89142
cfg = &DefaultConfig
90143
}
91144

92-
session, err := yamux.Server(conn, &yamux.Config{
93-
AcceptBacklog: cfg.AcceptBacklog,
94-
EnableKeepAlive: cfg.EnableKeepAlive,
95-
KeepAliveInterval: cfg.KeepAliveInterval,
96-
ConnectionWriteTimeout: cfg.ConnectionWriteTimeout,
97-
MaxStreamWindowSize: cfg.MaxStreamWindowSize,
98-
StreamCloseTimeout: cfg.StreamCloseTimeout,
99-
StreamOpenTimeout: cfg.StreamOpenTimeout,
100-
LogOutput: os.Stderr,
101-
})
145+
var session *yamux.Session
146+
var err error
147+
148+
session, err = yamux.Server(conn, YamuxConfigFromConfig(cfg))
102149
if err != nil {
103-
log.WithError(err).Error("failed to create muxed session")
150+
log.WithError(err).WithFields(log.Fields{
151+
"yamux_accept_backlog": cfg.YamuxAcceptBacklog,
152+
"yamux_enable_keep_alive": cfg.YamuxEnableKeepAlive,
153+
"yamux_keep_alive_interval": cfg.YamuxKeepAliveInterval,
154+
"yamux_connection_write_timeout": cfg.YamuxConnectionWriteTimeout,
155+
"yamux_max_stream_window_size": cfg.YamuxMaxStreamWindowSize,
156+
"yamux_stream_close_timeout": cfg.YamuxStreamCloseTimeout,
157+
"yamux_stream_open_timeout": cfg.YamuxStreamOpenTimeout,
158+
}).Error("failed to create muxed session")
159+
160+
// NOTE: If we fail to create the session, we should try again with the [DefaultConfig] as the client
161+
// could be using different settings.
162+
log.WithError(err).Warning("trying to create muxed session with default config")
163+
session, err = yamux.Server(conn, YamuxConfigFromConfig(&DefaultConfig))
164+
if err != nil {
165+
log.WithError(err).Error("failed to create muxed session with default config")
166+
167+
return err
168+
}
104169

105-
return err
170+
log.WithError(err).Warning("muxed session created with default config due to error with custom config")
106171
}
107172

108173
for {

pkg/models/device.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ type DeviceAuthResponse struct {
6767
Token string `json:"token"`
6868
Name string `json:"name"`
6969
Namespace string `json:"namespace"`
70+
// Config holds device-specific configuration settings.
71+
// This can include various parameters that the device needs to operate correctly.
72+
// The structure of this map can vary depending on the device type and its requirements.
73+
// Example configurations might include network settings, operational modes, or feature toggles.
74+
// It's designed to be flexible to accommodate different device needs.
75+
Config map[string]any `json:"config,omitempty"`
7076
}
7177

7278
type DeviceIdentity struct {

0 commit comments

Comments
 (0)