Skip to content

Commit 9f78270

Browse files
authored
feat: add UDP support (#20)
1 parent 9ab2878 commit 9f78270

File tree

17 files changed

+703
-142
lines changed

17 files changed

+703
-142
lines changed

README.md

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@ The `server` command supports the following additional arguments.
4949

5050
```shell
5151
OPTIONS:
52-
--addr value The address to listen on for probe messages (default: ":8123") [$ADDR]
53-
--buffer-size value The size of the read buffer used by the server (default: 512) [$BUFFER_SIZE]
54-
--read-timeout value The duration after which the server should timeout when reading from a connection (default: 2s) [$READ_TIMEOUT]
55-
--write-timeout value The duration after which the server should timeout when writing to a connection (default: 5s) [$WRITE_TIMEOUT]
56-
--log.format value Specify the format of logs. Supported formats: 'logfmt', 'json', 'console' [$LOG_FORMAT]
57-
--log.level value Specify the log level. e.g. 'debug', 'info', 'error'. (default: "info") [$LOG_LEVEL]
58-
--log.ctx value A list of context field appended to every log. Format: key=value. [$LOG_CTX]
59-
--help, -h show help (default: false)
52+
--addr value The address to listen on for probe messages (default: ":8123") [$ADDR]
53+
--buffer-size value The size of the read buffer used by the server (default: 512) [$BUFFER_SIZE]
54+
--read-timeout value The duration after which the server should timeout when reading from a connection (default: 2s) [$READ_TIMEOUT]
55+
--write-timeout value The duration after which the server should timeout when writing to a connection (default: 5s) [$WRITE_TIMEOUT]
56+
--log.format value Specify the format of logs. Supported formats: 'logfmt', 'json', 'console' [$LOG_FORMAT]
57+
--log.level value Specify the log level. e.g. 'debug', 'info', 'error'. (default: "info") [$LOG_LEVEL]
58+
--log.ctx value [ --log.ctx value ] A list of context field appended to every log. Format: key=value. [$LOG_CTX]
59+
--help, -h show help
6060
```
6161

6262
#### Firewall
@@ -95,15 +95,17 @@ The `client` command supports the following additional arguments.
9595

9696
```shell
9797
OPTIONS:
98-
--addr value The address of a connqc server [$ADDR]
99-
--backoff value Duration to wait for before retrying to connect to the server (default: 1s) [$BACKOFF]
100-
--interval value Interval at which to send probe messages to the server (default: 1s) [$INTERVAL]
101-
--read-timeout value The duration after which the client should timeout when reading from a connection (default: 2s) [$READ_TIMEOUT]
102-
--write-timeout value The duration after which the client should timeout when writing to a connection (default: 5s) [$WRITE_TIMEOUT]
103-
--log.format value Specify the format of logs. Supported formats: 'logfmt', 'json', 'console' [$LOG_FORMAT]
104-
--log.level value Specify the log level. e.g. 'debug', 'info', 'error'. (default: "info") [$LOG_LEVEL]
105-
--log.ctx value A list of context field appended to every log. Format: key=value. [$LOG_CTX]
106-
--help, -h show help (default: false)
98+
--protocol value Use protocol for the connection (available: tcp, udp) (default: "tcp") [$PROTOCOL]
99+
--addr value The address of the connqc server [$ADDR]
100+
--backoff value The duration to wait for before retrying to connect to the server (default: 1s) [$BACKOFF]
101+
--interval value The interval at which to send probe messages to the server (default: 1s) [$INTERVAL]
102+
--read-timeout value The duration after which the client should timeout when reading from a connection (default: 2s) [$READ_TIMEOUT]
103+
--write-timeout value The duration after which the client should timeout when writing to a connection (default: 5s) [$WRITE_TIMEOUT]
104+
--log.format value Specify the format of logs. Supported formats: 'logfmt', 'json', 'console' [$LOG_FORMAT]
105+
--log.level value Specify the log level. e.g. 'debug', 'info', 'error'. (default: "info") [$LOG_LEVEL]
106+
--log.ctx value [ --log.ctx value ] A list of context field appended to every log. Format: key=value. [$LOG_CTX]
107+
--help, -h show help
108+
107109
```
108110

109111
## License

client.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
"github.com/hamba/logger/v2"
1010
lctx "github.com/hamba/logger/v2/ctx"
11+
"github.com/nitrado/connqc/tcp"
12+
"github.com/nitrado/connqc/udp"
1113
)
1214

