diff --git a/.env b/.env index 6af24be6..2b735a68 100644 --- a/.env +++ b/.env @@ -1,14 +1,23 @@ -MGATE_MQTT_WITHOUT_TLS_ADDRESS=:1884 -MGATE_MQTT_WITHOUT_TLS_TARGET=localhost:1883 +MGATE_MQTT_WITHOUT_TLS_HOST=localhost +MGATE_MQTT_WITHOUT_TLS_PORT=1884 +MGATE_MQTT_WITHOUT_TLS_TARGET_PROTOCOL=mqtt +MGATE_MQTT_WITHOUT_TLS_TARGET_HOST=localhost +MGATE_MQTT_WITHOUT_TLS_TARGET_PORT=1883 -MGATE_MQTT_WITH_TLS_ADDRESS=:8883 -MGATE_MQTT_WITH_TLS_TARGET=localhost:1883 +MGATE_MQTT_WITH_TLS_HOST=localhost +MGATE_MQTT_WITH_TLS_PORT=8883 +MGATE_MQTT_WITH_TLS_TARGET_PROTOCOL=mqtt +MGATE_MQTT_WITH_TLS_TARGET_HOST=localhost +MGATE_MQTT_WITH_TLS_TARGET_PORT=1883 MGATE_MQTT_WITH_TLS_CERT_FILE=ssl/certs/server.crt MGATE_MQTT_WITH_TLS_KEY_FILE=ssl/certs/server.key MGATE_MQTT_WITH_TLS_SERVER_CA_FILE=ssl/certs/ca.crt -MGATE_MQTT_WITH_MTLS_ADDRESS=:8884 -MGATE_MQTT_WITH_MTLS_TARGET=localhost:1883 +MGATE_MQTT_WITH_MTLS_HOST=localhost +MGATE_MQTT_WITH_MTLS_PORT=8884 +MGATE_MQTT_WITH_MTLS_TARGET_PROTOCOL=mqtt +MGATE_MQTT_WITH_MTLS_TARGET_HOST=localhost +MGATE_MQTT_WITH_MTLS_TARGET_PORT=1883 MGATE_MQTT_WITH_MTLS_CERT_FILE=ssl/certs/server.crt MGATE_MQTT_WITH_MTLS_KEY_FILE=ssl/certs/server.key MGATE_MQTT_WITH_MTLS_SERVER_CA_FILE=ssl/certs/ca.crt @@ -16,18 +25,32 @@ MGATE_MQTT_WITH_MTLS_CLIENT_CA_FILE=ssl/certs/ca.crt MGATE_MQTT_WITH_MTLS_CERT_VERIFICATION_METHODS=ocsp MGATE_MQTT_WITH_MTLS_OCSP_RESPONDER_URL=http://localhost:8080/ocsp -MGATE_MQTT_WS_WITHOUT_TLS_ADDRESS=:8083 -MGATE_MQTT_WS_WITHOUT_TLS_TARGET=ws://localhost:8000/ +MGATE_MQTT_WS_WITHOUT_TLS_HOST=localhost +MGATE_MQTT_WS_WITHOUT_TLS_PORT=8083 +MGATE_MQTT_WS_WITHOUT_TLS_PATH_PREFIX=/mgate-ws +MGATE_MQTT_WS_WITHOUT_TLS_TARGET_PROTOCOL=ws +MGATE_MQTT_WS_WITHOUT_TLS_TARGET_HOST=localhost +MGATE_MQTT_WS_WITHOUT_TLS_TARGET_PORT=8000 +MGATE_MQTT_WS_WITHOUT_TLS_TARGET_PATH= -MGATE_MQTT_WS_WITH_TLS_ADDRESS=:8084 -MGATE_MQTT_WS_WITH_TLS_TARGET=ws://localhost:8000/ +MGATE_MQTT_WS_WITH_TLS_HOST=localhost +MGATE_MQTT_WS_WITH_TLS_PORT=8084 +MGATE_MQTT_WS_WITH_TLS_PATH_PREFIX=/mgate-ws +MGATE_MQTT_WS_WITH_TLS_TARGET_PROTOCOL=ws +MGATE_MQTT_WS_WITH_TLS_TARGET_HOST=localhost +MGATE_MQTT_WS_WITH_TLS_TARGET_PORT=8000 +MGATE_MQTT_WS_WITH_TLS_TARGET_PATH= MGATE_MQTT_WS_WITH_TLS_CERT_FILE=ssl/certs/server.crt MGATE_MQTT_WS_WITH_TLS_KEY_FILE=ssl/certs/server.key MGATE_MQTT_WS_WITH_TLS_SERVER_CA_FILE=ssl/certs/ca.crt -MGATE_MQTT_WS_WITH_MTLS_ADDRESS=:8085 -MGATE_MQTT_WS_WITH_MTLS_PATH_PREFIX=/mqtt -MGATE_MQTT_WS_WITH_MTLS_TARGET=ws://localhost:8000/ +MGATE_MQTT_WS_WITH_MTLS_HOST=localhost +MGATE_MQTT_WS_WITH_MTLS_PORT=8085 +MGATE_MQTT_WS_WITH_MTLS_PATH_PREFIX=/mgate-ws +MGATE_MQTT_WS_WITH_MTLS_TARGET_PROTOCOL=ws +MGATE_MQTT_WS_WITH_MTLS_TARGET_HOST=localhost +MGATE_MQTT_WS_WITH_MTLS_TARGET_PORT=8000 +MGATE_MQTT_WS_WITH_MTLS_TARGET_PATH= MGATE_MQTT_WS_WITH_MTLS_CERT_FILE=ssl/certs/server.crt MGATE_MQTT_WS_WITH_MTLS_KEY_FILE=ssl/certs/server.key MGATE_MQTT_WS_WITH_MTLS_SERVER_CA_FILE=ssl/certs/ca.crt @@ -35,19 +58,31 @@ MGATE_MQTT_WS_WITH_MTLS_CLIENT_CA_FILE=ssl/certs/ca.crt MGATE_MQTT_WS_WITH_MTLS_CERT_VERIFICATION_METHODS=ocsp MGATE_MQTT_WS_WITH_MTLS_OCSP_RESPONDER_URL=http://localhost:8080/ocsp -MGATE_HTTP_WITHOUT_TLS_ADDRESS=:8086 -MGATE_HTTP_WITHOUT_TLS_PATH_PREFIX=/messages -MGATE_HTTP_WITHOUT_TLS_TARGET=http://localhost:8888/ +MGATE_HTTP_WITHOUT_TLS_PORT=8086 +MGATE_HTTP_WITHOUT_TLS_PATH_PREFIX=/mgate-http +MGATE_HTTP_WITHOUT_TLS_TARGET_PROTOCOL=http +MGATE_HTTP_WITHOUT_TLS_TARGET_HOST=localhost +MGATE_HTTP_WITHOUT_TLS_TARGET_PORT=8888 +MGATE_HTTP_WITHOUT_TLS_TARGET_PATH=/messages -MGATE_HTTP_WITH_TLS_ADDRESS=:8087 -MGATE_HTTP_WITH_TLS_PATH_PREFIX=/messages + +MGATE_HTTP_WITH_TLS_PORT=8087 +MGATE_HTTP_WITH_TLS_PATH_PREFIX=/mgate-http +MGATE_HTTP_WITH_TLS_TARGET_PROTOCOL=http +MGATE_HTTP_WITH_TLS_TARGET_HOST=localhost +MGATE_HTTP_WITH_TLS_TARGET_PORT=8888 +MGATE_HTTP_WITH_TLS_TARGET_PATH=/messages MGATE_HTTP_WITH_TLS_TARGET=http://localhost:8888/ MGATE_HTTP_WITH_TLS_CERT_FILE=ssl/certs/server.crt MGATE_HTTP_WITH_TLS_KEY_FILE=ssl/certs/server.key MGATE_HTTP_WITH_TLS_SERVER_CA_FILE=ssl/certs/ca.crt -MGATE_HTTP_WITH_MTLS_ADDRESS=:8088 -MGATE_HTTP_WITH_MTLS_PATH_PREFIX=/messages +MGATE_HTTP_WITH_MTLS_PORT=8088 +MGATE_HTTP_WITH_MTLS_PATH_PREFIX=/mgate-http +MGATE_HTTP_WITH_MTLS_TARGET_PROTOCOL=http +MGATE_HTTP_WITH_MTLS_TARGET_HOST=localhost +MGATE_HTTP_WITH_MTLS_TARGET_PORT=8888 +MGATE_HTTP_WITH_MTLS_TARGET_PATH=/messages MGATE_HTTP_WITH_MTLS_TARGET=http://localhost:8888/ MGATE_HTTP_WITH_MTLS_CERT_FILE=ssl/certs/server.crt MGATE_HTTP_WITH_MTLS_KEY_FILE=ssl/certs/server.key diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 01122d70..edfe942b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -23,13 +23,13 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: 1.21.x + go-version: 1.22.x cache-dependency-path: "go.sum" - name: golangci-lint - uses: golangci/golangci-lint-action@v6 + uses: golangci/golangci-lint-action@v7 with: - version: v1.61.0 + version: v2.0.2 args: --config .golangci.yml - name: Build Binaries diff --git a/.golangci.yml b/.golangci.yml index 7a4dc00c..b855bb07 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,75 +1,91 @@ -# Copyright (c) Abstract Machines -# SPDX-License-Identifier: Apache-2.0 - -run: - timeout: 10m - -issues: - max-issues-per-linter: 100 - max-same-issues: 100 - -linters-settings: - gocritic: - enabled-checks: - - importShadow - - httpNoBody - - paramTypeCombine - - emptyStringTest - - builtinShadow - - exposedSyncMutex - disabled-checks: - - appendAssign - enabled-tags: - - diagnostic - disabled-tags: - - performance - - style - - experimental - - opinionated - misspell: - ignore-words: - - "mosquitto" - stylecheck: - checks: ["-ST1000", "-ST1003", "-ST1020", "-ST1021", "-ST1022"] - goheader: - template: |- - Copyright (c) Abstract Machines - SPDX-License-Identifier: Apache-2.0 - +version: "2" linters: - disable-all: true + default: none enable: - - gocritic - - gosimple - - errcheck - - govet - - unused - - goconst - - godot - - godox - - ineffassign - - misspell - - stylecheck - - whitespace - - gci - - gofmt - - goimports - - loggercheck - - goheader - asasalint - asciicheck - bidichk + - copyloopvar - decorder - dogsled + - dupword + - errcheck - errchkjson - errname - - copyloopvar - ginkgolinter - gocheckcompilerdirectives - - gofumpt + - goconst + - gocritic + - godot + - godox + - goheader - goprintffuncname + - govet - importas + - ineffassign + - loggercheck - makezero - mirror + - misspell - nakedret - - dupword + - staticcheck + - unused + - whitespace + settings: + gocritic: + enabled-checks: + - importShadow + - httpNoBody + - paramTypeCombine + - emptyStringTest + - builtinShadow + - exposedSyncMutex + disabled-checks: + - appendAssign + enabled-tags: + - diagnostic + disabled-tags: + - performance + - style + - experimental + - opinionated + goheader: + template: |- + Copyright (c) Abstract Machines + SPDX-License-Identifier: Apache-2.0 + misspell: + ignore-rules: + - mosquitto + staticcheck: + checks: + - -ST1000 + - -ST1003 + - -ST1020 + - -ST1021 + - -ST1022 + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +issues: + max-issues-per-linter: 100 + max-same-issues: 100 +formatters: + enable: + - gci + - gofmt + - gofumpt + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/cmd/main.go b/cmd/main.go index 0995bb67..17425733 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -134,7 +134,7 @@ func main() { } // mGate server for HTTP without TLS - httpProxy, err := http.NewProxy(httpConfig, handler, logger) + httpProxy, err := http.NewProxy(httpConfig, handler, logger, []string{}, []string{}) if err != nil { panic(err) } @@ -149,7 +149,7 @@ func main() { } // mGate server for HTTP with TLS - httpTLSProxy, err := http.NewProxy(httpTLSConfig, handler, logger) + httpTLSProxy, err := http.NewProxy(httpTLSConfig, handler, logger, []string{}, []string{}) if err != nil { panic(err) } @@ -164,7 +164,7 @@ func main() { } // mGate server for HTTP with mTLS - httpMTLSProxy, err := http.NewProxy(httpMTLSConfig, handler, logger) + httpMTLSProxy, err := http.NewProxy(httpMTLSConfig, handler, logger, []string{}, []string{}) if err != nil { panic(err) } diff --git a/config.go b/config.go index a550a17c..60682373 100644 --- a/config.go +++ b/config.go @@ -11,10 +11,14 @@ import ( ) type Config struct { - Address string `env:"ADDRESS" envDefault:""` - PathPrefix string `env:"PATH_PREFIX" envDefault:"/"` - Target string `env:"TARGET" envDefault:""` - TLSConfig *tls.Config + Host string `env:"HOST" envDefault:""` + Port string `env:"PORT,required" envDefault:""` + PathPrefix string `env:"PATH_PREFIX" envDefault:""` + TargetHost string `env:"TARGET_HOST,required" envDefault:""` + TargetPort string `env:"TARGET_PORT,required" envDefault:""` + TargetProtocol string `env:"TARGET_PROTOCOL,required" envDefault:""` + TargetPath string `env:"TARGET_PATH" envDefault:""` + TLSConfig *tls.Config } func NewConfig(opts env.Options) (Config, error) { diff --git a/examples/client/http/websocket/Readme.md b/examples/client/http/websocket/Readme.md new file mode 100644 index 00000000..aa004d7b --- /dev/null +++ b/examples/client/http/websocket/Readme.md @@ -0,0 +1,3 @@ +## Requirements to run scripts +- [Websocat 4.0.0](https://github.com/vi/websocat) +- OpenSSL diff --git a/examples/client/http/websocket/with_mtls.sh b/examples/client/http/websocket/with_mtls.sh new file mode 100755 index 00000000..16d6fcde --- /dev/null +++ b/examples/client/http/websocket/with_mtls.sh @@ -0,0 +1,38 @@ +#!/bin/bash +protocol=wss +host=localhost +port=8088 +path="mgate-http/messages/ws" +content="application/json" +message="{\"message\": \"Hello mGate\"}" +invalidPath="invalid_path" +cafile=ssl/certs/ca.crt +certfile=ssl/certs/client.crt +keyfile=ssl/certs/client.key +reovokedcertfile=ssl/certs/client_revoked.crt +reovokedkeyfile=ssl/certs/client_revoked.key +unknowncertfile=ssl/certs/client_unknown.crt +unknownkeyfile=ssl/certs/client_unknown.key + +echo "Posting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, ca & client certificates ${cafile} ${certfile} ${keyfile}..." +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile} -cert ${certfile} -key ${keyfile}" + + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, basic authentication ca & client certificates ${cafile} ${certfile} ${keyfile}..." +encoded=$(printf "username:password" | base64) +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization: Basic $encoded" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile} -cert ${certfile} -key ${keyfile}" + +echo -e "\nPosting message to invalid path ${protocol}://${host}:${port}/${path}/${invalidPath} with tls, Authorization header, ca & client certificates ${cafile} ${certfile} ${keyfile}..." +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${invalidPath}" -H "content-type:${content}" -H "Authorization:TOKEN" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile} -cert ${certfile} -key ${keyfile}" + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, ca certificates ${cafile} & reovked client certificate ${reovokedcertfile} ${reovokedkeyfile}..." +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile} -cert ${reovokedcertfile} -key ${reovokedkeyfile}" + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, ca certificates ${cafile} & unknown client certificate ${unknowncertfile} ${unknownkeyfile}..." +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile} -cert ${unknowncertfile} -key ${unknownkeyfile}" + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, ca certificate ${cafile} & without client certificates.." +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile}" + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, & without ca , client certificates.." +echo "${message}" | websocat --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet" diff --git a/examples/client/http/websocket/with_tls.sh b/examples/client/http/websocket/with_tls.sh new file mode 100755 index 00000000..13dbf360 --- /dev/null +++ b/examples/client/http/websocket/with_tls.sh @@ -0,0 +1,29 @@ +#!/bin/bash +protocol=wss +host=localhost +port=8087 +path="mgate-http/messages/ws" +content="application/json" +message="{\"message\": \"Hello mGate\"}" +invalidPath="invalid_path" +cafile=ssl/certs/ca.crt +certfile=ssl/certs/client.crt +keyfile=ssl/certs/client.key +reovokedcertfile=ssl/certs/client_revoked.crt +reovokedkeyfile=ssl/certs/client_revoked.key +unknowncertfile=ssl/certs/client_unknown.crt +unknownkeyfile=ssl/certs/client_unknown.key + +echo "Posting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, ca certificate ${cafile}..." +# echo "${message}" | websocat -H "content-type:${content}" -H "Authorization:TOKEN" --binary --ws-c-uri="${protocol}://${host}:${port}/${path}" - ws-c:cmd:"openssl s_client -connect ${host}:${port} -quiet -verify_quiet -CAfile ${cafile}" +echo "${message}" | SSL_CERT_FILE="${cafile}" websocat "${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" + + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, basic authentication ca certificate ${cafile}...." +encoded=$(printf "username:password" | base64) +echo "${message}" | SSL_CERT_FILE="${cafile}" websocat "${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization: Basic $encoded" + + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} with tls, Authorization header, and without ca certificate.." +echo "${message}" | websocat "${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization: Basic $encoded" + diff --git a/examples/client/http/websocket/without_tls.sh b/examples/client/http/websocket/without_tls.sh new file mode 100755 index 00000000..0dc8a44e --- /dev/null +++ b/examples/client/http/websocket/without_tls.sh @@ -0,0 +1,23 @@ +#!/bin/bash +protocol=ws +host=localhost +port=8086 +path="mgate-http/messages/ws" +content="application/json" +message="{\"message\": \"Hello mGate\"}" +invalidPath="invalid_path" + +echo "Posting message to ${protocol}://${host}:${port}/${path} without tls ..." +echo "${message}" | websocat "${protocol}://${host}:${port}/${path}" -H "content-type:${content}" -H "Authorization:TOKEN" + + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} without tls and with basic authentication..." +echo "${message}" | websocat --basic-auth "${protocol}://${host}:${port}/${path}" -H "content-type:${content}" + + +echo -e "\nPosting message to ${protocol}://${host}:${port}/${path} without tls and with authentication in query params..." +echo "${message}" | websocat "${protocol}://${host}:${port}/${path}?authorization=TOKEN" -H "content-type:${content}" + + +echo -e "\nPosting message to invalid path ${protocol}://${host}:${port}/${invalidPath} without tls..." +echo "${message}" | websocat "${protocol}://${host}:${port}/${invalidPath}" -H "content-type:${content}" -H "Authorization:TOKEN" diff --git a/examples/client/http/with_mtls.sh b/examples/client/http/with_mtls.sh index 960fe4ec..f33cad05 100755 --- a/examples/client/http/with_mtls.sh +++ b/examples/client/http/with_mtls.sh @@ -2,7 +2,7 @@ protocol=https host=localhost port=8088 -path="messages" +path="mgate-http/messages/http" content="application/json" message="{\"message\": \"Hello mGate\"}" invalidPath="invalid_path" diff --git a/examples/client/http/with_tls.sh b/examples/client/http/with_tls.sh index ea1f2477..4dc062da 100755 --- a/examples/client/http/with_tls.sh +++ b/examples/client/http/with_tls.sh @@ -2,7 +2,7 @@ protocol=https host=localhost port=8087 -path="messages" +path="mgate-http/messages/http" content="application/json" message="{\"message\": \"Hello mGate\"}" invalidPath="invalid_path" diff --git a/examples/client/http/without_tls.sh b/examples/client/http/without_tls.sh index 20207e2b..e5be8b1f 100755 --- a/examples/client/http/without_tls.sh +++ b/examples/client/http/without_tls.sh @@ -2,7 +2,7 @@ protocol=http host=localhost port=8086 -path="messages" +path="mgate-http/messages/http" content="application/json" message="{\"message\": \"Hello mGate\"}" invalidPath="invalid_path" diff --git a/examples/client/websocket/connect.go b/examples/client/websocket/connect.go index d93a28fc..8eb7dcc9 100644 --- a/examples/client/websocket/connect.go +++ b/examples/client/websocket/connect.go @@ -1,3 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + package websocket import ( @@ -31,7 +34,7 @@ func Connect(brokerAddress string, tlsCfg *tls.Config) (mqtt.Client, error) { return client, nil } -// Load return a TLS configuration that can be used in TLS servers +// Load return a TLS configuration that can be used in TLS servers. func LoadTLS(certFile, keyFile, serverCAFile, clientCAFile string) (*tls.Config, error) { tlsConfig := &tls.Config{} diff --git a/examples/client/websocket/with_mtls/main.go b/examples/client/websocket/with_mtls/main.go index aa128cfb..f2d73da5 100644 --- a/examples/client/websocket/with_mtls/main.go +++ b/examples/client/websocket/with_mtls/main.go @@ -1,3 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + package main import ( diff --git a/examples/client/websocket/with_tls/main.go b/examples/client/websocket/with_tls/main.go index 4c0de0c5..7018df8d 100644 --- a/examples/client/websocket/with_tls/main.go +++ b/examples/client/websocket/with_tls/main.go @@ -1,3 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + package main import ( diff --git a/examples/client/websocket/without_tls/main.go b/examples/client/websocket/without_tls/main.go index 835437a5..42692b9e 100644 --- a/examples/client/websocket/without_tls/main.go +++ b/examples/client/websocket/without_tls/main.go @@ -1,3 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + package main import ( diff --git a/examples/ocsp-crl-responder/main.go b/examples/ocsp-crl-responder/main.go index 5112f1fa..0a320c34 100644 --- a/examples/ocsp-crl-responder/main.go +++ b/examples/ocsp-crl-responder/main.go @@ -133,8 +133,8 @@ func fileHandler(w http.ResponseWriter, r *http.Request, crlFile, fileName strin } logger.Info("Request complete successfully ", args...) - } + func ocspHandler(w http.ResponseWriter, r *http.Request, cert, issuerCert *x509.Certificate, privateKey interface{}, goodCerts, revokedCerts []*big.Int, logger slog.Logger) { ocspStatus := ocsp.Unknown diff --git a/examples/server/http-echo/main.go b/examples/server/http-echo/main.go index 943dfe16..9923964a 100644 --- a/examples/server/http-echo/main.go +++ b/examples/server/http-echo/main.go @@ -1,23 +1,73 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + package main import ( "log" "net/http" + + "github.com/gorilla/websocket" ) const defaultPort = "8888" +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // Allow any origin (similar to your CORS setting) + return true + }, +} + +func notFoundHandler(w http.ResponseWriter, r *http.Request) { + log.Println("Got request at " + r.URL.Path) + http.NotFound(w, r) +} + func echoHandler(writer http.ResponseWriter, request *http.Request) { log.Println("Echoing back request made to " + request.URL.Path + " to client (" + request.RemoteAddr + ")") writer.Header().Set("Access-Control-Allow-Origin", "*") writer.Header().Set("Content-Type", request.Header.Get("Content-Type")) - request.Write(writer) + _ = request.Write(writer) +} + +func wsHandler(w http.ResponseWriter, r *http.Request) { + log.Println("Upgrading to WebSocket connection from " + r.RemoteAddr) + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println("WebSocket upgrade error:", err) + return + } + defer conn.Close() + + conn.SetCloseHandler(func(code int, text string) error { + log.Printf("WebSocket closed by remote client code %d, text %s\n", code, text) + return nil + }) + + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + log.Println("Read error:", err) + break + } + log.Printf("Received message from %s: %s", r.RemoteAddr, string(message)) + + err = conn.WriteMessage(messageType, message) + if err != nil { + log.Println("Write error:", err) + break + } + } } func main() { - log.Println("starting echo server, listening on port " + defaultPort) + log.Println("Starting echo server, listening on port " + defaultPort) mux := http.NewServeMux() - mux.HandleFunc("/", echoHandler) + mux.HandleFunc("/messages/http", echoHandler) + mux.HandleFunc("/messages/ws", wsHandler) + mux.HandleFunc("/", notFoundHandler) if err := http.ListenAndServe(":"+defaultPort, mux); err != nil { panic(err) } diff --git a/examples/simple/simple.go b/examples/simple/simple.go index ec19f1a7..1a524041 100644 --- a/examples/simple/simple.go +++ b/examples/simple/simple.go @@ -15,57 +15,54 @@ var errSessionMissing = errors.New("session is missing") var _ session.Handler = (*Handler)(nil) -// Handler implements mqtt.Handler interface +// Handler implements mqtt.Handler interface. type Handler struct { logger *slog.Logger } -// New creates new Event entity +// New creates new Event entity. func New(logger *slog.Logger) *Handler { return &Handler{ logger: logger, } } -// AuthConnect is called on device connection, -// prior forwarding to the MQTT broker +// prior forwarding to the MQTT broker. func (h *Handler) AuthConnect(ctx context.Context) error { return h.logAction(ctx, "AuthConnect", nil, nil) } -// AuthPublish is called on device publish, -// prior forwarding to the MQTT broker +// prior forwarding to the MQTT broker. func (h *Handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error { return h.logAction(ctx, "AuthPublish", &[]string{*topic}, payload) } -// AuthSubscribe is called on device publish, -// prior forwarding to the MQTT broker +// prior forwarding to the MQTT broker. func (h *Handler) AuthSubscribe(ctx context.Context, topics *[]string) error { return h.logAction(ctx, "AuthSubscribe", topics, nil) } -// Connect - after client successfully connected +// Connect - after client successfully connected. func (h *Handler) Connect(ctx context.Context) error { return h.logAction(ctx, "Connect", nil, nil) } -// Publish - after client successfully published +// Publish - after client successfully published. func (h *Handler) Publish(ctx context.Context, topic *string, payload *[]byte) error { return h.logAction(ctx, "Publish", &[]string{*topic}, payload) } -// Subscribe - after client successfully subscribed +// Subscribe - after client successfully subscribed. func (h *Handler) Subscribe(ctx context.Context, topics *[]string) error { return h.logAction(ctx, "Subscribe", topics, nil) } -// Unsubscribe - after client unsubscribed +// Unsubscribe - after client unsubscribed. func (h *Handler) Unsubscribe(ctx context.Context, topics *[]string) error { return h.logAction(ctx, "Unsubscribe", topics, nil) } -// Disconnect on connection lost +// Disconnect on connection lost. func (h *Handler) Disconnect(ctx context.Context) error { return h.logAction(ctx, "Disconnect", nil, nil) } diff --git a/go.mod b/go.mod index 63743287..6ded0f62 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/absmach/mgate go 1.23.0 +toolchain go1.24.1 + require ( github.com/caarlos0/env/v11 v11.3.1 github.com/eclipse/paho.mqtt.golang v1.5.0 diff --git a/pkg/http/checker.go b/pkg/http/checker.go new file mode 100644 index 00000000..bff6d5f4 --- /dev/null +++ b/pkg/http/checker.go @@ -0,0 +1,85 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "errors" + "fmt" + "net/http" + "regexp" +) + +const errNotBypassFmt = "route - %s is not in bypass list" + +type bypassChecker struct { + enabled bool + byPassPatterns []*regexp.Regexp +} + +type originChecker struct { + enabled bool + allowedOrigins map[string]struct{} +} + +var ( + errBypassDisabled = errors.New("bypass disabled") + errNotAllowed = "origin - %s is not allowed" + + _ Checker = (*originChecker)(nil) + _ Checker = (*bypassChecker)(nil) +) + +func NewBypassChecker(byPassPatterns []string) (Checker, error) { + enabled := len(byPassPatterns) != 0 + var byp []*regexp.Regexp + for _, expr := range byPassPatterns { + re, err := regexp.Compile(expr) + if err != nil { + return nil, err + } + byp = append(byp, re) + } + + return &bypassChecker{ + enabled: enabled, + byPassPatterns: byp, + }, nil +} + +func (bpc *bypassChecker) Check(r *http.Request) error { + if !bpc.enabled { + return errBypassDisabled + } + for _, pattern := range bpc.byPassPatterns { + if pattern.MatchString(r.URL.Path) { + return nil + } + } + return fmt.Errorf(errNotBypassFmt, r.URL.Path) +} + +func NewOriginChecker(allowedOrigins []string) Checker { + enabled := len(allowedOrigins) != 0 + ao := make(map[string]struct{}) + for _, allowedOrigin := range allowedOrigins { + ao[allowedOrigin] = struct{}{} + } + + return &originChecker{ + enabled: enabled, + allowedOrigins: ao, + } +} + +func (oc *originChecker) Check(r *http.Request) error { + if !oc.enabled { + return nil + } + origin := r.Header.Get("Origin") + _, allowed := oc.allowedOrigins[origin] + if allowed { + return nil + } + return fmt.Errorf(errNotAllowed, origin) +} diff --git a/pkg/http/http.go b/pkg/http/http.go index ae75c062..e1ffb6a9 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -8,7 +8,6 @@ import ( "context" "crypto/tls" "encoding/json" - "errors" "fmt" "io" "log/slog" @@ -21,41 +20,69 @@ import ( "github.com/absmach/mgate" "github.com/absmach/mgate/pkg/session" mptls "github.com/absmach/mgate/pkg/tls" + "github.com/absmach/mgate/pkg/transport" + "github.com/gorilla/websocket" "golang.org/x/sync/errgroup" ) -const contentType = "application/json" +const ( + contentType = "application/json" + authzQueryKey = "authorization" + authzHeaderKey = "Authorization" + connHeaderKey = "Connection" + connHeaderVal = "upgrade" + upgradeHeaderKey = "Upgrade" + upgradeHeaderVal = "websocket" +) -// ErrMissingAuthentication returned when no basic or Authorization header is set. -var ErrMissingAuthentication = errors.New("missing authorization") +type Checker interface { + Check(r *http.Request) error +} -func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Metrics and health endpoints are served directly. - if r.URL.Path == "/metrics" || r.URL.Path == "/health" { - p.target.ServeHTTP(w, r) - return +func isWebSocketRequest(r *http.Request) bool { + return strings.EqualFold(r.Header.Get(connHeaderKey), connHeaderVal) && + strings.EqualFold(r.Header.Get(upgradeHeaderKey), upgradeHeaderVal) +} + +func (p Proxy) getUserPass(r *http.Request) (string, string) { + username, password, ok := r.BasicAuth() + switch { + case ok: + return username, password + case r.URL.Query().Get(authzQueryKey) != "": + password = r.URL.Query().Get(authzQueryKey) + return username, password + case r.Header.Get(authzHeaderKey) != "": + password = r.Header.Get(authzHeaderKey) + return username, password } + return username, password +} - if !strings.HasPrefix(r.URL.Path, p.config.PathPrefix) { +func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, transport.AddSuffixSlash(p.config.PathPrefix+p.config.TargetPath)) { http.NotFound(w, r) return } - username, password, ok := r.BasicAuth() - switch { - case ok: - break - case r.Header.Get("Authorization") != "": - password = r.Header.Get("Authorization") - default: - encodeError(w, http.StatusBadGateway, ErrMissingAuthentication) + r.URL.Path = strings.TrimPrefix(r.URL.Path, p.config.PathPrefix) + + if err := p.bypass.Check(r); err == nil { + p.target.ServeHTTP(w, r) return } + username, password := p.getUserPass(r) s := &session.Session{ Password: []byte(password), Username: username, } + + if isWebSocketRequest(r) { + p.handleWebSocket(w, r, s) + return + } + ctx := session.NewContext(r.Context(), s) payload, err := io.ReadAll(r.Body) if err != nil { @@ -82,9 +109,17 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { p.logger.Error("Failed to publish", slog.Any("error", err)) return } + p.target.ServeHTTP(w, r) } +func checkOrigin(allowedOrigins []string) func(r *http.Request) bool { + oc := NewOriginChecker(allowedOrigins) + return func(r *http.Request) bool { + return oc.Check(r) == nil + } +} + func encodeError(w http.ResponseWriter, defStatusCode int, err error) { hpe, ok := err.(HTTPProxyError) if !ok { @@ -99,28 +134,40 @@ func encodeError(w http.ResponseWriter, defStatusCode int, err error) { // Proxy represents HTTP Proxy. type Proxy struct { - config mgate.Config - target *httputil.ReverseProxy - session session.Handler - logger *slog.Logger + config mgate.Config + target *httputil.ReverseProxy + session session.Handler + logger *slog.Logger + wsUpgrader websocket.Upgrader + bypass Checker } -func NewProxy(config mgate.Config, handler session.Handler, logger *slog.Logger) (Proxy, error) { - target, err := url.Parse(config.Target) +func NewProxy(config mgate.Config, handler session.Handler, logger *slog.Logger, allowedOrigins []string, bypassPaths []string) (Proxy, error) { + targetUrl := &url.URL{ + Scheme: config.TargetProtocol, + Host: net.JoinHostPort(config.TargetHost, config.TargetPort), + } + + bpc, err := NewBypassChecker(bypassPaths) if err != nil { return Proxy{}, err } + wsUpgrader := websocket.Upgrader{CheckOrigin: checkOrigin(allowedOrigins)} + return Proxy{ - config: config, - target: httputil.NewSingleHostReverseProxy(target), - session: handler, - logger: logger, + config: config, + target: httputil.NewSingleHostReverseProxy(targetUrl), + session: handler, + logger: logger, + wsUpgrader: wsUpgrader, + bypass: bpc, }, nil } func (p Proxy) Listen(ctx context.Context) error { - l, err := net.Listen("tcp", p.config.Address) + listenAddress := net.JoinHostPort(p.config.Host, p.config.Port) + l, err := net.Listen("tcp", listenAddress) if err != nil { return err } @@ -130,13 +177,14 @@ func (p Proxy) Listen(ctx context.Context) error { } status := mptls.SecurityStatus(p.config.TLSConfig) - p.logger.Info(fmt.Sprintf("HTTP proxy server started at %s%s with %s", p.config.Address, p.config.PathPrefix, status)) + p.logger.Info(fmt.Sprintf("HTTP proxy server started at %s%s with %s", listenAddress, p.config.PathPrefix, status)) var server http.Server g, ctx := errgroup.WithContext(ctx) mux := http.NewServeMux() - mux.Handle(p.config.PathPrefix, p) + + mux.Handle(transport.AddSuffixSlash(p.config.PathPrefix), p) server.Handler = mux g.Go(func() error { @@ -148,9 +196,9 @@ func (p Proxy) Listen(ctx context.Context) error { return server.Close() }) if err := g.Wait(); err != nil { - p.logger.Info(fmt.Sprintf("HTTP proxy server at %s%s with %s exiting with errors", p.config.Address, p.config.PathPrefix, status), slog.String("error", err.Error())) + p.logger.Info(fmt.Sprintf("HTTP proxy server at %s%s with %s exiting with errors", listenAddress, p.config.PathPrefix, status), slog.String("error", err.Error())) } else { - p.logger.Info(fmt.Sprintf("HTTP proxy server at %s%s with %s exiting...", p.config.Address, p.config.PathPrefix, status)) + p.logger.Info(fmt.Sprintf("HTTP proxy server at %s%s with %s exiting...", listenAddress, p.config.PathPrefix, status)) } return nil } diff --git a/pkg/http/ws.go b/pkg/http/ws.go new file mode 100644 index 00000000..22c32f66 --- /dev/null +++ b/pkg/http/ws.go @@ -0,0 +1,149 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package http + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "net/http" + + "github.com/absmach/mgate/pkg/session" + "github.com/gorilla/websocket" + "golang.org/x/sync/errgroup" +) + +const ( + upstreamDesc = "from mGate Proxy to websocket server" + downStreamDesc = "from websocket server to mGate Proxy" +) + +func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request, s *session.Session) { + topic := r.URL.Path + ctx := session.NewContext(context.Background(), s) + if err := p.session.AuthConnect(ctx); err != nil { + encodeError(w, http.StatusUnauthorized, err) + return + } + if err := p.session.AuthSubscribe(ctx, &[]string{topic}); err != nil { + encodeError(w, http.StatusUnauthorized, err) + return + } + if err := p.session.Subscribe(ctx, &[]string{topic}); err != nil { + encodeError(w, http.StatusBadRequest, err) + return + } + + header := http.Header{} + + if auth := r.Header.Get(authzHeaderKey); auth != "" { + header.Set(authzHeaderKey, auth) + } + + target := fmt.Sprintf("%s://%s:%s%s", wsScheme(p.config.TargetProtocol), p.config.TargetHost, p.config.TargetPort, r.URL.RequestURI()) + + targetConn, _, err := websocket.DefaultDialer.Dial(target, header) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer targetConn.Close() + + inConn, err := p.wsUpgrader.Upgrade(w, r, nil) + if err != nil { + p.logger.Warn("WS Proxy failed to upgrade connection", slog.Any("error", err)) + return + } + defer inConn.Close() + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + upstream := true + err := p.stream(ctx, topic, inConn, targetConn, upstream) + if err := targetConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "client closed")); err != nil { + p.logger.Warn(fmt.Sprintf("failed to send close connection %s", getPrefix(upstream)), slog.Any("error", err)) + } + if err := targetConn.Close(); err != nil { + p.logger.Warn("failed to send close connection to websocket server", slog.Any("error", err)) + } + return err + }) + g.Go(func() error { + upstream := false + err := p.stream(ctx, topic, targetConn, inConn, upstream) + if err := inConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "client closed")); err != nil { + p.logger.Warn(fmt.Sprintf("failed to send close connection %s", getPrefix(upstream)), slog.Any("error", err)) + } + if err := inConn.Close(); err != nil { + p.logger.Warn("failed to send close connection to client", slog.Any("error", err)) + } + return err + }) + + gErr := g.Wait() + if err := p.session.Unsubscribe(ctx, &[]string{topic}); err != nil { + p.logger.Error("Unsubscribe failed", slog.String("topic", topic), slog.Any("error", err)) + } + if gErr != nil { + p.logger.Error("WS Proxy session terminated", slog.Any("error", gErr)) + return + } + p.logger.Info("WS Proxy session terminated") +} + +func (p *Proxy) stream(ctx context.Context, topic string, src, dest *websocket.Conn, upstream bool) error { + for { + messageType, payload, err := src.ReadMessage() + if err != nil { + return handleStreamErr(err, upstream) + } + if upstream { + if err := p.session.AuthPublish(ctx, &topic, &payload); err != nil { + return err + } + if err := p.session.Publish(ctx, &topic, &payload); err != nil { + return err + } + } + if err := dest.WriteMessage(messageType, payload); err != nil { + return err + } + } +} + +func handleStreamErr(err error, upstream bool) error { + if err == nil { + return nil + } + + if upstream && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + return nil + } + if errors.Is(err, net.ErrClosed) { + return nil + } + return fmt.Errorf("%s error: %w", getPrefix(upstream), err) +} + +func getPrefix(upstream bool) string { + prefix := downStreamDesc + if upstream { + prefix = upstreamDesc + } + return prefix +} + +func wsScheme(scheme string) string { + switch scheme { + case "http": + return "ws" + case "https": + return "wss" + default: + return scheme + } +} diff --git a/pkg/mqtt/mqtt.go b/pkg/mqtt/mqtt.go index 263471dd..86e080aa 100644 --- a/pkg/mqtt/mqtt.go +++ b/pkg/mqtt/mqtt.go @@ -57,9 +57,10 @@ func (p Proxy) accept(ctx context.Context, l net.Listener) { func (p Proxy) handle(ctx context.Context, inbound net.Conn) { defer p.close(inbound) - outbound, err := p.dialer.Dial("tcp", p.config.Target) + targetAddress := net.JoinHostPort(p.config.TargetHost, p.config.TargetPort) + outbound, err := p.dialer.Dial("tcp", targetAddress) if err != nil { - p.logger.Error("Cannot connect to remote broker " + p.config.Target + " due to: " + err.Error()) + p.logger.Error("Cannot connect to remote broker " + targetAddress + " due to: " + err.Error()) return } defer p.close(outbound) @@ -77,7 +78,8 @@ func (p Proxy) handle(ctx context.Context, inbound net.Conn) { // Listen of the server, this will block. func (p Proxy) Listen(ctx context.Context) error { - l, err := net.Listen("tcp", p.config.Address) + listenAddress := net.JoinHostPort(p.config.Host, p.config.Port) + l, err := net.Listen("tcp", listenAddress) if err != nil { return err } @@ -86,7 +88,7 @@ func (p Proxy) Listen(ctx context.Context) error { l = tls.NewListener(l, p.config.TLSConfig) } status := mptls.SecurityStatus(p.config.TLSConfig) - p.logger.Info(fmt.Sprintf("MQTT proxy server started at %s with %s", p.config.Address, status)) + p.logger.Info(fmt.Sprintf("MQTT proxy server started at %s with %s", listenAddress, status)) g, ctx := errgroup.WithContext(ctx) // Acceptor loop @@ -100,9 +102,9 @@ func (p Proxy) Listen(ctx context.Context) error { return l.Close() }) if err := g.Wait(); err != nil { - p.logger.Info(fmt.Sprintf("MQTT proxy server at %s with %s exiting with errors", p.config.Address, status), slog.String("error", err.Error())) + p.logger.Info(fmt.Sprintf("MQTT proxy server at %s with %s exiting with errors", listenAddress, status), slog.String("error", err.Error())) } else { - p.logger.Info(fmt.Sprintf("MQTT proxy server at %s with %s exiting...", p.config.Address, status)) + p.logger.Info(fmt.Sprintf("MQTT proxy server at %s with %s exiting...", listenAddress, status)) } return nil } diff --git a/pkg/mqtt/websocket/websocket.go b/pkg/mqtt/websocket/websocket.go index 2713fdf2..4aa899b1 100644 --- a/pkg/mqtt/websocket/websocket.go +++ b/pkg/mqtt/websocket/websocket.go @@ -16,6 +16,7 @@ import ( "github.com/absmach/mgate" "github.com/absmach/mgate/pkg/session" mptls "github.com/absmach/mgate/pkg/tls" + "github.com/absmach/mgate/pkg/transport" "github.com/gorilla/websocket" "golang.org/x/sync/errgroup" ) @@ -52,10 +53,13 @@ var upgrader = websocket.Upgrader{ } func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !strings.HasPrefix(r.URL.Path, p.config.PathPrefix) { + if !strings.HasPrefix(r.URL.Path, transport.AddSuffixSlash(p.config.PathPrefix+p.config.TargetPath)) { http.NotFound(w, r) return } + + r.URL.Path = strings.TrimPrefix(r.URL.Path, p.config.PathPrefix) + cconn, err := upgrader.Upgrade(w, r, nil) if err != nil { p.logger.Error("Error upgrading connection", slog.Any("error", err)) @@ -75,7 +79,9 @@ func (p Proxy) pass(in *websocket.Conn) { dialer := &websocket.Dialer{ Subprotocols: []string{"mqtt"}, } - srv, _, err := dialer.Dial(p.config.Target, nil) + target := fmt.Sprintf("%s://%s:%s", p.config.TargetProtocol, p.config.TargetHost, p.config.TargetPath) + + srv, _, err := dialer.Dial(target, nil) if err != nil { p.logger.Error("Unable to connect to broker", slog.Any("error", err)) return @@ -100,7 +106,8 @@ func (p Proxy) pass(in *websocket.Conn) { } func (p Proxy) Listen(ctx context.Context) error { - l, err := net.Listen("tcp", p.config.Address) + listenAddress := net.JoinHostPort(p.config.Host, p.config.Port) + l, err := net.Listen("tcp", listenAddress) if err != nil { return err } @@ -113,7 +120,8 @@ func (p Proxy) Listen(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) mux := http.NewServeMux() - mux.Handle(p.config.PathPrefix, p) + + mux.Handle(transport.AddSuffixSlash(p.config.PathPrefix), p) server.Handler = mux g.Go(func() error { @@ -121,16 +129,16 @@ func (p Proxy) Listen(ctx context.Context) error { }) status := mptls.SecurityStatus(p.config.TLSConfig) - p.logger.Info(fmt.Sprintf("MQTT websocket proxy server started at %s%s with %s", p.config.Address, p.config.PathPrefix, status)) + p.logger.Info(fmt.Sprintf("MQTT websocket proxy server started at %s%s with %s", listenAddress, p.config.PathPrefix, status)) g.Go(func() error { <-ctx.Done() return server.Close() }) if err := g.Wait(); err != nil { - p.logger.Info(fmt.Sprintf("MQTT websocket proxy server at %s%s with %s exiting with errors", p.config.Address, p.config.PathPrefix, status), slog.String("error", err.Error())) + p.logger.Info(fmt.Sprintf("MQTT websocket proxy server at %s%s with %s exiting with errors", listenAddress, p.config.PathPrefix, status), slog.String("error", err.Error())) } else { - p.logger.Info(fmt.Sprintf("MQTT websocket proxy server at %s%s with %s exiting...", p.config.Address, p.config.PathPrefix, status)) + p.logger.Info(fmt.Sprintf("MQTT websocket proxy server at %s%s with %s exiting...", listenAddress, p.config.PathPrefix, status)) } return nil } diff --git a/pkg/tls/verifier/crl/crl.go b/pkg/tls/verifier/crl/crl.go index 1cfca549..fba86744 100644 --- a/pkg/tls/verifier/crl/crl.go +++ b/pkg/tls/verifier/crl/crl.go @@ -8,7 +8,6 @@ import ( "crypto/x509/pkix" "encoding/pem" "errors" - "fmt" "io" "net/http" "net/url" @@ -150,7 +149,6 @@ func (c *config) loadOfflineCRL() (*x509.RevocationList, error) { if len(offlineCRLBytes) == 0 { return nil, nil } - fmt.Println(c.OfflineCRLIssuerCertFile) issuer, err := c.loadOfflineCRLIssuerCert() if err != nil { return nil, err diff --git a/pkg/transport/path.go b/pkg/transport/path.go new file mode 100644 index 00000000..d94d66bd --- /dev/null +++ b/pkg/transport/path.go @@ -0,0 +1,13 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package transport + +import "strings" + +func AddSuffixSlash(path string) string { + if !strings.HasSuffix(path, "/") { + path += "/" + } + return path +} diff --git a/pkg/websockets/websockets.go b/pkg/websockets/websockets.go deleted file mode 100644 index 2b51b2b5..00000000 --- a/pkg/websockets/websockets.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package websockets - -import ( - "context" - "errors" - "fmt" - "log/slog" - "net/http" - - "github.com/absmach/mgate/pkg/session" - "github.com/gorilla/websocket" - "golang.org/x/sync/errgroup" -) - -var ( - upgrader = websocket.Upgrader{} - ErrAuthorizationNotSet = errors.New("authorization not set") -) - -type Proxy struct { - target string - address string - event session.Handler - logger *slog.Logger -} - -func (p *Proxy) Handler(w http.ResponseWriter, r *http.Request) { - var token string - headers := http.Header{} - switch { - case len(r.URL.Query()["authorization"]) != 0: - token = r.URL.Query()["authorization"][0] - case r.Header.Get("Authorization") != "": - token = r.Header.Get("Authorization") - headers.Add("Authorization", token) - default: - http.Error(w, ErrAuthorizationNotSet.Error(), http.StatusUnauthorized) - return - } - - target := fmt.Sprintf("%s%s", p.target, r.RequestURI) - - targetConn, _, err := websocket.DefaultDialer.Dial(target, headers) - if err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } - defer targetConn.Close() - - topic := r.URL.Path - s := session.Session{Password: []byte(token)} - ctx := session.NewContext(context.Background(), &s) - if err := p.event.AuthConnect(ctx); err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - if err := p.event.AuthSubscribe(ctx, &[]string{topic}); err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - if err := p.event.Subscribe(ctx, &[]string{topic}); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - inConn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - p.logger.Warn("WS Proxy failed to upgrade connection", slog.Any("error", err)) - return - } - defer inConn.Close() - - g, ctx := errgroup.WithContext(ctx) - - g.Go(func() error { - return p.stream(ctx, topic, inConn, targetConn, true) - }) - g.Go(func() error { - return p.stream(ctx, topic, targetConn, inConn, false) - }) - - if err := g.Wait(); err != nil { - if err := p.event.Unsubscribe(ctx, &[]string{topic}); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - p.logger.Error("WS Proxy terminated", slog.Any("error", err)) - return - } -} - -func (p *Proxy) stream(ctx context.Context, topic string, src, dest *websocket.Conn, upstream bool) error { - for { - messageType, payload, err := src.ReadMessage() - if err != nil { - return err - } - if upstream { - if err := p.event.AuthPublish(ctx, &topic, &payload); err != nil { - return err - } - if err := p.event.Publish(ctx, &topic, &payload); err != nil { - return err - } - } - if err := dest.WriteMessage(messageType, payload); err != nil { - return err - } - } -} - -func NewProxy(address, target string, logger *slog.Logger, handler session.Handler) (*Proxy, error) { - return &Proxy{target: target, address: address, logger: logger, event: handler}, nil -} - -// Listen - listen withrout tls. -func (p *Proxy) Listen() error { - return http.ListenAndServe(p.address, http.HandlerFunc(p.Handler)) -} - -// ListenTLS - version of Listen with TLS encryption. -func (p Proxy) ListenTLS(crt, key string) error { - return http.ListenAndServeTLS(p.address, crt, key, http.HandlerFunc(p.Handler)) -}