@@ -6,32 +6,51 @@ import (
66	"fmt" 
77	"time" 
88
9+ 	"github.com/go-logr/logr" 
910	pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" 
1011	"google.golang.org/grpc" 
12+ 
13+ 	agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" 
14+ 	grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" 
1115)
1216
13- // commandService handles the connection and subscription to the agent. 
17+ // commandService handles the connection and subscription to the data plane  agent. 
1418type  commandService  struct  {
1519	pb.CommandServiceServer 
20+ 	connTracker  * agentgrpc.ConnectionsTracker 
21+ 	// TODO(sberman): all logs are at Info level right now. Adjust appropriately. 
22+ 	logger  logr.Logger 
1623}
1724
18- func  newCommandService () * commandService  {
19- 	return  & commandService {}
25+ func  newCommandService (logger  logr.Logger ) * commandService  {
26+ 	return  & commandService {
27+ 		logger :      logger ,
28+ 		connTracker : agentgrpc .NewConnectionsTracker (),
29+ 	}
2030}
2131
2232func  (cs  * commandService ) Register (server  * grpc.Server ) {
2333	pb .RegisterCommandServiceServer (server , cs )
2434}
2535
36+ // CreateConnection registers a data plane agent with the control plane. 
2637func  (cs  * commandService ) CreateConnection (
27- 	_  context.Context ,
38+ 	ctx  context.Context ,
2839	req  * pb.CreateConnectionRequest ,
2940) (* pb.CreateConnectionResponse , error ) {
3041	if  req  ==  nil  {
3142		return  nil , errors .New ("empty connection request" )
3243	}
3344
34- 	fmt .Printf ("Creating connection for nginx pod: %s\n " , req .GetResource ().GetContainerInfo ().GetHostname ())
45+ 	gi , ok  :=  grpcContext .GrpcInfoFromContext (ctx )
46+ 	if  ! ok  {
47+ 		return  nil , agentgrpc .ErrStatusInvalidConnection 
48+ 	}
49+ 
50+ 	hostname  :=  req .GetResource ().GetContainerInfo ().GetHostname ()
51+ 
52+ 	cs .logger .Info (fmt .Sprintf ("Creating connection for nginx pod: %s" , hostname ))
53+ 	cs .connTracker .Track (gi .IPAddress , hostname )
3554
3655	return  & pb.CreateConnectionResponse {
3756		Response : & pb.CommandResponse {
@@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection(
4059	}, nil 
4160}
4261
62+ // Subscribe is a decoupled communication mechanism between the data plane agent and control plane. 
4363func  (cs  * commandService ) Subscribe (in  pb.CommandService_SubscribeServer ) error  {
44- 	fmt .Println ("Received subscribe request" )
45- 
4664	ctx  :=  in .Context ()
4765
66+ 	gi , ok  :=  grpcContext .GrpcInfoFromContext (ctx )
67+ 	if  ! ok  {
68+ 		return  agentgrpc .ErrStatusInvalidConnection 
69+ 	}
70+ 
71+ 	cs .logger .Info (fmt .Sprintf ("Received subscribe request from %q" , gi .IPAddress ))
72+ 
73+ 	go  cs .listenForDataPlaneResponse (ctx , in )
74+ 
75+ 	// wait for connection to be established 
76+ 	podName , err  :=  cs .waitForConnection (ctx , gi )
77+ 	if  err  !=  nil  {
78+ 		cs .logger .Error (err , "error waiting for connection" )
79+ 		return  err 
80+ 	}
81+ 
82+ 	cs .logger .Info (fmt .Sprintf ("Handling subscription for %s/%s" , podName , gi .IPAddress ))
4883	for  {
4984		select  {
5085		case  <- ctx .Done ():
5186			return  ctx .Err ()
5287		case  <- time .After (1  *  time .Minute ):
5388			dummyRequest  :=  & pb.ManagementPlaneRequest {
54- 				Request : & pb.ManagementPlaneRequest_StatusRequest {
55- 					StatusRequest : & pb.StatusRequest {},
89+ 				Request : & pb.ManagementPlaneRequest_HealthRequest {
90+ 					HealthRequest : & pb.HealthRequest {},
5691				},
5792			}
58- 			if  err  :=  in .Send (dummyRequest ); err  !=  nil  { // will likely need retry logic 
59- 				fmt . Printf ( "ERROR: %v \n " ,  err )
93+ 			if  err  :=  in .Send (dummyRequest ); err  !=  nil  { // TODO(sberman):  will likely need retry logic 
94+ 				cs . logger . Error ( err ,  "error sending request to agent" )
6095			}
6196		}
6297	}
6398}
6499
65- func  (cs  * commandService ) UpdateDataPlaneStatus (
66- 	_  context.Context ,
67- 	req  * pb.UpdateDataPlaneStatusRequest ,
68- ) (* pb.UpdateDataPlaneStatusResponse , error ) {
69- 	fmt .Println ("Updating data plane status" )
100+ // TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call, 
101+ // so this fails. 
102+ func  (cs  * commandService ) waitForConnection (ctx  context.Context , gi  grpcContext.GrpcInfo ) (string , error ) {
103+ 	var  podName  string 
104+ 	ticker  :=  time .NewTicker (time .Second )
105+ 	defer  ticker .Stop ()
70106
71- 	if  req  ==  nil  {
72- 		return  nil , errors .New ("empty update data plane status request" )
107+ 	timer  :=  time .NewTimer (30  *  time .Second )
108+ 	defer  timer .Stop ()
109+ 
110+ 	for  {
111+ 		select  {
112+ 		case  <- ctx .Done ():
113+ 			return  "" , ctx .Err ()
114+ 		case  <- timer .C :
115+ 			return  "" , errors .New ("timed out waiting for agent connection" )
116+ 		case  <- ticker .C :
117+ 			if  podName  =  cs .connTracker .GetConnection (gi .IPAddress ); podName  !=  ""  {
118+ 				return  podName , nil 
119+ 			}
120+ 		}
73121	}
122+ }
74123
75- 	return  & pb.UpdateDataPlaneStatusResponse {}, nil 
124+ func  (cs  * commandService ) listenForDataPlaneResponse (ctx  context.Context , in  pb.CommandService_SubscribeServer ) {
125+ 	for  {
126+ 		select  {
127+ 		case  <- ctx .Done ():
128+ 			return 
129+ 		default :
130+ 			dataPlaneResponse , err  :=  in .Recv ()
131+ 			cs .logger .Info (fmt .Sprintf ("Received data plane response: %v" , dataPlaneResponse ))
132+ 			if  err  !=  nil  {
133+ 				cs .logger .Error (err , "failed to receive data plane response" )
134+ 				return 
135+ 			}
136+ 		}
137+ 	}
76138}
77139
140+ // UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent. 
141+ // TODO(sberman): Is health monitoring the data planes something useful for us to do? 
78142func  (cs  * commandService ) UpdateDataPlaneHealth (
79143	_  context.Context ,
80- 	req  * pb.UpdateDataPlaneHealthRequest ,
144+ 	_  * pb.UpdateDataPlaneHealthRequest ,
81145) (* pb.UpdateDataPlaneHealthResponse , error ) {
82- 	fmt .Println ("Updating data plane health" )
83- 
84- 	if  req  ==  nil  {
85- 		return  nil , errors .New ("empty update dataplane health request" )
86- 	}
87- 
88146	return  & pb.UpdateDataPlaneHealthResponse {}, nil 
89147}
148+ 
149+ // UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata, 
150+ // instance metadata, or configurations. Since directly changing nginx configuration on the instance 
151+ // is not supported, this is a no-op for NGF. 
152+ func  (cs  * commandService ) UpdateDataPlaneStatus (
153+ 	_  context.Context ,
154+ 	_  * pb.UpdateDataPlaneStatusRequest ,
155+ ) (* pb.UpdateDataPlaneStatusResponse , error ) {
156+ 	return  & pb.UpdateDataPlaneStatusResponse {}, nil 
157+ }
0 commit comments