@@ -6,25 +6,25 @@ import (
66 "net"
77 "time"
88
9- "github.com/rs/zerolog"
10-
119 "github.com/cloudflare/cloudflared/management"
10+ "github.com/cloudflare/cloudflared/tunnelrpc"
1211 tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
1312)
1413
15- // RPCClientFunc derives a named tunnel rpc client that can then be used to register and unregister connections.
16- type RPCClientFunc func (context.Context , io.ReadWriteCloser , * zerolog. Logger ) NamedTunnelRPCClient
14+ // registerClient derives a named tunnel rpc client that can then be used to register and unregister connections.
15+ type registerClientFunc func (context.Context , io.ReadWriteCloser , time. Duration ) tunnelrpc. RegistrationClient
1716
1817type controlStream struct {
1918 observer * Observer
2019
21- connectedFuse ConnectedFuse
22- namedTunnelProperties * NamedTunnelProperties
23- connIndex uint8
24- edgeAddress net.IP
25- protocol Protocol
20+ connectedFuse ConnectedFuse
21+ tunnelProperties * TunnelProperties
22+ connIndex uint8
23+ edgeAddress net.IP
24+ protocol Protocol
2625
27- newRPCClientFunc RPCClientFunc
26+ registerClientFunc registerClientFunc
27+ registerTimeout time.Duration
2828
2929 gracefulShutdownC <- chan struct {}
3030 gracePeriod time.Duration
@@ -47,27 +47,29 @@ type TunnelConfigJSONGetter interface {
4747func NewControlStream (
4848 observer * Observer ,
4949 connectedFuse ConnectedFuse ,
50- namedTunnelConfig * NamedTunnelProperties ,
50+ tunnelProperties * TunnelProperties ,
5151 connIndex uint8 ,
5252 edgeAddress net.IP ,
53- newRPCClientFunc RPCClientFunc ,
53+ registerClientFunc registerClientFunc ,
54+ registerTimeout time.Duration ,
5455 gracefulShutdownC <- chan struct {},
5556 gracePeriod time.Duration ,
5657 protocol Protocol ,
5758) ControlStreamHandler {
58- if newRPCClientFunc == nil {
59- newRPCClientFunc = newRegistrationRPCClient
59+ if registerClientFunc == nil {
60+ registerClientFunc = tunnelrpc . NewRegistrationClient
6061 }
6162 return & controlStream {
62- observer : observer ,
63- connectedFuse : connectedFuse ,
64- namedTunnelProperties : namedTunnelConfig ,
65- newRPCClientFunc : newRPCClientFunc ,
66- connIndex : connIndex ,
67- edgeAddress : edgeAddress ,
68- gracefulShutdownC : gracefulShutdownC ,
69- gracePeriod : gracePeriod ,
70- protocol : protocol ,
63+ observer : observer ,
64+ connectedFuse : connectedFuse ,
65+ tunnelProperties : tunnelProperties ,
66+ registerClientFunc : registerClientFunc ,
67+ registerTimeout : registerTimeout ,
68+ connIndex : connIndex ,
69+ edgeAddress : edgeAddress ,
70+ gracefulShutdownC : gracefulShutdownC ,
71+ gracePeriod : gracePeriod ,
72+ protocol : protocol ,
7173 }
7274}
7375
@@ -77,13 +79,25 @@ func (c *controlStream) ServeControlStream(
7779 connOptions * tunnelpogs.ConnectionOptions ,
7880 tunnelConfigGetter TunnelConfigJSONGetter ,
7981) error {
80- rpcClient := c .newRPCClientFunc (ctx , rw , c .observer .log )
81-
82- registrationDetails , err := rpcClient .RegisterConnection (ctx , c .namedTunnelProperties , connOptions , c .connIndex , c .edgeAddress , c .observer )
82+ registrationClient := c .registerClientFunc (ctx , rw , c .registerTimeout )
83+
84+ registrationDetails , err := registrationClient .RegisterConnection (
85+ ctx ,
86+ c .tunnelProperties .Credentials .Auth (),
87+ c .tunnelProperties .Credentials .TunnelID ,
88+ connOptions ,
89+ c .connIndex ,
90+ c .edgeAddress )
8391 if err != nil {
84- rpcClient .Close ()
85- return err
92+ defer registrationClient .Close ()
93+ if err .Error () == DuplicateConnectionError {
94+ c .observer .metrics .regFail .WithLabelValues ("dup_edge_conn" , "registerConnection" ).Inc ()
95+ return errDuplicationConnection
96+ }
97+ c .observer .metrics .regFail .WithLabelValues ("server_error" , "registerConnection" ).Inc ()
98+ return serverRegistrationErrorFromRPC (err )
8699 }
100+ c .observer .metrics .regSuccess .WithLabelValues ("registerConnection" ).Inc ()
87101
88102 c .observer .logConnected (registrationDetails .UUID , c .connIndex , registrationDetails .Location , c .edgeAddress , c .protocol )
89103 c .observer .sendConnectedEvent (c .connIndex , c .protocol , registrationDetails .Location )
@@ -92,21 +106,23 @@ func (c *controlStream) ServeControlStream(
92106 // if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
93107 if c .connIndex == 0 && ! registrationDetails .TunnelIsRemotelyManaged {
94108 if tunnelConfig , err := tunnelConfigGetter .GetConfigJSON (); err == nil {
95- if err := rpcClient .SendLocalConfiguration (ctx , tunnelConfig , c .observer ); err != nil {
109+ if err := registrationClient .SendLocalConfiguration (ctx , tunnelConfig ); err != nil {
110+ c .observer .metrics .localConfigMetrics .pushesErrors .Inc ()
96111 c .observer .log .Err (err ).Msg ("unable to send local configuration" )
97112 }
113+ c .observer .metrics .localConfigMetrics .pushes .Inc ()
98114 } else {
99115 c .observer .log .Err (err ).Msg ("failed to obtain current configuration" )
100116 }
101117 }
102118
103- c .waitForUnregister (ctx , rpcClient )
119+ c .waitForUnregister (ctx , registrationClient )
104120 return nil
105121}
106122
107- func (c * controlStream ) waitForUnregister (ctx context.Context , rpcClient NamedTunnelRPCClient ) {
123+ func (c * controlStream ) waitForUnregister (ctx context.Context , registrationClient tunnelrpc. RegistrationClient ) {
108124 // wait for connection termination or start of graceful shutdown
109- defer rpcClient .Close ()
125+ defer registrationClient .Close ()
110126 select {
111127 case <- ctx .Done ():
112128 break
@@ -115,7 +131,7 @@ func (c *controlStream) waitForUnregister(ctx context.Context, rpcClient NamedTu
115131 }
116132
117133 c .observer .sendUnregisteringEvent (c .connIndex )
118- rpcClient .GracefulShutdown (ctx , c .gracePeriod )
134+ registrationClient .GracefulShutdown (ctx , c .gracePeriod )
119135 c .observer .log .Info ().
120136 Int (management .EventTypeKey , int (management .Cloudflared )).
121137 Uint8 (LogFieldConnIndex , c .connIndex ).
0 commit comments