@@ -3,6 +3,7 @@ package natsnodev1
33import (
44 "context"
55 "fmt"
6+ "net/url"
67 "strconv"
78 "strings"
89 "sync"
@@ -58,9 +59,47 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
5859 var once sync.Once
5960 once .Do (
6061 func () {
62+ //routes := RoutesFromStr(b.configuration.Natsd.Options.RoutesStr)
63+ //b.configuration.Natsd.Options.Routes = routes
64+ //
65+ //fmt.Println(b.configuration.Natsd.Options.RoutesStr)
66+ //fmt.Println(b.configuration.Natsd.Options.Routes)
67+
68+ // b.configuration.Natsd.Options.Cluster.Host = "0.0.0.1"
69+ // b.configuration.Natsd.Options.Cluster.Port = 4244
70+ // b.configuration.Natsd.Options.Cluster.Name = "multiplexer-test"
71+
72+ // fmt.Println(b.configuration.Natsd.Options.Cluster.ListenStr)
73+
74+ servers := strings .Replace (strings .Trim (fmt .Sprint (b .configuration .Nats .Options .Servers ), "[]" ), " " , "," , - 1 )
6175 switch b .configuration .Natsd .Enabled {
6276 case true :
63- options := b .configuration .Natsd .Options
77+
78+ fmt .Println ("ROUTES STR: " + b .configuration .Natsd .Options .RoutesStr )
79+ var routes []* url.URL
80+ if b .configuration .Natsd .Options .RoutesStr != "" {
81+ routes = RoutesFromStr (b .configuration .Natsd .Options .RoutesStr )
82+ }
83+
84+ options := & natsd.Options {
85+ // Standard client options
86+ ServerName : b .configuration .Natsd .Options .ServerName ,
87+ Host : b .configuration .Natsd .Options .Host ,
88+ Port : b .configuration .Natsd .Options .Port ,
89+ HTTPPort : b .configuration .Natsd .Options .HTTPPort ,
90+
91+ // Clustering options
92+ Cluster : natsd.ClusterOpts {
93+ Name : b .configuration .Natsd .Options .Cluster .Name ,
94+ Host : b .configuration .Natsd .Options .Cluster .Host ,
95+ Port : b .configuration .Natsd .Options .Cluster .Port ,
96+ },
97+ Routes : routes ,
98+ Debug : true ,
99+ Trace : true ,
100+ }
101+
102+ sdkv2betalib .PrettyPrint (options )
64103
65104 // Check if we are running inside the mesh, if so, use the CustomDialer option
66105 if b .configuration .Platform .Mesh .Enabled {
@@ -70,7 +109,7 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
70109 }
71110 }
72111
73- server , err := natsd .NewServer (& options )
112+ server , err := natsd .NewServer (options )
74113 if err != nil {
75114 fmt .Println ("natsd error: " , err )
76115 panic (err )
@@ -89,7 +128,7 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
89128 panic ("Server not ready within 60 seconds" )
90129 }
91130
92- _nats , err := nats .Connect (fmt . Sprintf ( "nats://%s:%d" , options . Host , options . Port ) )
131+ _nats , err := nats .Connect (servers )
93132 if err != nil {
94133 fmt .Println ("error connecting to NATS server at localhost port: " + strconv .Itoa (options .Port ) + " " + err .Error ())
95134 }
@@ -118,8 +157,6 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
118157
119158 RegisterEventStreams ()
120159 case false :
121- servers := strings .Replace (strings .Trim (fmt .Sprint (b .configuration .Nats .Options .Servers ), "[]" ), " " , "," , - 1 )
122-
123160 // Check if we are running inside the mesh, if so, use the CustomDialer option
124161 if b .configuration .Platform .Mesh .Enabled {
125162 if ! nebulav1 .IsBound {
@@ -129,7 +166,15 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
129166
130167 natsOptions = append (natsOptions , nats .SetCustomDialer (nebulav1 .Bound .MeshSocket ))
131168 }
132-
169+ natsOptions = append (natsOptions , nats .ConnectHandler (func (c * nats.Conn ) {
170+ apexlog .Info ("Node connected to: " + c .Opts .Url )
171+ }))
172+ natsOptions = append (natsOptions , nats .ClosedHandler (func (c * nats.Conn ) {
173+ apexlog .Info ("Closed handler: " + c .Opts .Url )
174+ }))
175+ natsOptions = append (natsOptions , nats .DiscoveredServersHandler (func (c * nats.Conn ) {
176+ apexlog .Info ("Discovered Server: " + c .Opts .Url )
177+ }))
133178 natsOptions = append (natsOptions , nats .DisconnectErrHandler (func (_ * nats.Conn , err error ) {
134179 apexlog .Info ("Disconnected due to: " + err .Error ())
135180 }))
@@ -276,7 +321,7 @@ func connectWithRetry(url string, maxRetries int, retryDelay time.Duration, opti
276321 var err error
277322
278323 for attempt := 1 ; maxRetries == - 1 || attempt <= maxRetries ; attempt ++ {
279- fmt .Printf ("Connecting to NATS (attempt %d)...\n " , attempt )
324+ fmt .Printf ("Connecting to NATS- %s - (attempt %d)...\n " , nc . ConnectedUrl () , attempt )
280325
281326 nc , err = nats .Connect (url )
282327 if err != nil {
@@ -293,9 +338,24 @@ func connectWithRetry(url string, maxRetries int, retryDelay time.Duration, opti
293338 continue
294339 }
295340
296- fmt .Println ("Connected to NATS and JetStream is ready ." )
341+ fmt .Println ("Connected to NATS and JetStream. Ready ." )
297342 return nc , js , nil
298343 }
299344
300345 return nil , nil , fmt .Errorf ("failed to connect to NATS after %d attempts" , maxRetries )
301346}
347+
348+ // RoutesFromStr parses route URLs from a string
349+ func RoutesFromStr (routesStr string ) []* url.URL {
350+ routes := strings .Split (routesStr , "," )
351+ if len (routes ) == 0 {
352+ return nil
353+ }
354+ routeUrls := []* url.URL {}
355+ for _ , r := range routes {
356+ r = strings .TrimSpace (r )
357+ u , _ := url .Parse (r )
358+ routeUrls = append (routeUrls , u )
359+ }
360+ return routeUrls
361+ }
0 commit comments