@@ -2,11 +2,9 @@ package main
22
33import (
44 "bufio"
5- "encoding/json"
65 "flag"
76 "fmt"
87 "io"
9- "io/ioutil"
108 "log"
119 "net"
1210 "net/http"
@@ -17,22 +15,12 @@ import (
1715 "strings"
1816 "sync"
1917 "syscall"
20- "time"
2118
2219 "github.com/prometheus/client_golang/prometheus"
2320 "github.com/prometheus/client_golang/prometheus/promhttp"
2421 "go.uber.org/zap"
2522)
2623
27- type status struct {
28- sync.Mutex
29- Ready bool
30- dataProcessed bool
31-
32- timestampLastProcessed time.Time
33- IdleTimeMilliSecs int64
34- }
35-
3624func parsePorts (portsStr string , lg * zap.Logger ) []int {
3725 var ports []int
3826
@@ -63,66 +51,87 @@ func parsePorts(portsStr string, lg *zap.Logger) []int {
6351 ports = append (ports , i )
6452 }
6553 default :
66- lg .Fatal ("wrong ports argument" )
67-
54+ lg .Fatal ("invalid ports argument" )
6855 }
6956 }
7057
7158 return ports
7259}
7360
74- func setupStatusServer (localAPIPort int , currentStatus * status , lg * zap.Logger ) {
75- http .HandleFunc ("/status" , func (w http.ResponseWriter , req * http.Request ) {
76- currentStatus .Lock ()
77- defer currentStatus .Unlock ()
78- if currentStatus .dataProcessed {
79- currentStatus .IdleTimeMilliSecs = time .Since (currentStatus .timestampLastProcessed ).Milliseconds ()
80- }
81- data , err := json .Marshal (currentStatus )
82- if err != nil {
83- lg .Error ("error when json marshaling status" , zap .Any ("status" , currentStatus ), zap .Error (err ))
84- }
85- fmt .Fprint (w , string (data ))
86- })
87- if localAPIPort != 0 {
88- go func () {
89- err := http .ListenAndServe (fmt .Sprintf (":%d" , localAPIPort ), nil )
90- if err != nil {
91- lg .Fatal ("failed to start the status server" , zap .Error (err ))
92- }
93- lg .Info ("status server running" , zap .Int ("port" , localAPIPort ))
94- }()
95- }
96-
97- }
98-
9961type metrics struct {
100- inRecs prometheus.Counter
62+ inRecs prometheus.Counter
63+ timeOfLastWrite prometheus.Gauge
64+ nOpenPorts prometheus.Gauge
10165}
10266
103- func setupMetrics () * metrics {
67+ func setupMetrics (lg * zap.Logger ) * metrics {
68+ ns := "receiver"
10469 ms := metrics {
10570 inRecs : prometheus .NewCounter (prometheus.CounterOpts {
106- Namespace : "receiver" ,
71+ Namespace : ns ,
10772 Name : "in_records_total" ,
10873 Help : "Incoming records counter." ,
10974 }),
75+ timeOfLastWrite : prometheus .NewGauge (prometheus.GaugeOpts {
76+ Namespace : ns ,
77+ Name : "time_of_last_write" ,
78+ Help : "Time of last write to the port dump file." ,
79+ }),
80+ nOpenPorts : prometheus .NewGauge (prometheus.GaugeOpts {
81+ Namespace : ns ,
82+ Name : "n_open_ports" ,
83+ Help : "Number of opened ports." ,
84+ }),
11085 }
11186 err := prometheus .Register (ms .inRecs )
11287 if err != nil {
113- log .Fatalf ("error registering the in_records_total metric: %v" , err )
88+ lg .Fatal ("error registering the metric" , zap .String ("metric" , "in_records_total" ),
89+ zap .Error (err ))
90+ }
91+ err = prometheus .Register (ms .timeOfLastWrite )
92+ if err != nil {
93+ lg .Fatal ("error registering the metric" , zap .String ("metric" , "time_of_last_write" ),
94+ zap .Error (err ))
95+ }
96+ err = prometheus .Register (ms .nOpenPorts )
97+ if err != nil {
98+ lg .Fatal ("error registering the metric" , zap .String ("metric" , "n_open_ports" ),
99+ zap .Error (err ))
114100 }
115101
116102 return & ms
117103}
118104
105+ func openFiles (outDir string , outPrefix string , ports []int , lg * zap.Logger ) map [int ]* os.File {
106+ fs := make (map [int ]* os.File )
107+
108+ for _ , p := range ports {
109+ fPath := fmt .Sprintf ("%s/%s%d" , outDir , outPrefix , p )
110+ f , err := os .Create (fPath )
111+ if err != nil {
112+ lg .Fatal ("failed to create file" , zap .String ("path" , fPath ), zap .Error (err ))
113+ }
114+ fs [p ] = f
115+ }
116+
117+ return fs
118+ }
119+
120+ func closeFiles (fs map [int ]* os.File , lg * zap.Logger ) {
121+ for p , f := range fs {
122+ err := f .Close ()
123+ if err != nil {
124+ lg .Error ("could not close file for port" , zap .Int ("port" , p ), zap .Error (err ))
125+ }
126+ }
127+ }
128+
119129func main () {
120130 portsStr := flag .String ("ports" , "" , `List of the ports to listen on. Has to be supplied in the from "XXXX YYYY ZZZZ AAAA-BBBB" in quotes.` )
121131 outPrefix := flag .String ("prefix" , "" , "Prefix for the output files." )
122132 outDir := flag .String ("outdir" , "" , "Output directory. Absolute path. Optional." )
123- profiler := flag .String ("profiler" , "" , "Where should the profiler listen?" )
124- localAPIPort := flag .Int ("local-api-port" , 0 , "specify which port the local HTTP API should be listening on" )
125- promPort := flag .Int ("prom-port" , 0 , "Prometheus port. If unset, Prometheus metrics are not exposed." )
133+ profPort := flag .Int ("profPort" , 0 , "Where should the profiler listen?" )
134+ promPort := flag .Int ("promPort" , 0 , "Prometheus port. If unset, Prometheus metrics are not exposed." )
126135
127136 flag .Parse ()
128137
@@ -131,59 +140,62 @@ func main() {
131140 log .Fatal ("failed to create logger: " , err )
132141 }
133142
134- if * portsStr == "" {
135- lg .Fatal ("please supply the ports argument" )
136- }
137-
138- ms := setupMetrics ()
143+ ms := setupMetrics (lg )
139144
140145 if * promPort != 0 {
141- go func () {
142- l , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , * promPort ))
143- if err != nil {
144- lg .Error ("opening TCP port for Prometheus failed" , zap .Error (err ))
145- }
146- err = http .Serve (l , promhttp .Handler ())
147- if err != nil {
148- lg .Error ("prometheus server failed" , zap .Error (err ))
149- }
150- }()
146+ go promListen (* promPort , lg )
151147 }
152148
153- if * profiler != "" {
149+ if * profPort != 0 {
154150 go func () {
155- lg .Info ("profiler server exited" , zap .Error (http .ListenAndServe (* profiler , nil )))
151+ lg .Info ("profiler server exited" , zap .Error (http .ListenAndServe (fmt . Sprintf ( ":%d" , * profPort ) , nil )))
156152 }()
157153 }
154+
158155 ports := parsePorts (* portsStr , lg )
156+ var fs map [int ]* os.File
157+ if * outDir != "" {
158+ fs = openFiles (* outDir , * outPrefix , ports , lg )
159+ defer closeFiles (fs , lg )
160+ }
161+ ls := openPorts (ports , lg )
159162
160- currentStatus := & status {sync.Mutex {}, false , false , time .Now (), 0 }
163+ ms .nOpenPorts .Set (float64 (len (ls )))
164+
165+ stop := make (chan struct {})
166+
167+ var portsWG sync.WaitGroup
168+ for _ , p := range ports {
169+ portsWG .Add (1 )
161170
162- if * localAPIPort != 0 {
163- setupStatusServer (* localAPIPort , currentStatus , lg )
171+ go listen (ls [p ], p , stop , & portsWG , fs , ms , lg )
164172 }
165173
166- fs := make (map [int ]io.Writer )
174+ sgn := make (chan os.Signal , 1 )
175+ signal .Notify (sgn , os .Interrupt , syscall .SIGTERM )
176+ <- sgn
167177
168- for _ , p := range ports {
169- fs [p ] = ioutil .Discard
170- if * outDir != "" {
171- fPath := fmt .Sprintf ("%s/%s%d" , * outDir , * outPrefix , p )
172- f , err := os .Create (fPath )
173- if err != nil {
174- lg .Fatal ("failed to create file" , zap .String ("path" , fPath ), zap .Error (err ))
175- }
176- defer func (p int ) {
177- err := f .Close ()
178- if err != nil {
179- lg .Error ("could not close file for port" , zap .Int ("port" , p ), zap .Error (err ))
180- }
178+ // start termination sequence
179+ close (stop )
181180
182- }(p )
183- fs [p ] = f
184- }
181+ if * outDir == "" {
182+ os .Exit (0 )
185183 }
184+ portsWG .Wait ()
185+ }
186+
187+ func promListen (promPort int , lg * zap.Logger ) {
188+ l , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , promPort ))
189+ if err != nil {
190+ lg .Error ("opening TCP port for Prometheus failed" , zap .Error (err ))
191+ }
192+ err = http .Serve (l , promhttp .Handler ())
193+ if err != nil {
194+ lg .Error ("prometheus server failed" , zap .Error (err ))
195+ }
196+ }
186197
198+ func openPorts (ports []int , lg * zap.Logger ) map [int ]net.Listener {
187199 ls := make (map [int ]net.Listener )
188200 for _ , p := range ports {
189201 l , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , p ))
@@ -193,80 +205,55 @@ func main() {
193205 ls [p ] = l
194206 }
195207
196- stop := make (chan struct {})
208+ return ls
209+ }
197210
198- var portsWG sync.WaitGroup
199- for _ , p := range ports {
200- portsWG .Add (1 )
211+ func listen (lst net.Listener , prt int , stop chan struct {}, portsWG * sync.WaitGroup , fs map [int ]* os.File , ms * metrics , lg * zap.Logger ) {
212+ defer portsWG .Done ()
213+ var connectionWG sync.WaitGroup
214+ out:
215+ for {
216+ connCh := make (chan net.Conn )
217+ go func () {
218+ conn , err := lst .Accept ()
219+ if err != nil {
220+ lg .Fatal ("failed to accept connection on addr %s: %v" , zap .String ("address" , lst .Addr ().String ()), zap .Error (err ))
221+ }
222+ connCh <- conn
223+ }()
201224
202- go func (lst net.Listener , prt int , stop chan struct {}) {
203- defer portsWG .Done ()
204- var connectionWG sync.WaitGroup
205- out:
206- for {
207- connCh := make (chan net.Conn )
208- go func () {
209- conn , err := lst .Accept ()
225+ select {
226+ case <- stop :
227+ break out
228+ case conn := <- connCh :
229+ connectionWG .Add (1 )
230+
231+ go func (conn net.Conn ) {
232+ defer connectionWG .Done ()
233+ defer func () {
234+ err := conn .Close ()
210235 if err != nil {
211- lg .Fatal ("failed to accept connection on addr %s: %v" , zap . String ( "address" , lst . Addr (). String ()) , zap .Error (err ))
236+ lg .Fatal ("connection close failed" , zap .Error (err ))
212237 }
213- connCh <- conn
214238 }()
215-
216- select {
217- case <- stop :
218- break out
219- case conn := <- connCh :
220- connectionWG .Add (1 )
221-
222- go func (conn net.Conn ) {
223- defer connectionWG .Done ()
224- defer func () {
225- err := conn .Close ()
226- if err != nil {
227- lg .Fatal ("connection close failed" , zap .Error (err ))
228- }
229- }()
230- scanner := bufio .NewScanner (conn )
231- scanner .Buffer (make ([]byte , bufio .MaxScanTokenSize * 100 ), bufio .MaxScanTokenSize )
232- if * outDir == "" {
233- for scanner .Scan () {
234- ms .inRecs .Inc ()
235- }
236- if err := scanner .Err (); err != nil {
237- lg .Info ("failed scan when reading data" , zap .Error (err ))
238- }
239- } else {
240- _ , err := io .Copy (fs [prt ], conn )
241- if err != nil {
242- lg .Error ("failed when dumping data" , zap .Error (err ))
243- }
244- }
245-
246- currentStatus .Lock ()
247- currentStatus .dataProcessed = true
248- currentStatus .timestampLastProcessed = time .Now ()
249- currentStatus .Unlock ()
250- }(conn )
239+ if fs == nil {
240+ scanner := bufio .NewScanner (conn )
241+ scanner .Buffer (make ([]byte , bufio .MaxScanTokenSize * 100 ), bufio .MaxScanTokenSize )
242+ for scanner .Scan () {
243+ ms .inRecs .Inc ()
244+ }
245+ if err := scanner .Err (); err != nil {
246+ lg .Info ("failed scan when reading data" , zap .Error (err ))
247+ }
248+ } else {
249+ _ , err := io .Copy (fs [prt ], conn )
250+ if err != nil {
251+ lg .Error ("failed when dumping data" , zap .Error (err ))
252+ }
253+ ms .timeOfLastWrite .SetToCurrentTime ()
251254 }
252- }
253- connectionWG .Wait ()
254- }(ls [p ], p , stop )
255- }
256-
257- currentStatus .Lock ()
258- currentStatus .Ready = true
259- currentStatus .Unlock ()
260-
261- sgn := make (chan os.Signal , 1 )
262- signal .Notify (sgn , os .Interrupt , syscall .SIGTERM )
263- <- sgn
264-
265- // start termination sequence
266- close (stop )
267-
268- if * outDir == "" {
269- os .Exit (0 )
255+ }(conn )
256+ }
270257 }
271- portsWG .Wait ()
258+ connectionWG .Wait ()
272259}
0 commit comments