Skip to content

Commit 8806558

Browse files
authored
Merge pull request #1162 from safchain/auth-by-key
split agent / analyzer and api auth
2 parents 9325224 + 6628f9f commit 8806558

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+1305
-947
lines changed

agent/agent.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shtt
7171

7272
addresses, err := config.GetAnalyzerServiceAddresses()
7373
if err != nil {
74-
return nil, fmt.Errorf("Unable to get the analyzers list: %s", err.Error())
74+
return nil, fmt.Errorf("Unable to get the analyzers list: %s", err)
7575
}
7676

7777
if len(addresses) == 0 {
@@ -80,8 +80,7 @@ func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shtt
8080
}
8181

8282
for _, sa := range addresses {
83-
authClient := shttp.NewAuthenticationClient(config.GetURL("http", sa.Addr, sa.Port, ""), authOptions)
84-
c := shttp.NewWSClientFromConfig(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent"), authClient, nil)
83+
c := shttp.NewWSClientFromConfig(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent"), authOptions, nil)
8584
pool.AddClient(c)
8685
}
8786

@@ -172,20 +171,28 @@ func NewAgent() (*Agent, error) {
172171
tm := topology.NewTIDMapper(g)
173172
tm.Start()
174173

174+
apiAuthBackendName := config.GetString("agent.auth.api.backend")
175+
apiAuthBackend, err := shttp.NewAuthenticationBackendByName(apiAuthBackendName)
176+
if err != nil {
177+
return nil, err
178+
}
179+
175180
hserver, err := shttp.NewServerFromConfig(common.AgentService)
176181
if err != nil {
177182
return nil, err
178183
}
179184

185+
hserver.RegisterLoginRoute(apiAuthBackend)
186+
180187
if err := hserver.Listen(); err != nil {
181188
return nil, err
182189
}
183190

184-
if _, err = api.NewAPI(hserver, nil, common.AgentService); err != nil {
191+
if _, err = api.NewAPI(hserver, nil, common.AgentService, apiAuthBackend); err != nil {
185192
return nil, err
186193
}
187194

188-
wsServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber"))
195+
wsServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend))
189196

190197
// declare all extension available throught API and filtering
191198
tr := traversal.NewGremlinTraversalParser()
@@ -198,13 +205,16 @@ func NewAgent() (*Agent, error) {
198205
return nil, err
199206
}
200207

201-
api.RegisterTopologyAPI(hserver, g, tr)
208+
api.RegisterTopologyAPI(hserver, g, tr, apiAuthBackend)
202209

203-
authOptions := analyzer.NewAnalyzerAuthenticationOpts()
210+
clusterAuthOptions := &shttp.AuthenticationOpts{
211+
Username: config.GetString("agent.auth.cluster.username"),
212+
Password: config.GetString("agent.auth.cluster.password"),
213+
}
204214

205215
topologyEndpoint := topology.NewTopologySubscriberEndpoint(wsServer, g, tr)
206216

207-
analyzerClientPool, err := NewAnalyzerWSStructClientPool(authOptions)
217+
analyzerClientPool, err := NewAnalyzerWSStructClientPool(clusterAuthOptions)
208218
if err != nil {
209219
return nil, err
210220
}
@@ -239,7 +249,7 @@ func NewAgent() (*Agent, error) {
239249

240250
onDemandProbeServer, err := ondemand.NewOnDemandProbeServer(flowProbeBundle, g, analyzerClientPool)
241251
if err != nil {
242-
return nil, fmt.Errorf("Unable to initialize on-demand flow probe %s", err.Error())
252+
return nil, fmt.Errorf("Unable to initialize on-demand flow probe %s", err)
243253
}
244254

245255
agent := &Agent{
@@ -259,7 +269,7 @@ func NewAgent() (*Agent, error) {
259269
topologyForwarder: tforwarder,
260270
}
261271

262-
api.RegisterStatusAPI(hserver, agent)
272+
api.RegisterStatusAPI(hserver, agent, apiAuthBackend)
263273

264274
return agent, nil
265275
}

analyzer/flow_client.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"math/rand"
2828
"net"
2929
"net/url"
30-
"strconv"
3130
"strings"
3231
"time"
3332

@@ -108,11 +107,8 @@ func (c *FlowClientWebSocketConn) Close() error {
108107

109108
// Connect to the WebSocket flow server
110109
func (c *FlowClientWebSocketConn) Connect() error {
111-
authOptions := NewAnalyzerAuthenticationOpts()
112-
authAddr := common.NormalizeAddrForURL(c.url.Hostname())
113-
authPort, _ := strconv.Atoi(c.url.Port())
114-
authClient := shttp.NewAuthenticationClient(config.GetURL("http", authAddr, authPort, ""), authOptions)
115-
c.wsClient = shttp.NewWSClientFromConfig(common.AgentService, c.url, authClient, nil)
110+
authOptions := AnalyzerClusterAuthenticationOpts()
111+
c.wsClient = shttp.NewWSClientFromConfig(common.AgentService, c.url, authOptions, nil)
116112
c.wsClient.Connect()
117113
c.wsClient.AddEventHandler(c)
118114

analyzer/flow_server.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type FlowServerWebSocketConn struct {
8282
timeOfLastLostFlowsLog time.Time
8383
numOfLostFlows int
8484
maxFlowBufferSize int
85+
auth shttp.AuthenticationBackend
8586
}
8687

8788
// FlowServer describes a flow server with pipeline enhancers mechanism
@@ -96,6 +97,7 @@ type FlowServer struct {
9697
bulkInsertDeadline time.Duration
9798
ch chan *flow.Flow
9899
quit chan struct{}
100+
auth shttp.AuthenticationBackend
99101
}
100102

101103
// OnMessage event
@@ -122,7 +124,7 @@ func (c *FlowServerWebSocketConn) OnMessage(client shttp.WSSpeaker, m shttp.WSMe
122124
// Serve starts a WebSocket flow server
123125
func (c *FlowServerWebSocketConn) Serve(ch chan *flow.Flow, quit chan struct{}, wg *sync.WaitGroup) {
124126
c.ch = ch
125-
server := shttp.NewWSServer(c.server, "/ws/flow")
127+
server := shttp.NewWSServer(c.server, "/ws/flow", c.auth)
126128
server.AddEventHandler(c)
127129
go func() {
128130
server.Start()
@@ -132,9 +134,9 @@ func (c *FlowServerWebSocketConn) Serve(ch chan *flow.Flow, quit chan struct{},
132134
}
133135

134136
// NewFlowServerWebSocketConn returns a new WebSocket flow server
135-
func NewFlowServerWebSocketConn(server *shttp.Server) (*FlowServerWebSocketConn, error) {
137+
func NewFlowServerWebSocketConn(server *shttp.Server, auth shttp.AuthenticationBackend) (*FlowServerWebSocketConn, error) {
136138
flowsMax := config.GetConfig().GetInt("analyzer.flow.max_buffer_size")
137-
return &FlowServerWebSocketConn{server: server, maxFlowBufferSize: flowsMax}, nil
139+
return &FlowServerWebSocketConn{server: server, maxFlowBufferSize: flowsMax, auth: auth}, nil
138140
}
139141

140142
// Serve UDP connections
@@ -273,23 +275,23 @@ func (s *FlowServer) setupBulkConfigFromBackend() error {
273275
}
274276

275277
// NewFlowServer creates a new flow server listening at address/port, based on configuration
276-
func NewFlowServer(s *shttp.Server, g *graph.Graph, store storage.Storage, probe *probe.ProbeBundle) (*FlowServer, error) {
278+
func NewFlowServer(s *shttp.Server, g *graph.Graph, store storage.Storage, probe *probe.ProbeBundle, auth shttp.AuthenticationBackend) (*FlowServer, error) {
277279
pipeline := flow.NewEnhancerPipeline(enhancers.NewGraphFlowEnhancer(g))
278280

279281
// check that the neutron probe is loaded if so add the neutron flow enhancer
280282
if probe.GetProbe("neutron") != nil {
281283
pipeline.AddEnhancer(enhancers.NewNeutronFlowEnhancer(g))
282284
}
283285

284-
var err error
285286
var conn FlowServerConn
286287
protocol := strings.ToLower(config.GetString("flow.protocol"))
287288

289+
var err error
288290
switch protocol {
289291
case "udp":
290292
conn, err = NewFlowServerUDPConn(s.Addr, s.Port)
291293
case "websocket":
292-
conn, err = NewFlowServerWebSocketConn(s)
294+
conn, err = NewFlowServerWebSocketConn(s, auth)
293295
default:
294296
err = fmt.Errorf("Invalid protocol %s", protocol)
295297
}
@@ -304,6 +306,7 @@ func NewFlowServer(s *shttp.Server, g *graph.Graph, store storage.Storage, probe
304306
enhancerPipelineConfig: flow.NewEnhancerPipelineConfig(),
305307
conn: conn,
306308
quit: make(chan struct{}, 2),
309+
auth: auth,
307310
}
308311
err = fs.setupBulkConfigFromBackend()
309312
if err != nil {

analyzer/server.go

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,13 @@ func (s *Server) GetStatus() interface{} {
7676
Incomers: make(map[string]shttp.WSConnStatus),
7777
Outgoers: make(map[string]shttp.WSConnStatus),
7878
}
79-
for host, peer := range s.replicationEndpoint.conns {
80-
if host == peer.GetHost() {
81-
peersStatus.Incomers[host] = peer.GetStatus()
82-
} else {
83-
peersStatus.Outgoers[host] = peer.GetStatus()
84-
}
79+
80+
for _, speaker := range s.replicationEndpoint.in.GetSpeakers() {
81+
peersStatus.Incomers[speaker.GetRemoteHost()] = speaker.GetStatus()
82+
}
83+
84+
for _, speaker := range s.replicationEndpoint.out.GetSpeakers() {
85+
peersStatus.Outgoers[speaker.GetRemoteHost()] = speaker.GetStatus()
8586
}
8687

8788
return &types.AnalyzerStatus{
@@ -190,7 +191,7 @@ func NewServerFromConfig() (*Server, error) {
190191
for {
191192
host := config.GetString("host_id")
192193
if err = etcdClient.SetInt64(fmt.Sprintf("/analyzer:%s/start-time", host), time.Now().Unix()); err != nil {
193-
logging.GetLogger().Errorf("Etcd server not ready: %s", err.Error())
194+
logging.GetLogger().Errorf("Etcd server not ready: %s", err)
194195
time.Sleep(time.Second)
195196
} else {
196197
break
@@ -228,83 +229,97 @@ func NewServerFromConfig() (*Server, error) {
228229

229230
g := graph.NewGraphFromConfig(cached, common.AnalyzerService)
230231

231-
authOptions := NewAnalyzerAuthenticationOpts()
232+
clusterAuthOptions := AnalyzerClusterAuthenticationOpts()
233+
234+
clusterAuthBackendName := config.GetString("analyzer.auth.cluster.backend")
235+
clusterAuthBackend, err := shttp.NewAuthenticationBackendByName(clusterAuthBackendName)
236+
if err != nil {
237+
return nil, err
238+
}
239+
// force admin user for the cluster backend to ensure that all the user connection through
240+
// "cluster" endpoints will be admin
241+
clusterAuthBackend.SetDefaultUserRole("admin")
232242

233-
agentWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/agent"))
234-
_, err = NewTopologyAgentEndpoint(agentWSServer, authOptions, cached, g)
243+
apiAuthBackendName := config.GetString("analyzer.auth.api.backend")
244+
apiAuthBackend, err := shttp.NewAuthenticationBackendByName(apiAuthBackendName)
235245
if err != nil {
236246
return nil, err
237247
}
238248

239-
publisherWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/publisher"))
240-
_, err = NewTopologyPublisherEndpoint(publisherWSServer, authOptions, g)
249+
hserver.RegisterLoginRoute(apiAuthBackend)
250+
251+
agentWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/agent", clusterAuthBackend))
252+
_, err = NewTopologyAgentEndpoint(agentWSServer, cached, g)
241253
if err != nil {
242254
return nil, err
243255
}
244256

245-
replicationWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/replication"))
246-
replicationEndpoint, err := NewTopologyReplicationEndpoint(replicationWSServer, authOptions, cached, g)
257+
publisherWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/publisher", apiAuthBackend))
258+
_, err = NewTopologyPublisherEndpoint(publisherWSServer, g)
247259
if err != nil {
248260
return nil, err
249261
}
250262

251263
tableClient := flow.NewTableClient(agentWSServer)
252264

253265
storage, err := storage.NewStorageFromConfig(etcdClient)
266+
267+
replicationWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/replication", clusterAuthBackend))
268+
replicationEndpoint, err := NewTopologyReplicationEndpoint(replicationWSServer, clusterAuthOptions, cached, g)
254269
if err != nil {
255270
return nil, err
256271
}
257272

258-
// declare all extension available throught API and filtering
273+
// declare all extension available through API and filtering
259274
tr := traversal.NewGremlinTraversalParser()
260275
tr.AddTraversalExtension(ge.NewMetricsTraversalExtension())
261276
tr.AddTraversalExtension(ge.NewRawPacketsTraversalExtension())
262277
tr.AddTraversalExtension(ge.NewFlowTraversalExtension(tableClient, storage))
263278
tr.AddTraversalExtension(ge.NewSocketsTraversalExtension())
264279
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
265280

266-
subscriberWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber"))
281+
subscriberWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend))
267282
topology.NewTopologySubscriberEndpoint(subscriberWSServer, g, tr)
268283

269284
probeBundle, err := NewTopologyProbeBundleFromConfig(g)
270285
if err != nil {
271286
return nil, err
272287
}
273288

274-
apiServer, err := api.NewAPI(hserver, etcdClient.KeysAPI, common.AnalyzerService)
289+
apiServer, err := api.NewAPI(hserver, etcdClient.KeysAPI, common.AnalyzerService, apiAuthBackend)
275290
if err != nil {
276291
return nil, err
277292
}
278293

279-
captureAPIHandler, err := api.RegisterCaptureAPI(apiServer, g)
294+
captureAPIHandler, err := api.RegisterCaptureAPI(apiServer, g, apiAuthBackend)
280295
if err != nil {
281296
return nil, err
282297
}
283298

284-
metadataAPIHandler, err := api.RegisterUserMetadataAPI(apiServer, g)
299+
metadataAPIHandler, err := api.RegisterUserMetadataAPI(apiServer, g, apiAuthBackend)
285300
if err != nil {
286301
return nil, err
287302
}
288303

289-
piAPIHandler, err := api.RegisterPacketInjectorAPI(g, apiServer)
304+
piAPIHandler, err := api.RegisterPacketInjectorAPI(g, apiServer, apiAuthBackend)
290305
if err != nil {
291306
return nil, err
292307
}
293308
piClient := packet_injector.NewPacketInjectorClient(agentWSServer, etcdClient, piAPIHandler, g)
294309

295-
if _, err = api.RegisterAlertAPI(apiServer); err != nil {
310+
if _, err = api.RegisterAlertAPI(apiServer, apiAuthBackend); err != nil {
296311
return nil, err
297312
}
298313

299-
if _, err := api.RegisterWorkflowAPI(apiServer); err != nil {
314+
if _, err := api.RegisterWorkflowAPI(apiServer, apiAuthBackend); err != nil {
300315
return nil, err
301316
}
302317

303318
onDemandClient := ondemand.NewOnDemandProbeClient(g, captureAPIHandler, agentWSServer, subscriberWSServer, etcdClient)
304319

305320
metadataManager := metadata.NewUserMetadataManager(g, metadataAPIHandler)
306321

307-
flowServer, err := NewFlowServer(hserver, g, storage, probeBundle)
322+
flowServer, err := NewFlowServer(hserver, g, storage, probeBundle, clusterAuthBackend)
308323
if err != nil {
309324
return nil, err
310325
}
@@ -334,22 +349,34 @@ func NewServerFromConfig() (*Server, error) {
334349

335350
s.createStartupCapture(captureAPIHandler)
336351

337-
api.RegisterTopologyAPI(hserver, g, tr)
338-
api.RegisterPcapAPI(hserver, storage)
339-
api.RegisterConfigAPI(hserver)
340-
api.RegisterStatusAPI(hserver, s)
352+
api.RegisterTopologyAPI(hserver, g, tr, apiAuthBackend)
353+
api.RegisterPcapAPI(hserver, storage, apiAuthBackend)
354+
api.RegisterConfigAPI(hserver, apiAuthBackend)
355+
api.RegisterStatusAPI(hserver, s, apiAuthBackend)
341356

342-
if err := dede.RegisterHandler("terminal", "/dede", hserver.Router); err != nil {
343-
return nil, err
357+
if config.GetBool("analyzer.ssh_enabled") {
358+
if err := dede.RegisterHandler("terminal", "/dede", hserver.Router); err != nil {
359+
return nil, err
360+
}
361+
}
362+
363+
// server index for the following url as the client side will redirect
364+
// the user to the correct page
365+
routes := []shttp.Route{
366+
{Path: "/topology", Method: "GET", HandlerFunc: hserver.ServeIndex},
367+
{Path: "/conversation", Method: "GET", HandlerFunc: hserver.ServeIndex},
368+
{Path: "/discovery", Method: "GET", HandlerFunc: hserver.ServeIndex},
369+
{Path: "/preference", Method: "GET", HandlerFunc: hserver.ServeIndex},
370+
{Path: "/status", Method: "GET", HandlerFunc: hserver.ServeIndex},
344371
}
372+
hserver.RegisterRoutes(routes, shttp.NewNoAuthenticationBackend())
345373

346374
return s, nil
347375
}
348376

349-
// NewAnalyzerAuthenticationOpts returns an object to authenticate to the analyzer
350-
func NewAnalyzerAuthenticationOpts() *shttp.AuthenticationOpts {
377+
func AnalyzerClusterAuthenticationOpts() *shttp.AuthenticationOpts {
351378
return &shttp.AuthenticationOpts{
352-
Username: config.GetString("auth.analyzer_username"),
353-
Password: config.GetString("auth.analyzer_password"),
379+
Username: config.GetString("analyzer.auth.cluster.username"),
380+
Password: config.GetString("analyzer.auth.cluster.password"),
354381
}
355382
}

0 commit comments

Comments
 (0)