1315
// Client attempts to hold a connection with a server, sending probe messages at a configured interval.
@@ -33,11 +35,27 @@ func NewClient(backoff, sendInterval, readTimeout, writeTimeout time.Duration, l
3335

3436
// Run sends probe messages to the server continuously.
3537
// If the connection fails, it retries at the configured backoff interval.
36-
func (c *Client) Run(ctx context.Context, addr string) {
38+
func (c *Client) Run(ctx context.Context, protocol, addr string) {
39+
var (
40+
conn net.Conn
41+
err error
42+
idx int
43+
)
3744
for {
38-
conn, err := net.Dial("tcp", addr)
45+
log := c.log.With(lctx.Str("protocol", protocol), lctx.Str("addr", addr), lctx.Int("reconnect", idx))
46+
idx++
47+
48+
switch protocol {
49+
case "tcp":
50+
conn, err = tcp.Connect(addr)
51+
case "udp":
52+
conn, err = udp.Connect(addr)
53+
default:
54+
log.Error("Unexpected protocol")
55+
return
56+
}
3957
if err != nil {
40-
c.log.Error("Could not connect to server", lctx.Str("address", addr))
58+
log.Error("Could not connect to server", lctx.Err(err))
4159

4260
select {
4361
case <-ctx.Done():
@@ -48,7 +66,7 @@ func (c *Client) Run(ctx context.Context, addr string) {
4866
}
4967

5068
if err = c.handleConn(ctx, conn); err != nil {
51-
c.log.Error("Connection error", lctx.Error("error", err))
69+
log.Error("Connection error", lctx.Err(err))
5270
}
5371

5472
select {
@@ -64,7 +82,7 @@ type expectation struct {
6482
probe Probe
6583
}
6684

67-
func (c *Client) handleConn(ctx context.Context, conn net.Conn) error { //nolint:funlen
85+
func (c *Client) handleConn(ctx context.Context, conn net.Conn) error { //nolint:funlen // Simplify readability.
6886
defer func() { _ = conn.Close() }()
6987

7088
readCh := make(chan readResponse)
@@ -86,10 +104,10 @@ func (c *Client) handleConn(ctx context.Context, conn net.Conn) error { //nolint
86104
Data: fmt.Sprintf("Hello %d", id),
87105
}
88106
if err := enc.Encode(p); err != nil {
89-
return fmt.Errorf("writing Message: %w", err)
107+
return fmt.Errorf("writing message: %w", err)
90108
}
91109

92-
c.log.Debug("Sent probe", lctx.Interface("probe", p))
110+
c.log.Info("Message sent", lctx.Interface("probe", p))
93111

94112
id++
95113
expect = append(expect, expectation{timestamp: time.Now(), probe: p})
@@ -98,7 +116,7 @@ func (c *Client) handleConn(ctx context.Context, conn net.Conn) error { //nolint
98116
return nil
99117
}
100118
if resp.err != nil {
101-
return fmt.Errorf("reading Message: %w", resp.err)
119+
return fmt.Errorf("reading response: %w", resp.err)
102120
}
103121

104122
var (
@@ -115,7 +133,12 @@ func (c *Client) handleConn(ctx context.Context, conn net.Conn) error { //nolint
115133
break
116134
}
117135

118-
c.log.Warn("Message lost", lctx.Uint64("id", exp.probe.ID), lctx.Str("data", exp.probe.Data))
136+
c.log.Warn("Message dropped",
137+
lctx.Str("error", "unexpected ID"),
138+
lctx.Uint64("expected_id", resp.probe.ID),
139+
lctx.Uint64("id", exp.probe.ID),
140+
lctx.Str("data", exp.probe.Data),
141+
)
119142
}
120143
if !found {
121144
c.log.Error("No expectation found")
@@ -152,12 +175,10 @@ func (c *Client) readLoop(conn net.Conn, ch chan readResponse) {
152175

153176
p, ok := msg.(Probe)
154177
if !ok {
155-
ch <- readResponse{err: err}
178+
ch <- readResponse{err: fmt.Errorf("message not a probe: %T", msg)}
156179
continue
157180
}
158181

159-
c.log.Debug("Received probe", lctx.Interface("probe", p))
160-
161182
ch <- readResponse{timestamp: time.Now(), probe: p}
162183
}
163184
}

cmd/connqc/client.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,41 @@
11
package main
22

33
import (
4+
"context"
5+
"fmt"
6+
47
"github.com/hamba/cmd/v2"
8+
lctx "github.com/hamba/logger/v2/ctx"
59
"github.com/nitrado/connqc"
610
"github.com/urfave/cli/v2"
711
)
812

913
func runClient(c *cli.Context) error {
10-
ctx := c.Context
14+
ctx, cancel := context.WithCancel(c.Context)
15+
defer cancel()
1116

1217
log, err := cmd.NewLogger(c)
1318
if err != nil {
1419
return err
1520
}
1621

22+
protocol := c.String(flagProtocol)
23+
if protocol != flagProtocolTCP && protocol != flagProtocolUDP {
24+
return fmt.Errorf("unsupported protocol: %s", protocol)
25+
}
26+
1727
backoff := c.Duration(flagConnBackoff)
1828
sendInterval := c.Duration(flagSendInterval)
1929
readTimeout := c.Duration(flagReadTimeout)
2030
writeTimeout := c.Duration(flagWriteTimeout)
2131

22-
sig := connqc.NewClient(backoff, sendInterval, readTimeout, writeTimeout, log)
32+
log = log.With(lctx.Str("protocol", protocol))
2333

24-
addr := c.String(flagAddr)
25-
go sig.Run(ctx, addr)
34+
client := connqc.NewClient(backoff, sendInterval, readTimeout, writeTimeout, log)
35+
go func() {
36+
client.Run(ctx, protocol, c.String(flagAddr))
37+
cancel()
38+
}()
2639

2740
<-ctx.Done()
2841

cmd/connqc/main.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ import (
1616
)
1717

1818
const (
19-
flagAddr = "addr"
19+
flagProtocol = "protocol"
20+
flagProtocolTCP = "tcp"
21+
flagProtocolUDP = "udp"
22+
flagAddr = "addr"
2023

2124
flagBufferSize = "buffer-size"
2225
flagReadTimeout = "read-timeout"
@@ -33,21 +36,29 @@ var commands = []*cli.Command{
3336
Name: "client",
3437
Usage: "Run the connqc client",
3538
Flags: cmd.Flags{
39+
&cli.StringFlag{
40+
Name: flagProtocol,
41+
Usage: fmt.Sprintf(
42+
"The protocol for the connection. Supported protocols: '%s', '%s'", flagProtocolTCP, flagProtocolUDP,
43+
),
44+
Value: flagProtocolTCP,
45+
EnvVars: []string{strcase.ToSNAKE(flagProtocol)},
46+
},
3647
&cli.StringFlag{
3748
Name: flagAddr,
38-
Usage: "The address of a connqc server",
49+
Usage: "The address of the connqc server",
3950
Required: true,
4051
EnvVars: []string{strcase.ToSNAKE(flagAddr)},
4152
},
4253
&cli.DurationFlag{
4354
Name: flagConnBackoff,
44-
Usage: "Duration to wait for before retrying to connect to the server",
55+
Usage: "The duration to wait for before retrying to connect to the server",
4556
Value: time.Second,
4657
EnvVars: []string{strcase.ToSNAKE(flagConnBackoff)},
4758
},
4859
&cli.DurationFlag{
4960
Name: flagSendInterval,
50-
Usage: "Interval at which to send probe messages to the server",
61+
Usage: "The interval at which to send probe messages to the server",
5162
Value: time.Second,
5263
EnvVars: []string{strcase.ToSNAKE(flagSendInterval)},
5364
},

cmd/connqc/server.go

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ package main
22

33
import (
44
"errors"
5+
"net"
6+
"sync"
57

68
"github.com/hamba/cmd/v2"
79
lctx "github.com/hamba/logger/v2/ctx"
810
"github.com/nitrado/connqc"
911
"github.com/nitrado/connqc/tcp"
12+
"github.com/nitrado/connqc/udp"
1013
"github.com/urfave/cli/v2"
1114
)
1215

13-
func runServer(c *cli.Context) error {
16+
func runServer(c *cli.Context) error { //nolint:funlen // Keep it simple and readable.
1417
ctx := c.Context
1518

1619
log, err := cmd.NewLogger(c)
@@ -22,34 +25,56 @@ func runServer(c *cli.Context) error {
2225
readTimeout := c.Duration(flagReadTimeout)
2326
writeTimeout := c.Duration(flagWriteTimeout)
2427

25-
sigSrv := connqc.NewServer(bufferSize, readTimeout, writeTimeout, log)
28+
srv := connqc.NewServer(bufferSize, readTimeout, writeTimeout, log)
2629

27-
srv, err := tcp.NewServer(sigSrv)
30+
tcpSrv, err := tcp.NewServer(srv)
31+
if err != nil {
32+
return err
33+
}
34+
35+
udpSrv, err := udp.NewServer(srv)
2836
if err != nil {
2937
return err
3038
}
3139

3240
addr := c.String(flagAddr)
41+
42+
grp := sync.WaitGroup{}
43+
3344
log.Info("Starting server",
3445
lctx.Str("addr", addr),
3546
lctx.Int("buffer_size", bufferSize),
3647
lctx.Duration("read_timeout", readTimeout),
3748
lctx.Duration("write_timeout", writeTimeout),
3849
)
50+
51+
grp.Add(1)
3952
go func() {
40-
if err := srv.Listen(addr); err != nil && !errors.Is(err, tcp.ErrServerClosed) {
41-
log.Error("Server error", lctx.Error("error", err))
53+
defer grp.Done()
54+
55+
if err := tcpSrv.Listen(ctx, addr); err != nil && !errors.Is(err, net.ErrClosed) {
56+
log.Error("Server error", lctx.Str("protocol", "tcp"), lctx.Err(err))
57+
return
4258
}
59+
log.Info("TCP server stopped")
60+
}()
61+
62+
grp.Add(1)
63+
go func() {
64+
defer grp.Done()
65+
66+
if err := udpSrv.Listen(ctx, addr); err != nil && !errors.Is(err, net.ErrClosed) {
67+
log.Error("Server error", lctx.Str("protocol", "udp"), lctx.Err(err))
68+
return
69+
}
70+
log.Info("UDP server stopped")
4371
}()
44-
defer func() { _ = srv.Close() }()
4572

4673
<-ctx.Done()
4774

4875
log.Info("Shutting down")
4976

50-
if err = srv.Close(); err != nil {
51-
log.Warn("Failed to shutdown server", lctx.Error("error", err))
52-
}
77+
grp.Wait()
5378

5479
return nil
5580
}

0 commit comments

Comments
 (0)