diff --git a/CHANGELOG.md b/CHANGELOG.md index d5b533fd..f5b24626 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,34 @@ +# Release notes 1.0.0 + +## What's New + +* Multi-connection support to edge routers +* Major version set to 1, to indicate compatibility with OpenZiti v1+ + +## Multi-connection support to edge router + +If the `EnableSeparateControlPlaneConnection` is set to true in `ziti.Config`, +the SDK will attempt to use a separate connection to each ER for control messaging. +If the router does not support this feature, then the SDK will fallback to using +a single connection. + +Using a separate connection for control messaging will ensure that control messages +such as dials do not get stuck behind data messages. This is mostly important for +SDKs which are being used to host services or client side applications which are +multiplexing multiple connections, for example proxies and tunnelers. + +## Issues Fixed and Dependency Updates + +* github.com/openziti/sdk-golang: [v0.25.2 -> v1.0.0](https://github.com/openziti/sdk-golang/compare/v0.25.2...v1.0.0) + * [Issue #701](https://github.com/openziti/sdk-golang/issues/701) - Support multi-underlay channels for edge router connections + +* github.com/openziti/channel/v4: [v4.0.1 -> v4.0.3](https://github.com/openziti/channel/compare/v4.0.1...v4.0.3) + * [Issue #176](https://github.com/openziti/channel/issues/176) - Multi-channel need a mechanism to notify the txer that the underlay has closed + +* github.com/openziti/metrics: [v1.3.0 -> v1.4.0](https://github.com/openziti/metrics/compare/v1.3.0...v1.4.0) +* github.com/openziti/transport/v2: [v2.0.167 -> v2.0.168](https://github.com/openziti/transport/compare/v2.0.167...v2.0.168) +* golang.org/x/net: v0.37.0 -> v0.38.0 + # Release notes 0.25.2 ## What's New diff --git a/example/go.mod b/example/go.mod index 9980d8de..2ffceb63 100644 --- a/example/go.mod +++ b/example/go.mod @@ -14,7 +14,7 @@ require ( github.com/openziti/foundation/v2 v2.0.59 github.com/openziti/runzmd v1.0.33 github.com/openziti/sdk-golang v0.0.0 - github.com/openziti/transport/v2 v2.0.167 + github.com/openziti/transport/v2 v2.0.168 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.9.1 @@ -79,10 +79,10 @@ require ( github.com/muhlemmer/gu v0.3.1 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/openziti/channel/v4 v4.0.1 // indirect + github.com/openziti/channel/v4 v4.0.3 // indirect github.com/openziti/edge-api v0.26.42 // indirect github.com/openziti/identity v1.0.100 // indirect - github.com/openziti/metrics v1.3.0 // indirect + github.com/openziti/metrics v1.4.0 // indirect github.com/openziti/secretstream v0.1.32 // indirect github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 // indirect @@ -117,7 +117,7 @@ require ( golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/image v0.18.0 // indirect - golang.org/x/net v0.37.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/oauth2 v0.28.0 // indirect golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.31.0 // indirect diff --git a/example/go.sum b/example/go.sum index 3d3a8fa1..0c05c30e 100644 --- a/example/go.sum +++ b/example/go.sum @@ -358,22 +358,22 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak= github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= -github.com/openziti/channel/v4 v4.0.1 h1:C95j0IVnOiBZeeVmssb7A45vPgc17H9IouemX2KNPJs= -github.com/openziti/channel/v4 v4.0.1/go.mod h1:rhQ7RnsO5UQ9qZhyyRwc7nGg0VdGAEbP7ug04FfAxAo= +github.com/openziti/channel/v4 v4.0.3 h1:lONmg4TgiPxqUdgZzc69hRSwnvH1WZF+xXP3nfbI+aU= +github.com/openziti/channel/v4 v4.0.3/go.mod h1:ekqQwL27nKufMN8nB0jO+bjj9QfOCiPM0GCPllUSK60= github.com/openziti/edge-api v0.26.42 h1:Wi/BUttSUvedT9XGht7vi/zI/TNGc3ApvjkAviWhauA= github.com/openziti/edge-api v0.26.42/go.mod h1:sYHVpm26Jr1u7VooNJzTb2b2nGSlmCHMnbGC8XfWSng= github.com/openziti/foundation/v2 v2.0.59 h1:PJwrcTq62x+cONBeKMlnsuphsTlOvTz8j8prYnehm8o= github.com/openziti/foundation/v2 v2.0.59/go.mod h1:76gmsdIBHvv4O3I0TuFBfO58Rv7YN8FA0ojwYz27ZxE= github.com/openziti/identity v1.0.100 h1:FTkbhykDCMw1z/wxEeDfmq1aBp2tLRjZ3ggioRT4pg8= github.com/openziti/identity v1.0.100/go.mod h1:E4SHqfXaZldDCo/GIdSD/Xg61obuolWYg9Qe8lqGUrQ= -github.com/openziti/metrics v1.3.0 h1:oeythnUY2gs48MYM/HelAbJupfP/u81VYKMEwaGHeRM= -github.com/openziti/metrics v1.3.0/go.mod h1:MOLcoTxhPNla6+NWUCMVTnl1PNqTU40qrbKVa/lVVgg= +github.com/openziti/metrics v1.4.0 h1:uZALaZINoTFqRE3XcVZ/xGuvXJpDhn/a0kxaX586lzE= +github.com/openziti/metrics v1.4.0/go.mod h1:MOLcoTxhPNla6+NWUCMVTnl1PNqTU40qrbKVa/lVVgg= github.com/openziti/runzmd v1.0.33 h1:tOyjRoUuVXIo1z1pNU32jALWkMmhzsSaDrhLtuOn3Ts= github.com/openziti/runzmd v1.0.33/go.mod h1:8c/uvZR/XWXQNllTq6LuTpfKL2DTNxfI2X2wYhgRwik= github.com/openziti/secretstream v0.1.32 h1:89/ZVcwIQjdVmWDfVRfMEChJJXTLXJ59AYBw5j646M4= github.com/openziti/secretstream v0.1.32/go.mod h1:8YaIbjyMwBeKQ7eOYcoVPKHT10u+4OVPXpnZAeDzC6o= -github.com/openziti/transport/v2 v2.0.167 h1:KE2u04cPAO+Xx9eidcYMhAwoGccXZOVnqmhG7nWeuBo= -github.com/openziti/transport/v2 v2.0.167/go.mod h1:RYom6Xjt8gZaCmL0t4FrIcM46RfvqDtoRSUixq8V+mI= +github.com/openziti/transport/v2 v2.0.168 h1:1Anf7X+4xmSKQ12GdPJFhoMZi04QxgD4MJu3agFc1R4= +github.com/openziti/transport/v2 v2.0.168/go.mod h1:vE9FGxPB6I89SWun5mOz3Tuz2QDctwNfB4oqDIdzPoM= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 h1:mOvehYivJ4Aqu2CPe3D3lv8jhqOI9/1o0THxJHBE0qw= @@ -614,8 +614,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= -golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/example/influxdb-client-go/go.mod b/example/influxdb-client-go/go.mod index 6e3c7ebd..98916798 100644 --- a/example/influxdb-client-go/go.mod +++ b/example/influxdb-client-go/go.mod @@ -93,13 +93,13 @@ require ( github.com/muhlemmer/gu v0.3.1 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/openziti/channel/v4 v4.0.1 // indirect + github.com/openziti/channel/v4 v4.0.3 // indirect github.com/openziti/edge-api v0.26.42 // indirect github.com/openziti/foundation/v2 v2.0.59 // indirect github.com/openziti/identity v1.0.100 // indirect - github.com/openziti/metrics v1.3.0 // indirect + github.com/openziti/metrics v1.4.0 // indirect github.com/openziti/secretstream v0.1.32 // indirect - github.com/openziti/transport/v2 v2.0.167 // indirect + github.com/openziti/transport/v2 v2.0.168 // indirect github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect @@ -132,7 +132,7 @@ require ( golang.org/x/arch v0.5.0 // indirect golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect - golang.org/x/net v0.37.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/oauth2 v0.28.0 // indirect golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.31.0 // indirect diff --git a/example/influxdb-client-go/go.sum b/example/influxdb-client-go/go.sum index c7de0ae6..c6665ece 100644 --- a/example/influxdb-client-go/go.sum +++ b/example/influxdb-client-go/go.sum @@ -413,20 +413,20 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak= github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= -github.com/openziti/channel/v4 v4.0.1 h1:C95j0IVnOiBZeeVmssb7A45vPgc17H9IouemX2KNPJs= -github.com/openziti/channel/v4 v4.0.1/go.mod h1:rhQ7RnsO5UQ9qZhyyRwc7nGg0VdGAEbP7ug04FfAxAo= +github.com/openziti/channel/v4 v4.0.3 h1:lONmg4TgiPxqUdgZzc69hRSwnvH1WZF+xXP3nfbI+aU= +github.com/openziti/channel/v4 v4.0.3/go.mod h1:ekqQwL27nKufMN8nB0jO+bjj9QfOCiPM0GCPllUSK60= github.com/openziti/edge-api v0.26.42 h1:Wi/BUttSUvedT9XGht7vi/zI/TNGc3ApvjkAviWhauA= github.com/openziti/edge-api v0.26.42/go.mod h1:sYHVpm26Jr1u7VooNJzTb2b2nGSlmCHMnbGC8XfWSng= github.com/openziti/foundation/v2 v2.0.59 h1:PJwrcTq62x+cONBeKMlnsuphsTlOvTz8j8prYnehm8o= github.com/openziti/foundation/v2 v2.0.59/go.mod h1:76gmsdIBHvv4O3I0TuFBfO58Rv7YN8FA0ojwYz27ZxE= github.com/openziti/identity v1.0.100 h1:FTkbhykDCMw1z/wxEeDfmq1aBp2tLRjZ3ggioRT4pg8= github.com/openziti/identity v1.0.100/go.mod h1:E4SHqfXaZldDCo/GIdSD/Xg61obuolWYg9Qe8lqGUrQ= -github.com/openziti/metrics v1.3.0 h1:oeythnUY2gs48MYM/HelAbJupfP/u81VYKMEwaGHeRM= -github.com/openziti/metrics v1.3.0/go.mod h1:MOLcoTxhPNla6+NWUCMVTnl1PNqTU40qrbKVa/lVVgg= +github.com/openziti/metrics v1.4.0 h1:uZALaZINoTFqRE3XcVZ/xGuvXJpDhn/a0kxaX586lzE= +github.com/openziti/metrics v1.4.0/go.mod h1:MOLcoTxhPNla6+NWUCMVTnl1PNqTU40qrbKVa/lVVgg= github.com/openziti/secretstream v0.1.32 h1:89/ZVcwIQjdVmWDfVRfMEChJJXTLXJ59AYBw5j646M4= github.com/openziti/secretstream v0.1.32/go.mod h1:8YaIbjyMwBeKQ7eOYcoVPKHT10u+4OVPXpnZAeDzC6o= -github.com/openziti/transport/v2 v2.0.167 h1:KE2u04cPAO+Xx9eidcYMhAwoGccXZOVnqmhG7nWeuBo= -github.com/openziti/transport/v2 v2.0.167/go.mod h1:RYom6Xjt8gZaCmL0t4FrIcM46RfvqDtoRSUixq8V+mI= +github.com/openziti/transport/v2 v2.0.168 h1:1Anf7X+4xmSKQ12GdPJFhoMZi04QxgD4MJu3agFc1R4= +github.com/openziti/transport/v2 v2.0.168/go.mod h1:vE9FGxPB6I89SWun5mOz3Tuz2QDctwNfB4oqDIdzPoM= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 h1:mOvehYivJ4Aqu2CPe3D3lv8jhqOI9/1o0THxJHBE0qw= @@ -671,8 +671,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= -golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/go.mod b/go.mod index b824e50a..32c248b2 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.23.1 require ( github.com/Jeffail/gabs v1.4.0 github.com/cenkalti/backoff/v4 v4.3.0 + github.com/emirpasic/gods v1.18.1 github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa github.com/go-openapi/runtime v0.28.0 github.com/go-openapi/strfmt v0.23.0 @@ -17,13 +18,13 @@ require ( github.com/michaelquigley/pfxlog v0.6.10 github.com/mitchellh/go-ps v1.0.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openziti/channel/v4 v4.0.1 + github.com/openziti/channel/v4 v4.0.3 github.com/openziti/edge-api v0.26.42 github.com/openziti/foundation/v2 v2.0.59 github.com/openziti/identity v1.0.100 - github.com/openziti/metrics v1.3.0 + github.com/openziti/metrics v1.4.0 github.com/openziti/secretstream v0.1.32 - github.com/openziti/transport/v2 v2.0.167 + github.com/openziti/transport/v2 v2.0.168 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 @@ -40,7 +41,6 @@ require ( require ( github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/emirpasic/gods v1.18.1 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -80,7 +80,7 @@ require ( go.opentelemetry.io/otel/metric v1.29.0 // indirect go.opentelemetry.io/otel/trace v1.29.0 // indirect golang.org/x/crypto v0.36.0 // indirect - golang.org/x/net v0.37.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/sync v0.12.0 // indirect golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect diff --git a/go.sum b/go.sum index d148ca78..61744d8d 100644 --- a/go.sum +++ b/go.sum @@ -299,20 +299,20 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak= github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= -github.com/openziti/channel/v4 v4.0.1 h1:C95j0IVnOiBZeeVmssb7A45vPgc17H9IouemX2KNPJs= -github.com/openziti/channel/v4 v4.0.1/go.mod h1:rhQ7RnsO5UQ9qZhyyRwc7nGg0VdGAEbP7ug04FfAxAo= +github.com/openziti/channel/v4 v4.0.3 h1:lONmg4TgiPxqUdgZzc69hRSwnvH1WZF+xXP3nfbI+aU= +github.com/openziti/channel/v4 v4.0.3/go.mod h1:ekqQwL27nKufMN8nB0jO+bjj9QfOCiPM0GCPllUSK60= github.com/openziti/edge-api v0.26.42 h1:Wi/BUttSUvedT9XGht7vi/zI/TNGc3ApvjkAviWhauA= github.com/openziti/edge-api v0.26.42/go.mod h1:sYHVpm26Jr1u7VooNJzTb2b2nGSlmCHMnbGC8XfWSng= github.com/openziti/foundation/v2 v2.0.59 h1:PJwrcTq62x+cONBeKMlnsuphsTlOvTz8j8prYnehm8o= github.com/openziti/foundation/v2 v2.0.59/go.mod h1:76gmsdIBHvv4O3I0TuFBfO58Rv7YN8FA0ojwYz27ZxE= github.com/openziti/identity v1.0.100 h1:FTkbhykDCMw1z/wxEeDfmq1aBp2tLRjZ3ggioRT4pg8= github.com/openziti/identity v1.0.100/go.mod h1:E4SHqfXaZldDCo/GIdSD/Xg61obuolWYg9Qe8lqGUrQ= -github.com/openziti/metrics v1.3.0 h1:oeythnUY2gs48MYM/HelAbJupfP/u81VYKMEwaGHeRM= -github.com/openziti/metrics v1.3.0/go.mod h1:MOLcoTxhPNla6+NWUCMVTnl1PNqTU40qrbKVa/lVVgg= +github.com/openziti/metrics v1.4.0 h1:uZALaZINoTFqRE3XcVZ/xGuvXJpDhn/a0kxaX586lzE= +github.com/openziti/metrics v1.4.0/go.mod h1:MOLcoTxhPNla6+NWUCMVTnl1PNqTU40qrbKVa/lVVgg= github.com/openziti/secretstream v0.1.32 h1:89/ZVcwIQjdVmWDfVRfMEChJJXTLXJ59AYBw5j646M4= github.com/openziti/secretstream v0.1.32/go.mod h1:8YaIbjyMwBeKQ7eOYcoVPKHT10u+4OVPXpnZAeDzC6o= -github.com/openziti/transport/v2 v2.0.167 h1:KE2u04cPAO+Xx9eidcYMhAwoGccXZOVnqmhG7nWeuBo= -github.com/openziti/transport/v2 v2.0.167/go.mod h1:RYom6Xjt8gZaCmL0t4FrIcM46RfvqDtoRSUixq8V+mI= +github.com/openziti/transport/v2 v2.0.168 h1:1Anf7X+4xmSKQ12GdPJFhoMZi04QxgD4MJu3agFc1R4= +github.com/openziti/transport/v2 v2.0.168/go.mod h1:vE9FGxPB6I89SWun5mOz3Tuz2QDctwNfB4oqDIdzPoM= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 h1:mOvehYivJ4Aqu2CPe3D3lv8jhqOI9/1o0THxJHBE0qw= @@ -497,8 +497,8 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= -golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/version b/version index 7d385d41..d3827e75 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.25 +1.0 diff --git a/ziti/config.go b/ziti/config.go index 2efd674f..e0f0fd72 100644 --- a/ziti/config.go +++ b/ziti/config.go @@ -60,6 +60,15 @@ type Config struct { //Allows providing a function which controls how/where connections to a router are proxied. RouterProxy func(addr string) *transport.ProxyConfiguration `json:"-"` + + // If set to true, the sdk will attempt to create a separate connection to edge routers for control plane data, + // such as dials. This flag should not be considered part of the stable API yet. It may default to true at + // some point in the future or be removed. + EnableSeparateControlPlaneConnection bool `json:"-"` +} + +func (cfg *Config) SetSeparateControlPlaneConnectionEnabled(enabled bool) { + cfg.EnableSeparateControlPlaneConnection = enabled } // NewConfig will create a new Config object from a provided Ziti Edge Client API URL and identity configuration. diff --git a/ziti/contexts.go b/ziti/contexts.go index 99f78f4b..6d749d74 100644 --- a/ziti/contexts.go +++ b/ziti/contexts.go @@ -82,13 +82,14 @@ func NewContextWithOpts(cfg *Config, options *Options) (Context, error) { } newContext := &ContextImpl{ - Id: NewId(), - routerConnections: cmap.New[edge.RouterConn](), - options: options, - authQueryHandlers: map[string]func(query *rest_model.AuthQueryDetail, response MfaCodeResponse) error{}, - closeNotify: make(chan struct{}), - EventEmmiter: events.New(), - routerProxy: cfg.RouterProxy, + Id: NewId(), + routerConnections: cmap.New[edge.RouterConn](), + options: options, + authQueryHandlers: map[string]func(query *rest_model.AuthQueryDetail, response MfaCodeResponse) error{}, + closeNotify: make(chan struct{}), + EventEmmiter: events.New(), + routerProxy: cfg.RouterProxy, + enableCtrlPlaneConnection: cfg.EnableSeparateControlPlaneConnection, } if cfg.ID.Cert != "" && cfg.ID.Key != "" { diff --git a/ziti/edge/addr.go b/ziti/edge/addr.go index f2134a23..b90d633a 100644 --- a/ziti/edge/addr.go +++ b/ziti/edge/addr.go @@ -29,5 +29,5 @@ func (e *Addr) Network() string { } func (e *Addr) String() string { - return fmt.Sprintf("ziti-edge-router connId=%v, logical=%v", e.MsgCh.Id(), e.MsgCh.LogicalName()) + return fmt.Sprintf("ziti-edge-router connId=%v, logical=%v", e.MsgCh.Id(), e.MsgCh.GetChannel().LogicalName()) } diff --git a/ziti/edge/channel.go b/ziti/edge/channel.go new file mode 100644 index 00000000..66a28a8c --- /dev/null +++ b/ziti/edge/channel.go @@ -0,0 +1,220 @@ +package edge + +import ( + "github.com/michaelquigley/pfxlog" + "github.com/openziti/channel/v4" + "io" + "sync/atomic" + "time" +) + +const ( + ChannelTypeControl string = "edge.control" + ChannelTypeDefault string = "edge.default" +) + +func NewBaseSdkChannel(underlay channel.Underlay) *BaseSdkChannel { + senderContext := channel.NewSenderContext() + + defaultMsgChan := make(chan channel.Sendable, 4) + controlMsgChan := make(chan channel.Sendable, 4) + retryMsgChan := make(chan channel.Sendable, 4) + + result := &BaseSdkChannel{ + SenderContext: senderContext, + id: underlay.ConnectionId(), + defaultSender: channel.NewSingleChSender(senderContext, defaultMsgChan), + controlSender: channel.NewSingleChSender(senderContext, controlMsgChan), + controlMsgChan: controlMsgChan, + defaultMsgChan: defaultMsgChan, + retryMsgChan: retryMsgChan, + } + return result +} + +type BaseSdkChannel struct { + id string + ch channel.MultiChannel + channel.SenderContext + controlSender channel.Sender + defaultSender channel.Sender + + controlChannelAvailable atomic.Bool + controlMsgChan chan channel.Sendable + defaultMsgChan chan channel.Sendable + retryMsgChan chan channel.Sendable +} + +func (self *BaseSdkChannel) InitChannel(ch channel.MultiChannel) { + self.ch = ch +} + +func (self *BaseSdkChannel) GetChannel() channel.Channel { + return self.ch +} + +func (self *BaseSdkChannel) GetDefaultSender() channel.Sender { + return self.defaultSender +} + +func (self *BaseSdkChannel) GetControlSender() channel.Sender { + return self.controlSender +} + +func (self *BaseSdkChannel) GetNextMsgDefault(notifier *channel.CloseNotifier) (channel.Sendable, error) { + if self.controlChannelAvailable.Load() { + select { + case msg := <-self.defaultMsgChan: + return msg, nil + case msg := <-self.retryMsgChan: + return msg, nil + case <-self.GetCloseNotify(): + return nil, io.EOF + case <-notifier.GetCloseNotify(): + return nil, io.EOF + } + } else { + select { + case msg := <-self.defaultMsgChan: + return msg, nil + case msg := <-self.controlMsgChan: + return msg, nil + case msg := <-self.retryMsgChan: + return msg, nil + case <-self.GetCloseNotify(): + return nil, io.EOF + case <-notifier.GetCloseNotify(): + return nil, io.EOF + } + } +} + +func (self *BaseSdkChannel) GetNextControlMsg(notifier *channel.CloseNotifier) (channel.Sendable, error) { + select { + case msg := <-self.controlMsgChan: + return msg, nil + case msg := <-self.retryMsgChan: + return msg, nil + case <-self.GetCloseNotify(): + return nil, io.EOF + case <-notifier.GetCloseNotify(): + return nil, io.EOF + } +} + +func (self *BaseSdkChannel) GetMessageSource(underlay channel.Underlay) channel.MessageSourceF { + if channel.GetUnderlayType(underlay) == ChannelTypeControl { + return self.GetNextControlMsg + } + return self.GetNextMsgDefault +} + +func (self *BaseSdkChannel) HandleTxFailed(_ channel.Underlay, sendable channel.Sendable) bool { + select { + case self.retryMsgChan <- sendable: + return true + case self.defaultMsgChan <- sendable: + return true + default: + return false + } +} + +func (self *BaseSdkChannel) HandleUnderlayAccepted(ch channel.MultiChannel, underlay channel.Underlay) { + self.UpdateCtrlChannelAvailable(ch) + pfxlog.Logger(). + WithField("id", ch.Label()). + WithField("underlays", ch.GetUnderlayCountsByType()). + WithField("underlayType", channel.GetUnderlayType(underlay)). + WithField("controlAvailable", self.controlChannelAvailable.Load()). + Info("underlay added") +} + +func (self *BaseSdkChannel) UpdateCtrlChannelAvailable(ch channel.MultiChannel) { + self.controlChannelAvailable.Store(ch.GetUnderlayCountsByType()[ChannelTypeControl] > 0) +} + +func NewDialSdkChannel(dialer channel.DialUnderlayFactory, underlay channel.Underlay) UnderlayHandlerSdkChannel { + result := &DialSdkChannel{ + BaseSdkChannel: *NewBaseSdkChannel(underlay), + dialer: dialer, + } + + result.constraints.AddConstraint(ChannelTypeDefault, 1, 1) + result.constraints.AddConstraint(ChannelTypeControl, 1, 0) + + return result +} + +type UnderlayHandlerSdkChannel interface { + SdkChannel + channel.UnderlayHandler +} + +type SdkChannel interface { + InitChannel(channel.MultiChannel) + GetChannel() channel.Channel + GetDefaultSender() channel.Sender + GetControlSender() channel.Sender +} + +type DialSdkChannel struct { + BaseSdkChannel + dialer channel.DialUnderlayFactory + constraints channel.UnderlayConstraints +} + +func (self *DialSdkChannel) Start(channel channel.MultiChannel) { + self.constraints.Apply(channel, self) +} + +func (self *DialSdkChannel) HandleUnderlayClose(ch channel.MultiChannel, underlay channel.Underlay) { + pfxlog.Logger(). + WithField("id", ch.Label()). + WithField("underlays", ch.GetUnderlayCountsByType()). + WithField("underlayType", channel.GetUnderlayType(underlay)). + Info("underlay closed") + self.UpdateCtrlChannelAvailable(ch) + self.constraints.Apply(ch, self) +} + +func (self *DialSdkChannel) DialFailed(_ channel.MultiChannel, _ string, attempt int) { + delay := 2 * time.Duration(attempt) * time.Second + if delay > time.Minute { + delay = time.Minute + } + time.Sleep(delay) +} + +func (self *DialSdkChannel) CreateGroupedUnderlay(groupId string, underlayType string, timeout time.Duration) (channel.Underlay, error) { + return self.dialer.CreateWithHeaders(timeout, map[int32][]byte{ + channel.TypeHeader: []byte(underlayType), + channel.ConnectionIdHeader: []byte(groupId), + channel.IsGroupedHeader: {1}, + }) +} + +func NewSingleSdkChannel(ch channel.Channel) SdkChannel { + return &SingleSdkChannel{ + ch: ch, + } +} + +type SingleSdkChannel struct { + ch channel.Channel +} + +func (self *SingleSdkChannel) InitChannel(channel.MultiChannel) { +} + +func (self *SingleSdkChannel) GetChannel() channel.Channel { + return self.ch +} + +func (self *SingleSdkChannel) GetDefaultSender() channel.Sender { + return self.ch +} + +func (self *SingleSdkChannel) GetControlSender() channel.Sender { + return self.ch +} diff --git a/ziti/edge/conn.go b/ziti/edge/conn.go index 1517f00e..f7d23d14 100644 --- a/ziti/edge/conn.go +++ b/ziti/edge/conn.go @@ -103,7 +103,7 @@ type Conn interface { const forever = time.Hour * 24 * 365 * 100 type MsgChannel struct { - channel.Channel + SdkChannel id uint32 msgIdSeq *sequence.Sequence writeDeadline time.Time @@ -118,17 +118,17 @@ type TraceRouteResult struct { Error string } -func NewEdgeMsgChannel(ch channel.Channel, connId uint32) *MsgChannel { +func NewEdgeMsgChannel(ch SdkChannel, connId uint32) *MsgChannel { traceEnabled := strings.EqualFold("true", os.Getenv("ZITI_TRACE_ENABLED")) if traceEnabled { pfxlog.Logger().Info("Ziti message tracing ENABLED") } return &MsgChannel{ - Channel: ch, - id: connId, - msgIdSeq: sequence.NewSequence(), - trace: traceEnabled, + SdkChannel: ch, + id: connId, + msgIdSeq: sequence.NewSequence(), + trace: traceEnabled, } } @@ -178,9 +178,9 @@ func (ec *MsgChannel) WriteTraced(data []byte, msgUUID []byte, hdrs map[int32][] // it is retained, and we can cause data corruption var err error if ec.writeDeadline.IsZero() { - err = msg.WithTimeout(forever).SendAndWaitForWire(ec.Channel) + err = msg.WithTimeout(forever).SendAndWaitForWire(ec.GetDefaultSender()) } else { - err = msg.WithTimeout(time.Until(ec.writeDeadline)).SendAndWaitForWire(ec.Channel) + err = msg.WithTimeout(time.Until(ec.writeDeadline)).SendAndWaitForWire(ec.GetDefaultSender()) } if err != nil { @@ -193,7 +193,7 @@ func (ec *MsgChannel) WriteTraced(data []byte, msgUUID []byte, hdrs map[int32][] func (ec *MsgChannel) SendState(msg *channel.Message) error { msg.PutUint32Header(SeqHeader, ec.msgIdSeq.Next()) ec.TraceMsg("SendState", msg) - return msg.WithTimeout(5 * time.Second).SendAndWaitForWire(ec.Channel) + return msg.WithTimeout(5 * time.Second).SendAndWaitForWire(ec.GetDefaultSender()) } func (ec *MsgChannel) TraceMsg(source string, msg *channel.Message) { diff --git a/ziti/edge/network/conn.go b/ziti/edge/network/conn.go index 8dd05a4f..6eee7a09 100644 --- a/ziti/edge/network/conn.go +++ b/ziti/edge/network/conn.go @@ -148,7 +148,7 @@ func (conn *edgeConn) Accept(msg *channel.Message) { if msg.ContentType == edge.ContentTypeConnInspectRequest { resp := edge.NewConnInspectResponse(0, edge.ConnType(conn.connType), conn.Inspect()) - if err := resp.ReplyTo(msg).Send(conn.Channel); err != nil { + if err := resp.ReplyTo(msg).Send(conn.GetControlSender()); err != nil { logrus.WithFields(edge.GetLoggerFields(msg)).WithError(err). Error("failed to send inspect response") } @@ -179,7 +179,7 @@ func (conn *edgeConn) Accept(msg *channel.Message) { resp.Headers[edge.UUIDHeader] = msgUUID } - if err := conn.Send(resp); err != nil { + if err := conn.GetControlSender().Send(resp); err != nil { logrus.WithFields(edge.GetLoggerFields(msg)).WithError(err). Error("failed to send trace route response") } @@ -289,7 +289,7 @@ func (conn *edgeConn) Connect(session *rest_model.SessionDetail, options *edge.D connectRequest := edge.NewConnectMsg(conn.Id(), *session.Token, pub, options) connectRequest.Headers[edge.ConnectionMarkerHeader] = []byte(conn.marker) conn.TraceMsg("connect", connectRequest) - replyMsg, err := connectRequest.WithTimeout(options.ConnectTimeout).SendForReply(conn.Channel) + replyMsg, err := connectRequest.WithTimeout(options.ConnectTimeout).SendForReply(conn.GetControlSender()) if err != nil { logger.Error(err) return nil, err @@ -386,7 +386,7 @@ func (conn *edgeConn) establishServerCrypto(keypair *kx.KeyPair, peerKey []byte, } func (conn *edgeConn) listen(session *rest_model.SessionDetail, service *rest_model.ServiceDetail, options *edge.ListenOptions) (*edgeListener, error) { - logger := pfxlog.ContextLogger(conn.Channel.Label()). + logger := pfxlog.ContextLogger(conn.GetChannel().Label()). WithField("connId", conn.Id()). WithField("serviceName", *service.Name). WithField("sessionId", *session.ID) @@ -420,7 +420,7 @@ func (conn *edgeConn) listen(session *rest_model.SessionDetail, service *rest_mo } bindRequest := edge.NewBindMsg(conn.Id(), *session.Token, pub, options) conn.TraceMsg("listen", bindRequest) - replyMsg, err := bindRequest.WithTimeout(5 * time.Second).SendForReply(conn.Channel) + replyMsg, err := bindRequest.WithTimeout(5 * time.Second).SendForReply(conn.GetControlSender()) if err != nil { logger.WithError(err).Error("failed to bind") return nil, err @@ -449,7 +449,7 @@ func (conn *edgeConn) unbind(logger *logrus.Entry, token string) { conn.hosting.Remove(token) unbindRequest := edge.NewUnbindMsg(conn.Id(), token) - if err := unbindRequest.WithTimeout(5 * time.Second).SendAndWaitForWire(conn.Channel); err != nil { + if err := unbindRequest.WithTimeout(5 * time.Second).SendAndWaitForWire(conn.GetControlSender()); err != nil { logger.WithError(err).Error("unable to send unbind msg for conn") } else { logger.Debug("unbind message sent successfully") @@ -624,7 +624,7 @@ func (conn *edgeConn) newChildConnection(message *channel.Message) { logger.Warn("listener not found") reply := edge.NewDialFailedMsg(conn.Id(), "invalid token") reply.ReplyTo(message) - if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(conn.Channel); err != nil { + if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(conn.GetControlSender()); err != nil { logger.WithError(err).Error("failed to send reply to dial request") } return @@ -644,7 +644,7 @@ func (conn *edgeConn) newChildConnection(message *channel.Message) { marker, _ := message.GetStringHeader(edge.ConnectionMarkerHeader) edgeCh := &edgeConn{ - MsgChannel: *edge.NewEdgeMsgChannel(conn.Channel, id), + MsgChannel: *edge.NewEdgeMsgChannel(conn.SdkChannel, id), readQ: NewNoopSequencer[*channel.Message](4), msgMux: conn.msgMux, sourceIdentity: sourceIdentity, @@ -667,7 +667,7 @@ func (conn *edgeConn) newChildConnection(message *channel.Message) { newConnLogger.WithError(err).Error("invalid conn id, already in use") reply := edge.NewDialFailedMsg(conn.Id(), err.Error()) reply.ReplyTo(message) - if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(conn.Channel); err != nil { + if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(conn.GetControlSender()); err != nil { logger.WithError(err).Error("failed to send reply to dial request") } return @@ -692,7 +692,7 @@ func (conn *edgeConn) newChildConnection(message *channel.Message) { newConnLogger.WithError(err).Error("failed to establish connection") reply := edge.NewDialFailedMsg(conn.Id(), err.Error()) reply.ReplyTo(message) - if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(conn.Channel); err != nil { + if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(conn.GetControlSender()); err != nil { logger.WithError(err).Error("failed to send reply to dial request") } return @@ -739,7 +739,7 @@ func (conn *edgeConn) CompleteAcceptFailed(err error) { func (conn *edgeConn) TraceRoute(hops uint32, timeout time.Duration) (*edge.TraceRouteResult, error) { msg := edge.NewTraceRouteMsg(conn.Id(), hops, uint64(info.NowInMilliseconds())) - resp, err := msg.WithTimeout(timeout).SendForReply(conn.Channel) + resp, err := msg.WithTimeout(timeout).SendForReply(conn.GetDefaultSender()) if err != nil { return nil, err } @@ -787,7 +787,7 @@ func (self *newConnHandler) dialFailed(err error) { newConnLogger.WithError(err).Error("Failed to establish connection") reply := edge.NewDialFailedMsg(self.conn.Id(), err.Error()) reply.ReplyTo(self.message) - if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(self.conn.Channel); err != nil { + if err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendAndWaitForWire(self.conn.GetControlSender()); err != nil { logger.WithError(err).Error("Failed to send reply to dial request") } } @@ -807,7 +807,7 @@ func (self *newConnHandler) dialSucceeded() error { reply.ReplyTo(self.message) if !self.routerProvidedConnId { - startMsg, err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendForReply(self.conn.Channel) + startMsg, err := reply.WithPriority(channel.Highest).WithTimeout(5 * time.Second).SendForReply(self.conn.GetControlSender()) if err != nil { logger.WithError(err).Error("Failed to send reply to dial request") return err @@ -817,7 +817,7 @@ func (self *newConnHandler) dialSucceeded() error { logger.Errorf("failed to receive start after dial. got %v", startMsg) return errors.Errorf("failed to receive start after dial. got %v", startMsg) } - } else if err := reply.WithPriority(channel.Highest).WithTimeout(time.Second * 5).SendAndWaitForWire(self.conn.Channel); err != nil { + } else if err := reply.WithPriority(channel.Highest).WithTimeout(time.Second * 5).SendAndWaitForWire(self.conn.GetControlSender()); err != nil { logger.WithError(err).Error("Failed to send reply to dial request") return err } diff --git a/ziti/edge/network/conn_test.go b/ziti/edge/network/conn_test.go index c34902f7..dc5f8c9c 100644 --- a/ziti/edge/network/conn_test.go +++ b/ziti/edge/network/conn_test.go @@ -30,7 +30,7 @@ func BenchmarkConnWriteBaseLine(b *testing.B) { func BenchmarkConnWrite(b *testing.B) { mux := edge.NewCowMapMsgMux() - testChannel := &NoopTestChannel{} + testChannel := edge.NewSingleSdkChannel(&NoopTestChannel{}) conn := &edgeConn{ MsgChannel: *edge.NewEdgeMsgChannel(testChannel, 1), readQ: NewNoopSequencer[*channel.Message](4), @@ -53,7 +53,7 @@ func BenchmarkConnWrite(b *testing.B) { func BenchmarkConnRead(b *testing.B) { mux := edge.NewCowMapMsgMux() - testChannel := &NoopTestChannel{} + testChannel := edge.NewSingleSdkChannel(&NoopTestChannel{}) readQ := NewNoopSequencer[*channel.Message](4) conn := &edgeConn{ @@ -126,7 +126,7 @@ func BenchmarkSequencer(b *testing.B) { func TestReadMultipart(t *testing.T) { req := require.New(t) mux := edge.NewCowMapMsgMux() - testChannel := &NoopTestChannel{} + testChannel := edge.NewSingleSdkChannel(&NoopTestChannel{}) readQ := NewNoopSequencer[*channel.Message](4) conn := &edgeConn{ @@ -176,7 +176,7 @@ type NoopTestChannel struct { } func (ch *NoopTestChannel) Headers() map[int32][]byte { - panic("implement me") + return nil } func (ch *NoopTestChannel) TrySend(s channel.Sendable) (bool, error) { diff --git a/ziti/edge/network/factory.go b/ziti/edge/network/factory.go index f6cc5b6b..0107de4a 100644 --- a/ziti/edge/network/factory.go +++ b/ziti/edge/network/factory.go @@ -36,13 +36,13 @@ type RouterConnOwner interface { type routerConn struct { routerName string key string - ch channel.Channel + ch edge.SdkChannel msgMux edge.MsgMux owner RouterConnOwner } func (conn *routerConn) GetBoolHeader(key int32) bool { - val := conn.ch.Underlay().Headers()[key] + val := conn.ch.GetChannel().Headers()[key] return len(val) == 1 && val[0] == 1 } @@ -72,7 +72,12 @@ func NewEdgeConnFactory(routerName, key string, owner RouterConnOwner) edge.Rout } func (conn *routerConn) BindChannel(binding channel.Binding) error { - conn.ch = binding.GetChannel() + if multiChannel, ok := binding.GetChannel().(channel.MultiChannel); ok { + conn.ch = multiChannel.GetUnderlayHandler().(edge.SdkChannel) + conn.ch.InitChannel(multiChannel) + } else { + conn.ch = edge.NewSingleSdkChannel(binding.GetChannel()) + } binding.AddReceiveHandlerF(edge.ContentTypeDial, conn.msgMux.HandleReceive) binding.AddReceiveHandlerF(edge.ContentTypeStateClosed, conn.msgMux.HandleReceive) @@ -118,7 +123,7 @@ func (conn *routerConn) NewDialConn(service *rest_model.ServiceDetail) *edgeConn func (conn *routerConn) UpdateToken(token []byte, timeout time.Duration) error { msg := edge.NewUpdateTokenMsg(token) - resp, err := msg.WithTimeout(timeout).SendForReply(conn.ch) + resp, err := msg.WithTimeout(timeout).SendForReply(conn.ch.GetControlSender()) if err != nil { return err @@ -201,13 +206,9 @@ func (conn *routerConn) Listen(service *rest_model.ServiceDetail, session *rest_ } func (conn *routerConn) Close() error { - if !conn.ch.IsClosed() { - return conn.ch.Close() - } - - return nil + return conn.ch.GetChannel().Close() } func (conn *routerConn) IsClosed() bool { - return conn.ch.IsClosed() + return conn.ch.GetChannel().IsClosed() } diff --git a/ziti/edge/network/listener.go b/ziti/edge/network/listener.go index 92dae7a5..28e605ed 100644 --- a/ziti/edge/network/listener.go +++ b/ziti/edge/network/listener.go @@ -121,7 +121,7 @@ func (listener *edgeListener) updateCostAndPrecedence(cost *uint16, precedence * logger.Debug("sending update bind request to edge router") request := edge.NewUpdateBindMsg(listener.edgeChan.Id(), listener.token, cost, precedence) listener.edgeChan.TraceMsg("updateCostAndPrecedence", request) - return request.WithTimeout(5 * time.Second).SendAndWaitForWire(listener.edgeChan.Channel) + return request.WithTimeout(5 * time.Second).SendAndWaitForWire(listener.edgeChan.GetControlSender()) } func (listener *edgeListener) SendHealthEvent(pass bool) error { @@ -134,7 +134,7 @@ func (listener *edgeListener) SendHealthEvent(pass bool) error { logger.Debug("sending health event to edge router") request := edge.NewHealthEventMsg(listener.edgeChan.Id(), listener.token, pass) listener.edgeChan.TraceMsg("healthEvent", request) - return request.WithTimeout(5 * time.Second).SendAndWaitForWire(listener.edgeChan.Channel) + return request.WithTimeout(5 * time.Second).SendAndWaitForWire(listener.edgeChan.GetControlSender()) } func (listener *edgeListener) Close() error { @@ -163,7 +163,7 @@ func (listener *edgeListener) close(closedByRemote bool) error { unbindRequest := edge.NewUnbindMsg(edgeChan.Id(), listener.token) listener.edgeChan.TraceMsg("close", unbindRequest) - if err := unbindRequest.WithTimeout(5 * time.Second).SendAndWaitForWire(edgeChan.Channel); err != nil { + if err := unbindRequest.WithTimeout(5 * time.Second).SendAndWaitForWire(edgeChan.GetControlSender()); err != nil { logger.WithError(err).Error("unable to unbind session for conn") return err } diff --git a/ziti/sdkinfo/build_info.go b/ziti/sdkinfo/build_info.go index cb4c9959..aa892b6e 100644 --- a/ziti/sdkinfo/build_info.go +++ b/ziti/sdkinfo/build_info.go @@ -20,5 +20,5 @@ package sdkinfo const ( - Version = "v0.25.2" + Version = "v1.0.0" ) diff --git a/ziti/ziti.go b/ziti/ziti.go index 7717326e..48b6dd55 100644 --- a/ziti/ziti.go +++ b/ziti/ziti.go @@ -194,6 +194,8 @@ type ContextImpl struct { events.EventEmmiter lastSuccessfulApiSessionRefresh time.Time routerProxy func(addr string) *transport.ProxyConfiguration + + enableCtrlPlaneConnection bool } func (context *ContextImpl) AddServiceAddedListener(handler func(Context, *rest_model.ServiceDetail)) func() { @@ -1408,7 +1410,38 @@ func (context *ContextImpl) connectEdgeRouter(routerName, ingressUrl string) *ed edgeConn := network.NewEdgeConnFactory(routerName, ingressUrl, context) options := channel.DefaultOptions() options.ConnectTimeout = 15 * time.Second - ch, err := channel.NewChannel(fmt.Sprintf("ziti-sdk[router=%v]", ingressUrl), dialer, edgeConn, options) + + headers := channel.Headers{} + if context.enableCtrlPlaneConnection { + headers.PutBoolHeader(channel.IsGroupedHeader, true) + headers.PutStringHeader(channel.TypeHeader, edge.ChannelTypeDefault) + } + underlay, err := dialer.CreateWithHeaders(options.ConnectTimeout, headers) + if err != nil { + logger.Error(err) + return &edgeRouterConnResult{ + routerUrl: ingressUrl, + routerName: routerName, + err: err, + } + } + + var ch channel.Channel + + if isGrouped, _ := channel.Headers(underlay.Headers()).GetBoolHeader(channel.IsGroupedHeader); isGrouped { + var dialSdkChannel = edge.NewDialSdkChannel(dialer, underlay) + multiChannelConfig := &channel.MultiChannelConfig{ + LogicalName: fmt.Sprintf("ziti-sdk[router=%v]", ingressUrl), + Options: options, + UnderlayHandler: dialSdkChannel, + BindHandler: edgeConn, + Underlay: underlay, + } + ch, err = channel.NewMultiChannel(multiChannelConfig) + } else { + ch, err = channel.NewChannelWithUnderlay(fmt.Sprintf("ziti-sdk[router=%v]", ingressUrl), underlay, edgeConn, options) + } + if err != nil { logger.Error(err) return &edgeRouterConnResult{ @@ -1417,10 +1450,11 @@ func (context *ContextImpl) connectEdgeRouter(routerName, ingressUrl string) *ed err: err, } } + connectTime := time.Duration(time.Now().UnixNano() - start) logger.Debugf("routerConn[%s@%s] connected in %d ms", routerName, ingressUrl, connectTime.Milliseconds()) - if versionHeader, found := ch.Underlay().Headers()[channel.HelloVersionHeader]; found { + if versionHeader, found := ch.Headers()[channel.HelloVersionHeader]; found { versionInfo, err := versions.StdVersionEncDec.Decode(versionHeader) if err != nil { pfxlog.Logger().Errorf("could not parse hello version header: %v", err)