Skip to content

Commit b1eb87d

Browse files
committed
break out into multiple files
1 parent 803ee0d commit b1eb87d

File tree

3 files changed

+183
-169
lines changed

3 files changed

+183
-169
lines changed

src/bidirectionalStream.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"sync"
10+
11+
"github.com/google/gopacket"
12+
"github.com/google/gopacket/tcpassembly"
13+
"github.com/google/gopacket/tcpassembly/tcpreader"
14+
)
15+
16+
type bidirectionalStreamFactory struct {
17+
conns map[string]*bidirectionalStream
18+
requestAndResponseChannel *chan httpRequestAndResponse
19+
}
20+
21+
func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
22+
key := netFlow.FastHash() ^ tcpFlow.FastHash()
23+
24+
// The second time we see the same connection, it will be from the server to the client
25+
if conn, ok := f.conns[fmt.Sprint(key)]; ok {
26+
return &conn.serverToClient
27+
}
28+
29+
s := &bidirectionalStream{
30+
net: netFlow,
31+
transport: tcpFlow,
32+
clientToServer: tcpreader.NewReaderStream(),
33+
serverToClient: tcpreader.NewReaderStream(),
34+
requestAndResponseChannel: f.requestAndResponseChannel,
35+
}
36+
f.conns[fmt.Sprint(key)] = s
37+
go s.run()
38+
39+
// The first time we see the connection, it will be from the client to the server
40+
return &s.clientToServer
41+
}
42+
43+
type bidirectionalStream struct {
44+
net, transport gopacket.Flow
45+
clientToServer tcpreader.ReaderStream
46+
serverToClient tcpreader.ReaderStream
47+
requestAndResponseChannel *chan httpRequestAndResponse
48+
}
49+
50+
func (s *bidirectionalStream) run() {
51+
wg := sync.WaitGroup{}
52+
wg.Add(2)
53+
54+
requestChannel := make(chan *http.Request, 1)
55+
responseChannel := make(chan *http.Response, 1)
56+
57+
go func() {
58+
reader := bufio.NewReader(&s.clientToServer)
59+
for {
60+
request, err := http.ReadRequest(reader)
61+
if err == io.EOF {
62+
wg.Done()
63+
return
64+
} else if err != nil {
65+
continue
66+
}
67+
// RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves
68+
request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String())
69+
responseBody := make([]byte, request.ContentLength)
70+
if request.ContentLength > 0 {
71+
io.ReadFull(request.Body, responseBody)
72+
}
73+
request.Body.Close()
74+
request.Body = io.NopCloser(bytes.NewReader(responseBody))
75+
requestChannel <- request
76+
77+
}
78+
}()
79+
80+
go func() {
81+
reader := bufio.NewReader(&s.serverToClient)
82+
for {
83+
response, err := http.ReadResponse(reader, nil)
84+
if err == io.ErrUnexpectedEOF {
85+
wg.Done()
86+
return
87+
} else if err != nil {
88+
continue
89+
}
90+
responseBody := make([]byte, response.ContentLength)
91+
if response.ContentLength > 0 {
92+
io.ReadFull(response.Body, responseBody)
93+
}
94+
response.Body.Close()
95+
response.Body = io.NopCloser(bytes.NewReader(responseBody))
96+
responseChannel <- response
97+
}
98+
}()
99+
100+
wg.Wait()
101+
102+
capturedRequest := <-requestChannel
103+
capturedResponse := <-responseChannel
104+
close(requestChannel)
105+
close(responseChannel)
106+
107+
*s.requestAndResponseChannel <- httpRequestAndResponse{
108+
request: capturedRequest,
109+
response: capturedResponse,
110+
src: s.net.Src().String(),
111+
dst: s.net.Dst().String(),
112+
srcPort: s.transport.Src().String(),
113+
dstPort: s.transport.Dst().String(),
114+
}
115+
}

src/main.go

Lines changed: 1 addition & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package main
22

