Skip to content

Commit b55341b

Browse files
committed
naming consitency, and add test
1 parent 3b3a025 commit b55341b

File tree

4 files changed

+156
-22
lines changed

4 files changed

+156
-22
lines changed

tracker/metrics/conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Conn struct {
2020
// NewConn creates a new Conn instance.
2121
func NewConn(conn net.Conn, metadata *adapter.InboundContext) net.Conn {
2222
attributes := metadataToAttributes(metadata)
23-
metrics.Connections.Add(context.Background(), 1, metric.WithAttributes(attributes...))
23+
metrics.conns.Add(context.Background(), 1, metric.WithAttributes(attributes...))
2424
return &Conn{
2525
Conn: conn,
2626
attributes: attributes,
@@ -33,7 +33,7 @@ func (c *Conn) Read(b []byte) (n int, err error) {
3333
n, err = c.Conn.Read(b)
3434
if n > 0 {
3535
attrs := append(c.attributes, attribute.KeyValue{Key: "direction", Value: attribute.StringValue("receive")})
36-
metrics.ProxyIO.Add(context.Background(), int64(n), metric.WithAttributes(attrs...))
36+
metrics.proxyIO.Add(context.Background(), int64(n), metric.WithAttributes(attrs...))
3737
}
3838
return
3939
}
@@ -43,7 +43,7 @@ func (c *Conn) Write(b []byte) (n int, err error) {
4343
n, err = c.Conn.Write(b)
4444
if n > 0 {
4545
attrs := append(c.attributes, attribute.KeyValue{Key: "direction", Value: attribute.StringValue("transmit")})
46-
metrics.ProxyIO.Add(context.Background(), int64(n), metric.WithAttributes(attrs...))
46+
metrics.proxyIO.Add(context.Background(), int64(n), metric.WithAttributes(attrs...))
4747
}
4848
return
4949
}

tracker/metrics/metrics.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@ import (
1818
)
1919

2020
type metricsManager struct {
21-
meter metric.Meter
22-
ProxyIO metric.Int64Counter
23-
Connections metric.Int64Counter
24-
conns metric.Int64UpDownCounter
25-
duration metric.Int64Histogram
21+
meter metric.Meter
22+
proxyIO metric.Int64Counter
23+
conns metric.Int64UpDownCounter
24+
duration metric.Int64Histogram
2625

2726
countryLookup geo.CountryLookup
2827
}
@@ -32,33 +31,28 @@ var metrics = &metricsManager{}
3231
func SetupMetricsManager(geolite2CityURL, cityDBFile string) {
3332
meter := otel.GetMeterProvider().Meter("lantern-box")
3433

34+
// Track the number of bytes sent and recieved.
3535
pIO, err := meter.Int64Counter("proxy.io", metric.WithUnit("bytes"))
3636
if err != nil {
3737
pIO = &noop.Int64Counter{}
3838
}
3939

40-
connections, err := meter.Int64Counter("proxy.connections")
41-
if err != nil {
42-
connections = &noop.Int64Counter{}
43-
}
44-
4540
// Track the number of connections.
46-
conns, err := meter.Int64UpDownCounter("sing.connections", metric.WithDescription("Number of connections"))
41+
conns, err := meter.Int64UpDownCounter("proxy.connections", metric.WithDescription("Number of connections"))
4742
if err != nil {
4843
conns = &noop.Int64UpDownCounter{}
4944
}
5045

5146
// Track connection duration.
52-
duration, err := meter.Int64Histogram("sing.connection_duration", metric.WithDescription("Connection duration"))
47+
duration, err := meter.Int64Histogram("proxy.connection.duration", metric.WithDescription("Connection duration"))
5348
if err != nil {
5449
duration = &noop.Int64Histogram{}
5550
}
5651

5752
metrics.meter = meter
58-
metrics.ProxyIO = pIO
59-
metrics.duration = duration
60-
metrics.Connections = connections
53+
metrics.proxyIO = pIO
6154
metrics.conns = conns
55+
metrics.duration = duration
6256

6357
metrics.countryLookup = geo.FromWeb(geolite2CityURL, cityDBFile, 24*time.Hour, cityDBFile, geo.CountryCode)
6458
if metrics.countryLookup == nil {

tracker/metrics/packet_conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type PacketConn struct {
2222
// NewPacketConn creates a new PacketConn instance.
2323
func NewPacketConn(conn N.PacketConn, metadata *adapter.InboundContext) N.PacketConn {
2424
attributes := metadataToAttributes(metadata)
25-
metrics.Connections.Add(context.Background(), 1, metric.WithAttributes(attributes...))
25+
metrics.conns.Add(context.Background(), 1, metric.WithAttributes(attributes...))
2626

2727
return &PacketConn{
2828
PacketConn: conn,
@@ -39,7 +39,7 @@ func (c *PacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, er
3939
}
4040
if buffer.Len() > 0 {
4141
attrs := append(c.attributes, attribute.KeyValue{Key: "direction", Value: attribute.StringValue("receive")})
42-
metrics.ProxyIO.Add(context.Background(), int64(buffer.Len()), metric.WithAttributes(attrs...))
42+
metrics.proxyIO.Add(context.Background(), int64(buffer.Len()), metric.WithAttributes(attrs...))
4343
}
4444
return dest, nil
4545
}
@@ -48,7 +48,7 @@ func (c *PacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, er
4848
func (c *PacketConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
4949
if buffer.Len() > 0 {
5050
attrs := append(c.attributes, attribute.KeyValue{Key: "direction", Value: attribute.StringValue("transmit")})
51-
metrics.ProxyIO.Add(context.Background(), int64(buffer.Len()), metric.WithAttributes(attrs...))
51+
metrics.proxyIO.Add(context.Background(), int64(buffer.Len()), metric.WithAttributes(attrs...))
5252
}
5353
return c.PacketConn.WritePacket(buffer, destination)
5454
}

tracker/metrics/tracker_test.go

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77
"sync"
88
"testing"
9+
"time"
910

1011
sdkotel "go.opentelemetry.io/otel"
1112

@@ -15,9 +16,13 @@ import (
1516
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1617

1718
"github.com/stretchr/testify/assert"
19+
20+
"github.com/sagernet/sing/common/buf"
21+
M "github.com/sagernet/sing/common/metadata"
22+
N "github.com/sagernet/sing/common/network"
1823
)
1924

20-
func TestTracker(t *testing.T) {
25+
func TestConnTracker(t *testing.T) {
2126
var wg sync.WaitGroup
2227
reader := metric.NewManualReader()
2328
provider := metric.NewMeterProvider(metric.WithReader(reader))
@@ -84,6 +89,81 @@ func TestTracker(t *testing.T) {
8489
}
8590
}
8691

92+
func TestPacketConnTracker(t *testing.T) {
93+
var wg sync.WaitGroup
94+
reader := metric.NewManualReader()
95+
provider := metric.NewMeterProvider(metric.WithReader(reader))
96+
sdkotel.SetMeterProvider(provider)
97+
SetupMetricsManager("", "")
98+
99+
metricsTracker, err := NewTracker()
100+
assert.NoError(t, err)
101+
102+
client, server := packetPipe()
103+
defer client.Close()
104+
defer server.Close()
105+
106+
ctx := context.Background()
107+
serverTracked := metricsTracker.RoutedPacketConnection(ctx, server, adapter.InboundContext{}, nil, nil)
108+
109+
wg.Add(1)
110+
serverReceive := 0
111+
go func() {
112+
defer wg.Done()
113+
buffer := buf.New()
114+
_, err = serverTracked.ReadPacket(buffer)
115+
assert.NoError(t, err)
116+
serverReceive = buffer.Len()
117+
t.Logf("Server received '%v'\n", string(buffer.Bytes()))
118+
}()
119+
120+
clientSentMessage := []byte("A client sent a short request...")
121+
clientSendBuf := buf.NewSize(len(clientSentMessage))
122+
clientSendBuf.Write(clientSentMessage)
123+
err = client.WritePacket(clientSendBuf, server.LocalAddr().(M.Socksaddr))
124+
assert.NoError(t, err)
125+
126+
wg.Add(1)
127+
go func() {
128+
defer wg.Done()
129+
buffer := buf.New()
130+
_, err = client.ReadPacket(buffer)
131+
assert.NoError(t, err)
132+
t.Logf("Client received '%v'\n", string(buffer.Bytes()))
133+
134+
}()
135+
136+
serverSentMessage := []byte("...and the server sent a short response.")
137+
serverSendBuf := buf.NewSize(len(serverSentMessage))
138+
serverSendBuf.Write(serverSentMessage)
139+
serverTransmit := serverSendBuf.Len()
140+
err = serverTracked.WritePacket(serverSendBuf, client.LocalAddr().(M.Socksaddr))
141+
assert.NoError(t, err)
142+
143+
wg.Wait()
144+
var rm metricdata.ResourceMetrics
145+
reader.Collect(ctx, &rm)
146+
147+
ioCounter := extractCountersByAttribute(rm, "proxy.io")
148+
results := map[string]int64{}
149+
for k, v := range ioCounter {
150+
if strings.Contains(k, "direction=transmit") {
151+
results["transmit"] += v
152+
} else if strings.Contains(k, "direction=receive") {
153+
results["receive"] += v
154+
}
155+
}
156+
for k, v := range results {
157+
t.Logf("%s: %d\n", k, v)
158+
}
159+
if results["transmit"] != int64(serverTransmit) {
160+
t.Errorf("transmit bytes did not match, got %d, want %d", results["transmit"], serverTransmit)
161+
}
162+
if results["receive"] != int64(serverReceive) {
163+
t.Errorf("receive bytes did not match, got %d, want %d", results["receive"], serverReceive)
164+
}
165+
}
166+
87167
func extractCountersByAttribute(rm metricdata.ResourceMetrics, name string) map[string]int64 {
88168
result := make(map[string]int64)
89169
for _, sm := range rm.ScopeMetrics {
@@ -99,3 +179,63 @@ func extractCountersByAttribute(rm metricdata.ResourceMetrics, name string) map[
99179
}
100180
return result
101181
}
182+
183+
// Mock packet pipe connection for testing
184+
type packet struct {
185+
buffer *buf.Buffer
186+
dest M.Socksaddr
187+
}
188+
189+
var _ N.PacketConn = (*packetPipeConn)(nil)
190+
191+
type packetPipeConn struct {
192+
readCh chan packet
193+
writeCh chan packet
194+
once sync.Once
195+
done chan struct{}
196+
}
197+
198+
func packetPipe() (N.PacketConn, N.PacketConn) {
199+
ch1 := make(chan packet, 16)
200+
ch2 := make(chan packet, 16)
201+
done1 := make(chan struct{})
202+
done2 := make(chan struct{})
203+
204+
c1 := &packetPipeConn{readCh: ch1, writeCh: ch2, done: done1}
205+
c2 := &packetPipeConn{readCh: ch2, writeCh: ch1, done: done2}
206+
return c1, c2
207+
}
208+
209+
func (c *packetPipeConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
210+
select {
211+
case p := <-c.readCh:
212+
_, err = buffer.ReadOnceFrom(p.buffer)
213+
p.buffer.Release()
214+
return p.dest, err
215+
case <-c.done:
216+
return M.Socksaddr{}, net.ErrClosed
217+
}
218+
}
219+
220+
func (c *packetPipeConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
221+
newBuf := buf.NewSize(buffer.Len())
222+
newBuf.Write(buffer.Bytes())
223+
buffer.Release()
224+
select {
225+
case c.writeCh <- packet{buffer: newBuf, dest: destination}:
226+
return nil
227+
case <-c.done:
228+
newBuf.Release()
229+
return net.ErrClosed
230+
}
231+
}
232+
233+
func (c *packetPipeConn) Close() error {
234+
c.once.Do(func() { close(c.done) })
235+
return nil
236+
}
237+
238+
func (c *packetPipeConn) LocalAddr() net.Addr { return M.Socksaddr{} }
239+
func (c *packetPipeConn) SetDeadline(t time.Time) error { return nil }
240+
func (c *packetPipeConn) SetReadDeadline(t time.Time) error { return nil }
241+
func (c *packetPipeConn) SetWriteDeadline(t time.Time) error { return nil }

0 commit comments

Comments
 (0)