33
import (
4-
"bufio"
5-
"bytes"
64
"fmt"
75
"io"
86
"log"
@@ -11,179 +9,13 @@ import (
119
"net/http/httptest"
1210
"os"
1311
"strconv"
14-
"sync"
1512
"time"
1613

1714
firetail "github.com/FireTail-io/firetail-go-lib/middlewares/http"
18-
"github.com/google/gopacket"
19-
"github.com/google/gopacket/layers"
20-
"github.com/google/gopacket/pcap"
21-
"github.com/google/gopacket/tcpassembly"
22-
"github.com/google/gopacket/tcpassembly/tcpreader"
2315
)
2416

25-
type httpRequestAndResponse struct {
26-
request *http.Request
27-
response *http.Response
28-
src string
29-
dst string
30-
srcPort string
31-
dstPort string
32-
}
33-
34-
type httpRequestAndResponseStreamer struct {
35-
bpfExpression string
36-
requestAndResponseChannel *chan httpRequestAndResponse
37-
ipManager *serviceIpManager
38-
}
39-
40-
func (s *httpRequestAndResponseStreamer) start() {
41-
handle, err := pcap.OpenLive("any", 1600, true, pcap.BlockForever)
42-
if err != nil {
43-
log.Fatal(err)
44-
}
45-
defer handle.Close()
46-
47-
err = handle.SetBPFFilter(s.bpfExpression)
48-
if err != nil {
49-
log.Fatal(err)
50-
}
51-
52-
assembler := tcpassembly.NewAssembler(
53-
tcpassembly.NewStreamPool(
54-
&bidirectionalStreamFactory{
55-
conns: make(map[string]*bidirectionalStream),
56-
requestAndResponseChannel: s.requestAndResponseChannel,
57-
},
58-
),
59-
)
60-
ticker := time.Tick(time.Minute)
61-
packetsChannel := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
62-
for {
63-
select {
64-
case packet := <-packetsChannel:
65-
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil {
66-
continue
67-
}
68-
tcp, ok := packet.TransportLayer().(*layers.TCP)
69-
if !ok {
70-
continue
71-
}
72-
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
73-
case <-ticker:
74-
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
75-
default:
76-
}
77-
}
78-
}
79-
80-
type bidirectionalStreamFactory struct {
81-
conns map[string]*bidirectionalStream
82-
requestAndResponseChannel *chan httpRequestAndResponse
83-
}
84-
85-
func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
86-
key := netFlow.FastHash() ^ tcpFlow.FastHash()
87-
88-
// The second time we see the same connection, it will be from the server to the client
89-
if conn, ok := f.conns[fmt.Sprint(key)]; ok {
90-
return &conn.serverToClient
91-
}
92-
93-
s := &bidirectionalStream{
94-
net: netFlow,
95-
transport: tcpFlow,
96-
clientToServer: tcpreader.NewReaderStream(),
97-
serverToClient: tcpreader.NewReaderStream(),
98-
requestAndResponseChannel: f.requestAndResponseChannel,
99-
}
100-
f.conns[fmt.Sprint(key)] = s
101-
go s.run()
102-
103-
// The first time we see the connection, it will be from the client to the server
104-
return &s.clientToServer
105-
}
106-
107-
type bidirectionalStream struct {
108-
net, transport gopacket.Flow
109-
clientToServer tcpreader.ReaderStream
110-
serverToClient tcpreader.ReaderStream
111-
requestAndResponseChannel *chan httpRequestAndResponse
112-
}
113-
114-
func (s *bidirectionalStream) run() {
115-
wg := sync.WaitGroup{}
116-
wg.Add(2)
117-
118-
requestChannel := make(chan *http.Request, 1)
119-
responseChannel := make(chan *http.Response, 1)
120-
121-
go func() {
122-
reader := bufio.NewReader(&s.clientToServer)
123-
for {
124-
request, err := http.ReadRequest(reader)
125-
if err == io.EOF {
126-
wg.Done()
127-
return
128-
} else if err != nil {
129-
continue
130-
}
131-
// RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves
132-
request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String())
133-
responseBody := make([]byte, request.ContentLength)
134-
if request.ContentLength > 0 {
135-
io.ReadFull(request.Body, responseBody)
136-
}
137-
request.Body.Close()
138-
request.Body = io.NopCloser(bytes.NewReader(responseBody))
139-
requestChannel <- request
140-
141-
}
142-
}()
143-
144-
go func() {
145-
reader := bufio.NewReader(&s.serverToClient)
146-
for {
147-
response, err := http.ReadResponse(reader, nil)
148-
if err == io.ErrUnexpectedEOF {
149-
wg.Done()
150-
return
151-
} else if err != nil {
152-
continue
153-
}
154-
responseBody := make([]byte, response.ContentLength)
155-
if response.ContentLength > 0 {
156-
io.ReadFull(response.Body, responseBody)
157-
}
158-
response.Body.Close()
159-
response.Body = io.NopCloser(bytes.NewReader(responseBody))
160-
responseChannel <- response
161-
}
162-
}()
163-
164-
wg.Wait()
165-
166-
capturedRequest := <-requestChannel
167-
capturedResponse := <-responseChannel
168-
close(requestChannel)
169-
close(responseChannel)
170-
171-
*s.requestAndResponseChannel <- httpRequestAndResponse{
172-
request: capturedRequest,
173-
response: capturedResponse,
174-
src: s.net.Src().String(),
175-
dst: s.net.Dst().String(),
176-
srcPort: s.transport.Src().String(),
177-
dstPort: s.transport.Dst().String(),
178-
}
179-
}
180-
18117
func main() {
182-
devEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE"))
183-
if err != nil {
184-
devEnabled = false
185-
}
186-
18+
devEnabled, _ := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE"))
18719
if devEnabled {
18820
slog.Warn("🧰 Development mode enabled, setting log level to debug...")
18921
slog.SetLogLoggerLevel(slog.LevelDebug)

src/requestAndResponse.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"net/http"
6+
"time"
7+
8+
"github.com/google/gopacket"
9+
"github.com/google/gopacket/layers"
10+
"github.com/google/gopacket/pcap"
11+
"github.com/google/gopacket/tcpassembly"
12+
)
13+
14+
type httpRequestAndResponse struct {
15+
request *http.Request
16+
response *http.Response
17+
src string
18+
dst string
19+
srcPort string
20+
dstPort string
21+
}
22+
23+
type httpRequestAndResponseStreamer struct {
24+
bpfExpression string
25+
requestAndResponseChannel *chan httpRequestAndResponse
26+
ipManager *serviceIpManager
27+
}
28+
29+
func (s *httpRequestAndResponseStreamer) start() {
30+
handle, err := pcap.OpenLive("any", 1600, true, pcap.BlockForever)
31+
if err != nil {
32+
log.Fatal(err)
33+
}
34+
defer handle.Close()
35+
36+
err = handle.SetBPFFilter(s.bpfExpression)
37+
if err != nil {
38+
log.Fatal(err)
39+
}
40+
41+
assembler := tcpassembly.NewAssembler(
42+
tcpassembly.NewStreamPool(
43+
&bidirectionalStreamFactory{
44+
conns: make(map[string]*bidirectionalStream),
45+
requestAndResponseChannel: s.requestAndResponseChannel,
46+
},
47+
),
48+
)
49+
ticker := time.Tick(time.Minute)
50+
packetsChannel := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
51+
for {
52+
select {
53+
case packet := <-packetsChannel:
54+
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil {
55+
continue
56+
}
57+
tcp, ok := packet.TransportLayer().(*layers.TCP)
58+
if !ok {
59+
continue
60+
}
61+
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
62+
case <-ticker:
63+
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
64+
default:
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)