diff --git a/docs/howto/config_db_bootstrap.md b/docs/howto/config_db_bootstrap.md index 829f62c56d..f349e1ae6f 100644 --- a/docs/howto/config_db_bootstrap.md +++ b/docs/howto/config_db_bootstrap.md @@ -50,6 +50,12 @@ Or you can use the `config init` command with the `--metrics` flag, since metric pgwatch --metrics=postgresql://pgwatch:pgwatchadmin@localhost/pgwatch config init ``` +If you're using a PostgreSQL sink for storing measurements, you can also initialize the sink database schema: + +```terminal +pgwatch --sink=postgresql://pgwatch:pgwatchadmin@localhost/measurements config init +``` + ## Usage You can now configure pgwatch to use the `pgwatch` database as the configuration database for storing monitored sources, diff --git a/docs/reference/cli_env.md b/docs/reference/cli_env.md index 6c00d8c4c0..5529ec609a 100644 --- a/docs/reference/cli_env.md +++ b/docs/reference/cli_env.md @@ -8,7 +8,7 @@ title: Command-Line Options & Environment Variables pgwatch [OPTIONS] [config | metric | source] ``` -When no command is specified, pgwatch starts the monitoring process. +When no command is specified, pgwatch starts the monitoring process. It reads the configuration from the specified sources and metrics, then begins collecting measurements from the resolved databases. ## Options @@ -194,16 +194,19 @@ It reads the configuration from the specified sources and metrics, then begins c ``` !!! info - To use `config` command, you need to specify the `-s`, `--sources` and\or `-m`, `--metrics` options. + To use the `config` command, you need to specify at least one of the following options: `-s`, `--sources`, `-m`, `--metrics`, or `--sink`. - `init` - Initialize the configuration database with the required tables and functions. If file is used, it will - be created in the specified location and filled with built-in defaults. + Initialize the configuration and/or sink database with the required tables and functions. 
If a file is used, it will + be created in the specified location and filled with built-in defaults. Works with PostgreSQL-based sources, + metrics, and sink databases. - `upgrade` - Upgrade the database to the latest version. File or folder based configurations are not supported yet. + Upgrade the configuration and/or sink database to the latest version by applying all pending migrations. + File- or folder-based configurations are not supported. The command will automatically detect which + databases (sources, metrics, sinks) require migrations and apply them. ### Manage metrics diff --git a/docs/tutorial/upgrading.md b/docs/tutorial/upgrading.md index cc37f0a2ce..e29c57de09 100644 --- a/docs/tutorial/upgrading.md +++ b/docs/tutorial/upgrading.md @@ -89,8 +89,8 @@ There are no update or migration scripts for the built-in Grafana dashboards as it would break possible user applied changes. If you know that there are no user changes, then one can just delete or rename the existing ones in bulk and import the latest JSON definitions. -See [here](../concept/long_term_installations.md) for -some more advice on how to manage dashboards. +See [some more advice](../concept/long_term_installations.md) on how to +manage dashboards. ### Updating the config / metrics DB version @@ -105,31 +105,38 @@ problem-free, consisting of running something like: sudo systemctl restart postgresql For PostgreSQL major version upgrades one should read through the -according release notes (e.g. -[here](https://www.postgresql.org/docs/17/release-17.html#id-1.11.6.5.4)) +corresponding [release notes](https://www.postgresql.org/docs/current/release.html) and be prepared for the unavoidable downtime. ### Updating the pgwatch schema This is the pgwatch specific part, with some coupling between the -following components - Configuration DB SQL schema and pgwatch binary. +following components: Configuration DB SQL schema, +Sink DB SQL schema (if using PostgreSQL sink), and pgwatch binary. 
First of all, the pgwatch binary needs to be updated to a newer version. Then try to run the pgwatch as usual: pgwatch --sources=postgresql://pgwatch:pgwatchadmin@localhost/pgwatch --sink=postgresql://pgwatch:pgwatchadmin@localhost/pgwatch_metrics - [ERROR] configuration needs upgrade, use "config upgrade" command + [ERROR] config database schema is outdated, please run migrations using `pgwatch config upgrade` command exit status 4 If you see the above error message, then the pgwatch schema needs updating. This is done by running the following command, which will apply all -the necessary SQL migrations to the configuration database: +the necessary SQL migrations to the configuration database and sink database: - pgwatch --sources=postgresql://pgwatch:pgwatchadmin@localhost/pgwatch config upgrade + pgwatch --sources=postgresql://pgwatch:pgwatchadmin@localhost/pgwatch --sink=postgresql://pgwatch:pgwatchadmin@localhost/pgwatch_metrics config upgrade + [INFO] Applying migration to config database... [INFO] Applying migration named '00824 Refactor recommendations'... [INFO] Applied migration named '00824 Refactor recommendations' + [INFO] Applying migration to sink database... + [INFO] All migrations applied successfully + +!!! info + The `config upgrade` command will automatically detect which databases (sources, metrics, sinks) need migrations and apply them. + You only need to provide the connection strings for the databases you're using. 
### Updating the metrics collector diff --git a/go.mod b/go.mod index a432200ccc..524776d5d8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cybertec-postgresql/pgwatch/v3 go 1.25.0 require ( - github.com/cybertec-postgresql/pgx-migrator v1.2.0 + github.com/cybertec-postgresql/pgx-migrator v1.3.0 github.com/golang-jwt/jwt/v5 v5.3.0 github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.8.0 @@ -54,15 +54,15 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.4 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/go-archive v0.1.0 // indirect + github.com/moby/go-archive v0.2.0 // indirect github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/sys/sequential v0.6.0 // indirect github.com/moby/sys/user v0.4.0 // indirect @@ -70,7 +70,7 @@ require ( github.com/moby/term v0.5.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/morikuni/aec v1.0.0 // indirect + github.com/morikuni/aec v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect @@ -86,20 +86,20 @@ require ( go.etcd.io/etcd/api/v3 v3.6.7 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.7 // indirect 
go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect - go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect + go.opentelemetry.io/otel v1.39.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect - go.opentelemetry.io/otel/metric v1.38.0 // indirect - go.opentelemetry.io/otel/trace v1.38.0 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/otel/trace v1.39.0 // indirect go.opentelemetry.io/proto/otlp v1.7.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect - golang.org/x/crypto v0.45.0 // indirect - golang.org/x/net v0.47.0 // indirect - golang.org/x/sync v0.18.0 // indirect - golang.org/x/sys v0.38.0 // indirect - golang.org/x/text v0.31.0 // indirect + golang.org/x/crypto v0.46.0 // indirect + golang.org/x/net v0.48.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/text v0.32.0 // indirect golang.org/x/time v0.12.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect ) diff --git a/go.sum b/go.sum index b429710517..511338c095 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cybertec-postgresql/pgx-migrator v1.2.0 h1:e96gr058i/yCoJZCXGUUZ7cRD+d9O7ttUygZlFzFrlE= github.com/cybertec-postgresql/pgx-migrator v1.2.0/go.mod 
h1:g9qBzWOnxlgFa0JW5ujWfWgRhko4YBk03w/QIxuFJ1Q= +github.com/cybertec-postgresql/pgx-migrator v1.3.0 h1:aE5LzhhD77BjssMFBSPGurJvXStcg+sEIEA+NABvpA4= +github.com/cybertec-postgresql/pgx-migrator v1.3.0/go.mod h1:P4uJtUAYFun+UQ2+XA7VIU66taJt+OBAXBE9C3FaULs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -69,6 +71,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.4 h1:kEISI/Gx67NzH3nJxAmY/dGac80kKZgZt134u7Y/k1s= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.4/go.mod h1:6Nz966r3vQYCqIzWsuEl9d7cf7mRhtDmm++sOxlnfxI= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -85,6 +89,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty 
v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -103,6 +109,8 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ= github.com/moby/go-archive v0.1.0/go.mod h1:G9B+YoujNohJmrIYFBpSd54GTUB4lt9S+xVQvsJyFuo= +github.com/moby/go-archive v0.2.0 h1:zg5QDUM2mi0JIM9fdQZWC7U8+2ZfixfTYoHL7rWUcP8= +github.com/moby/go-archive v0.2.0/go.mod h1:mNeivT14o8xU+5q1YnNrkQVpK+dnNe/K6fHqnTg4qPU= github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw= @@ -122,6 +130,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/morikuni/aec v1.1.0 h1:vBBl0pUnvi/Je71dsRrhMBtreIqNMYErSAbEeb8jrXQ= +github.com/morikuni/aec v1.1.0/go.mod h1:xDRgiq/iw5l+zkao76YTKzKttOp2cwPEne25HDkJnBw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -188,20 +198,30 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk 
v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG0FI8OiXhBfcRtqqHcZcka+gU3cskNuf05R18= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0/go.mod h1:h06DGIukJOevXaj/xrNjhi/2098RZzcLTbc0jDAUbsg= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= 
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os= go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -217,6 +237,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= +golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -225,11 +247,15 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net 
v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -240,12 +266,17 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= +golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text 
v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -260,8 +291,12 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba h1:B14OtaXuMaCQsl2deSvNkyPKIzq3BjfxQp8d00QyWx4= google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:G5IanEx8/PgI9w6CFcYQf7jMtHQhZruvfM1i3qOqk5U= +google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b h1:uA40e2M6fYRBf0+8uN5mLlqUtV192iiksiICIBkYJ1E= +google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:Xa7le7qx2vmqB/SzWUBa7KdMjpdpAHlh5QCSnjessQk= google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8= google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= google.golang.org/grpc v1.78.0/go.mod 
h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/internal/cmdopts/cmdconfig.go b/internal/cmdopts/cmdconfig.go index 5a01845e50..e165de84a0 100644 --- a/internal/cmdopts/cmdconfig.go +++ b/internal/cmdopts/cmdconfig.go @@ -37,6 +37,9 @@ func (cmd *ConfigInitCommand) Execute([]string) (err error) { if cmd.owner.Sources.Sources > "" && cmd.owner.Metrics.Metrics != cmd.owner.Sources.Sources { err = errors.Join(err, cmd.InitSources()) } + if len(cmd.owner.Sinks.Sinks) > 0 { + err = errors.Join(err, cmd.InitSinks()) + } cmd.owner.CompleteCommand(map[bool]int32{ true: ExitCodeOK, false: ExitCodeConfigError, @@ -68,6 +71,13 @@ func (cmd *ConfigInitCommand) InitMetrics() (err error) { return opts.MetricsReaderWriter.WriteMetrics(defMetrics) } +// InitSinks initializes the sinks configuration. +func (cmd *ConfigInitCommand) InitSinks() (err error) { + ctx := context.Background() + opts := cmd.owner + return opts.InitSinkWriter(ctx) +} + type ConfigUpgradeCommand struct { owner *Options } @@ -78,22 +88,43 @@ func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) { if err = opts.ValidateConfig(); err != nil { return } - // for now only postgres configuration is upgradable + ctx := context.Background() + // Upgrade metrics/sources configuration if it's postgres if opts.IsPgConnStr(opts.Metrics.Metrics) && opts.IsPgConnStr(opts.Sources.Sources) { - err = opts.InitMetricReader(context.Background()) + err = opts.InitMetricReader(ctx) if err != nil { opts.CompleteCommand(ExitCodeConfigError) return } if m, ok := opts.MetricsReaderWriter.(metrics.Migrator); ok { err = m.Migrate() - opts.CompleteCommand(map[bool]int32{ - true: ExitCodeOK, - false: ExitCodeConfigError, - }[err == nil]) + if err != nil { + opts.CompleteCommand(ExitCodeConfigError) + return + } + } + } else { + opts.CompleteCommand(ExitCodeConfigError) + return errors.New("configuration storage does not support 
upgrade") + } + // Upgrade sinks configuration if it's postgres + if len(opts.Sinks.Sinks) > 0 { + err = opts.InitSinkWriter(ctx) + if err != nil { + opts.CompleteCommand(ExitCodeConfigError) return } + if m, ok := opts.SinksWriter.(metrics.Migrator); ok { + err = m.Migrate() + if err != nil { + opts.CompleteCommand(ExitCodeConfigError) + return + } + } else { + opts.CompleteCommand(ExitCodeConfigError) + return errors.New("sink storage does not support upgrade") + } } - opts.CompleteCommand(ExitCodeConfigError) - return errors.New("configuration storage does not support upgrade") + opts.CompleteCommand(ExitCodeOK) + return } diff --git a/internal/cmdopts/cmdconfig_test.go b/internal/cmdopts/cmdconfig_test.go index 9d9f356818..902330df12 100644 --- a/internal/cmdopts/cmdconfig_test.go +++ b/internal/cmdopts/cmdconfig_test.go @@ -4,7 +4,11 @@ import ( "io" "os" "testing" + "time" + "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" + "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks" + "github.com/cybertec-postgresql/pgwatch/v3/internal/sources" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -86,3 +90,237 @@ func TestConfigUpgradeCommand_Execute(t *testing.T) { }) } + +// Mock types for testing Migrator interface with proper interface implementations + +type mockMigratableSourcesReader struct { + migrateErr error + needsMigration bool + needsMigrationErr error +} + +func (m *mockMigratableSourcesReader) Migrate() error { return m.migrateErr } +func (m *mockMigratableSourcesReader) NeedsMigration() (bool, error) { + return m.needsMigration, m.needsMigrationErr +} +func (m *mockMigratableSourcesReader) GetSources() (sources.Sources, error) { + return sources.Sources{}, nil +} +func (m *mockMigratableSourcesReader) WriteSources(sources.Sources) error { return nil } +func (m *mockMigratableSourcesReader) DeleteSource(string) error { return nil } +func (m *mockMigratableSourcesReader) UpdateSource(sources.Source) error { 
return nil } +func (m *mockMigratableSourcesReader) CreateSource(sources.Source) error { return nil } + +type mockMigratableMetricsReader struct { + migrateErr error + needsMigration bool + needsMigrationErr error +} + +func (m *mockMigratableMetricsReader) Migrate() error { return m.migrateErr } +func (m *mockMigratableMetricsReader) NeedsMigration() (bool, error) { + return m.needsMigration, m.needsMigrationErr +} +func (m *mockMigratableMetricsReader) GetMetrics() (*metrics.Metrics, error) { + return &metrics.Metrics{}, nil +} +func (m *mockMigratableMetricsReader) WriteMetrics(*metrics.Metrics) error { return nil } +func (m *mockMigratableMetricsReader) DeleteMetric(string) error { return nil } +func (m *mockMigratableMetricsReader) UpdateMetric(string, metrics.Metric) error { return nil } +func (m *mockMigratableMetricsReader) CreateMetric(string, metrics.Metric) error { return nil } +func (m *mockMigratableMetricsReader) DeletePreset(string) error { return nil } +func (m *mockMigratableMetricsReader) UpdatePreset(string, metrics.Preset) error { return nil } +func (m *mockMigratableMetricsReader) CreatePreset(string, metrics.Preset) error { return nil } + +type mockMigratableSinksWriter struct { + migrateErr error + needsMigration bool + needsMigrationErr error +} + +func (m *mockMigratableSinksWriter) Migrate() error { return m.migrateErr } +func (m *mockMigratableSinksWriter) NeedsMigration() (bool, error) { + return m.needsMigration, m.needsMigrationErr +} +func (m *mockMigratableSinksWriter) SyncMetric(string, string, sinks.SyncOp) error { return nil } +func (m *mockMigratableSinksWriter) Write(metrics.MeasurementEnvelope) error { return nil } + +func TestNeedsSchemaUpgrade(t *testing.T) { + tests := []struct { + name string + setupMocks func(*Options) + expectUpgrade bool + expectError bool + }{ + { + name: "sources needs migration", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigration: true} + }, + 
expectUpgrade: true, + expectError: false, + }, + { + name: "metrics needs migration", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigration: false} + opts.MetricsReaderWriter = &mockMigratableMetricsReader{needsMigration: true} + }, + expectUpgrade: true, + expectError: false, + }, + { + name: "sinks needs migration", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigration: false} + opts.MetricsReaderWriter = &mockMigratableMetricsReader{needsMigration: false} + opts.SinksWriter = &mockMigratableSinksWriter{needsMigration: true} + }, + expectUpgrade: true, + expectError: false, + }, + { + name: "no migration needed", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigration: false} + opts.MetricsReaderWriter = &mockMigratableMetricsReader{needsMigration: false} + opts.SinksWriter = &mockMigratableSinksWriter{needsMigration: false} + }, + expectUpgrade: false, + expectError: false, + }, + { + name: "error checking sources migration", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigrationErr: assert.AnError} + }, + expectUpgrade: false, + expectError: true, + }, + { + name: "error checking metrics migration", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigration: false} + opts.MetricsReaderWriter = &mockMigratableMetricsReader{needsMigrationErr: assert.AnError} + }, + expectUpgrade: false, + expectError: true, + }, + { + name: "error checking sinks migration", + setupMocks: func(opts *Options) { + opts.SourcesReaderWriter = &mockMigratableSourcesReader{needsMigration: false} + opts.MetricsReaderWriter = &mockMigratableMetricsReader{needsMigration: false} + opts.SinksWriter = &mockMigratableSinksWriter{needsMigrationErr: assert.AnError} + }, + expectUpgrade: false, + expectError: true, + }, + } + + for 
_, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := &Options{} + if tt.setupMocks != nil { + tt.setupMocks(opts) + } + + upgrade, err := opts.NeedsSchemaUpgrade() + + assert.Equal(t, tt.expectUpgrade, upgrade) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestConfigInitCommand_InitSources(t *testing.T) { + a := assert.New(t) + + t.Run("yaml file creation", func(*testing.T) { + fname := t.TempDir() + "/sources.yaml" + opts := &Options{ + Sources: sources.CmdOpts{Sources: fname}, + } + cmd := ConfigInitCommand{owner: opts} + err := cmd.InitSources() + a.NoError(err) + a.FileExists(fname) + }) + + t.Run("postgres connection - error without setup", func(*testing.T) { + opts := &Options{ + Sources: sources.CmdOpts{Sources: "postgresql://user@host/db"}, + } + cmd := ConfigInitCommand{owner: opts} + err := cmd.InitSources() + a.Error(err) + }) +} + +func TestConfigInitCommand_InitMetrics(t *testing.T) { + a := assert.New(t) + + t.Run("yaml file creation with default metrics", func(*testing.T) { + fname := t.TempDir() + "/metrics.yaml" + opts := &Options{ + Metrics: metrics.CmdOpts{Metrics: fname}, + } + cmd := ConfigInitCommand{owner: opts} + err := cmd.InitMetrics() + a.NoError(err) + a.FileExists(fname) + }) + + t.Run("postgres connection - error without setup", func(*testing.T) { + opts := &Options{ + Metrics: metrics.CmdOpts{Metrics: "postgresql://user@host/db"}, + } + cmd := ConfigInitCommand{owner: opts} + err := cmd.InitMetrics() + a.Error(err) + }) +} + +func TestConfigInitCommand_InitSinks(t *testing.T) { + a := assert.New(t) + + t.Run("postgres connection - error without setup", func(*testing.T) { + opts := &Options{ + Sinks: sinks.CmdOpts{Sinks: []string{"postgresql://user@host/db"}}, + } + cmd := ConfigInitCommand{owner: opts} + err := cmd.InitSinks() + a.Error(err) + }) +} + +func TestConfigUpgradeCommand_Errors(t *testing.T) { + a := assert.New(t) + + t.Run("non-postgres 
configuration not supported", func(*testing.T) { + opts := &Options{ + Metrics: metrics.CmdOpts{Metrics: "/tmp/metrics.yaml"}, + Sources: sources.CmdOpts{Sources: "/tmp/sources.yaml", Refresh: 120, MaxParallelConnectionsPerDb: 1}, + Sinks: sinks.CmdOpts{BatchingDelay: time.Second}, + } + cmd := ConfigUpgradeCommand{owner: opts} + err := cmd.Execute(nil) + a.Error(err) + a.ErrorContains(err, "does not support upgrade") + }) + + t.Run("init metrics reader fails", func(*testing.T) { + opts := &Options{ + Metrics: metrics.CmdOpts{Metrics: "postgresql://invalid@host/db"}, + Sources: sources.CmdOpts{Sources: "postgresql://invalid@host/db", Refresh: 120, MaxParallelConnectionsPerDb: 1}, + Sinks: sinks.CmdOpts{BatchingDelay: time.Second}, + } + cmd := ConfigUpgradeCommand{owner: opts} + err := cmd.Execute(nil) + a.Error(err) + }) +} diff --git a/internal/cmdopts/cmdoptions.go b/internal/cmdopts/cmdoptions.go index c06dc6b821..393d2f8761 100644 --- a/internal/cmdopts/cmdoptions.go +++ b/internal/cmdopts/cmdoptions.go @@ -174,6 +174,12 @@ func (c *Options) NeedsSchemaUpgrade() (upgrade bool, err error) { return } if m, ok := c.MetricsReaderWriter.(metrics.Migrator); ok { + upgrade, err = m.NeedsMigration() + } + if upgrade || err != nil { + return + } + if m, ok := c.SinksWriter.(metrics.Migrator); ok { return m.NeedsMigration() } return diff --git a/internal/metrics/postgres.go b/internal/metrics/postgres.go index 32054cf256..90872a96be 100644 --- a/internal/metrics/postgres.go +++ b/internal/metrics/postgres.go @@ -21,10 +21,17 @@ func NewPostgresMetricReaderWriterConn(ctx context.Context, conn db.PgxPoolIface if err := initSchema(ctx, conn); err != nil { return nil, err } - return &dbMetricReaderWriter{ + dmrw := &dbMetricReaderWriter{ ctx: ctx, configDb: conn, - }, conn.Ping(ctx) + } + // Check if migrations are needed + if needsMigration, err := dmrw.NeedsMigration(); err != nil { + return nil, err + } else if needsMigration { + return nil, ErrNeedsMigration + } + 
return dmrw, conn.Ping(ctx) } type dbMetricReaderWriter struct { @@ -33,6 +40,7 @@ type dbMetricReaderWriter struct { } var ( + ErrNeedsMigration = errors.New("config database schema is outdated, please run migrations using `pgwatch config upgrade` command") ErrMetricNotFound = errors.New("metric not found") ErrPresetNotFound = errors.New("preset not found") ErrInvalidMetric = errors.New("invalid metric") diff --git a/internal/metrics/postgres_schema.go b/internal/metrics/postgres_schema.go index a50a788abe..d608c5da31 100644 --- a/internal/metrics/postgres_schema.go +++ b/internal/metrics/postgres_schema.go @@ -64,6 +64,9 @@ func (dmrw *dbMetricReaderWriter) NeedsMigration() (bool, error) { return m.NeedUpgrade(dmrw.ctx, dmrw.configDb) } +// MigrationsCount is the total number of migrations in pgwatch.migration table +const MigrationsCount = 2 + // migrations holds function returning all updgrade migrations needed var migrations func() migrator.Option = func() migrator.Option { return migrator.Migrations( diff --git a/internal/metrics/postgres_test.go b/internal/metrics/postgres_test.go index c06753cc9b..3d3d3ce57a 100644 --- a/internal/metrics/postgres_test.go +++ b/internal/metrics/postgres_test.go @@ -55,6 +55,9 @@ func TestNewPostgresMetricReaderWriterConn(t *testing.T) { conn.ExpectExec(`INSERT.+preset`).WithArgs(AnyArgs(3)...).WillReturnResult(pgxmock.NewResult("INSERT", 1)).Times(uint(presetsCount)) conn.ExpectCommit() conn.ExpectCommit() + // Expect migration check + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(metrics.MigrationsCount)) conn.ExpectPing() readerWriter, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn) @@ -159,6 +162,27 @@ func TestNewPostgresMetricReaderWriterConn(t *testing.T) { a.Nil(rw) a.NoError(conn.ExpectationsWereMet()) }) + + 
t.Run("MigrationCheckFail", func(*testing.T) { + conn.ExpectQuery(`SELECT EXISTS`).WithArgs("pgwatch").WillReturnRows(pgxmock.NewRows([]string{"exists"}).AddRow(true)) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnError(assert.AnError) + rw, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn) + a.Error(err) + a.Nil(rw) + a.NoError(conn.ExpectationsWereMet()) + }) + + t.Run("MigrationNeeded", func(*testing.T) { + conn.ExpectQuery(`SELECT EXISTS`).WithArgs("pgwatch").WillReturnRows(pgxmock.NewRows([]string{"exists"}).AddRow(true)) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(metrics.MigrationsCount - 1)) + rw, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn) + a.Error(err) + a.ErrorContains(err, "config database schema is outdated") + a.ErrorContains(err, "pgwatch config upgrade") + a.Nil(rw) + a.NoError(conn.ExpectationsWereMet()) + }) } func TestMetricsToPostgres(t *testing.T) { @@ -167,6 +191,9 @@ func TestMetricsToPostgres(t *testing.T) { a.NoError(err) conn.ExpectQuery(`SELECT EXISTS`).WithArgs("pgwatch").WillReturnRows(pgxmock.NewRows([]string{"exists"}).AddRow(true)) + // Expect migration check + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("pgwatch.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(metrics.MigrationsCount)) conn.ExpectPing() readerWriter, err := metrics.NewPostgresMetricReaderWriterConn(ctx, conn) diff --git a/internal/sinks/json_test.go b/internal/sinks/json_test.go index 7602f6905d..c345088cad 100644 --- a/internal/sinks/json_test.go +++ b/internal/sinks/json_test.go @@ -1,4 +1,4 @@ -package sinks +package sinks_test import ( "context" @@ -8,6 +8,8 @@ import ( jsoniter 
"github.com/json-iterator/go" "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" + "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks" + "github.com/cybertec-postgresql/pgwatch/v3/internal/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -26,8 +28,8 @@ func TestJSONWriter_Write(t *testing.T) { } tempFile := t.TempDir() + "/test.json" - ctx, cancel := context.WithCancel(context.Background()) - jw, err := NewJSONWriter(ctx, tempFile) + ctx, cancel := context.WithCancel(testutil.TestContext) + jw, err := sinks.NewJSONWriter(ctx, tempFile) r.NoError(err) err = jw.Write(msg) @@ -55,16 +57,16 @@ func TestJSONWriter_SyncMetric(t *testing.T) { // Create a temporary file for testing tempFile := t.TempDir() + "/test.json" - ctx, cancel := context.WithCancel(context.Background()) - jw, err := NewJSONWriter(ctx, tempFile) + ctx, cancel := context.WithCancel(testutil.TestContext) + jw, err := sinks.NewJSONWriter(ctx, tempFile) assert.NoError(t, err) // Call the function being tested - err = jw.SyncMetric("", "", InvalidOp) + err = jw.SyncMetric("", "", sinks.InvalidOp) assert.NoError(t, err) cancel() - err = jw.SyncMetric("", "", InvalidOp) + err = jw.SyncMetric("", "", sinks.InvalidOp) assert.Error(t, err, "context canceled") } diff --git a/internal/sinks/multiwriter.go b/internal/sinks/multiwriter.go index 8676cde52e..1de96c0fed 100644 --- a/internal/sinks/multiwriter.go +++ b/internal/sinks/multiwriter.go @@ -61,6 +61,10 @@ func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error) { return mw, nil } +func (mw *MultiWriter) Count() int { + return len(mw.writers) +} + func (mw *MultiWriter) AddWriter(w Writer) { mw.Lock() mw.writers = append(mw.writers, w) @@ -89,3 +93,33 @@ func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error) { } return } + +// Migrator interface implementation for MultiWriter + +// Migrate runs migrations on all writers that support it +func (mw *MultiWriter) 
Migrate() (err error) { + for _, w := range mw.writers { + if m, ok := w.(interface { + Migrate() error + }); ok { + err = errors.Join(err, m.Migrate()) + } + } + return +} + +// NeedsMigration checks if any writer needs migration +func (mw *MultiWriter) NeedsMigration() (bool, error) { + for _, w := range mw.writers { + if m, ok := w.(interface { + NeedsMigration() (bool, error) + }); ok { + if needs, err := m.NeedsMigration(); err != nil { + return false, err + } else if needs { + return true, nil + } + } + } + return false, nil +} diff --git a/internal/sinks/multiwriter_test.go b/internal/sinks/multiwriter_test.go index 3089f5e10d..66dee55310 100644 --- a/internal/sinks/multiwriter_test.go +++ b/internal/sinks/multiwriter_test.go @@ -1,16 +1,17 @@ -package sinks +package sinks_test import ( - "context" "testing" "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" + "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks" + "github.com/cybertec-postgresql/pgwatch/v3/internal/testutil" "github.com/stretchr/testify/assert" ) type MockWriter struct{} -func (mw *MockWriter) SyncMetric(_, _ string, _ SyncOp) error { +func (mw *MockWriter) SyncMetric(_, _ string, _ sinks.SyncOp) error { return nil } @@ -20,36 +21,36 @@ func (mw *MockWriter) Write(_ metrics.MeasurementEnvelope) error { func TestNewMultiWriter(t *testing.T) { input := []struct { - opts *CmdOpts + opts *sinks.CmdOpts w bool // Writer returned err bool // error returned }{ - {&CmdOpts{}, false, true}, - {&CmdOpts{ + {&sinks.CmdOpts{}, false, true}, + {&sinks.CmdOpts{ Sinks: []string{"foo"}, }, false, true}, - {&CmdOpts{ + {&sinks.CmdOpts{ Sinks: []string{"jsonfile://test.json"}, }, true, false}, - {&CmdOpts{ + {&sinks.CmdOpts{ Sinks: []string{"jsonfile://test.json", "jsonfile://test1.json"}, }, true, false}, - {&CmdOpts{ + {&sinks.CmdOpts{ Sinks: []string{"prometheus://foo/"}, }, false, true}, - {&CmdOpts{ + {&sinks.CmdOpts{ Sinks: []string{"rpc://foo/"}, }, false, true}, - {&CmdOpts{ + 
{&sinks.CmdOpts{ Sinks: []string{"postgresql:///baz"}, }, false, true}, - {&CmdOpts{ + {&sinks.CmdOpts{ Sinks: []string{"foo:///"}, }, false, true}, } for _, i := range input { - mw, err := NewSinkWriter(context.Background(), i.opts) + mw, err := sinks.NewSinkWriter(testutil.TestContext, i.opts) if i.err { assert.Error(t, err) } else { @@ -64,24 +65,192 @@ func TestNewMultiWriter(t *testing.T) { } func TestAddWriter(t *testing.T) { - mw := &MultiWriter{} + mw := &sinks.MultiWriter{} mockWriter := &MockWriter{} mw.AddWriter(mockWriter) - assert.Equal(t, 1, len(mw.writers)) + assert.Equal(t, 1, mw.Count()) } func TestSyncMetrics(t *testing.T) { - mw := &MultiWriter{} + mw := &sinks.MultiWriter{} mockWriter := &MockWriter{} mw.AddWriter(mockWriter) - err := mw.SyncMetric("db", "metric", InvalidOp) + err := mw.SyncMetric("db", "metric", sinks.InvalidOp) assert.NoError(t, err) } func TestWriteMeasurements(t *testing.T) { - mw := &MultiWriter{} + mw := &sinks.MultiWriter{} mockWriter := &MockWriter{} mw.AddWriter(mockWriter) err := mw.Write(metrics.MeasurementEnvelope{}) assert.NoError(t, err) } + +// mockMigratableWriter implements Writer and Migrator interfaces +type mockMigratableWriter struct { + migrateErr error + needsMigration bool + needsMigrationErr error +} + +func (m *mockMigratableWriter) SyncMetric(string, string, sinks.SyncOp) error { + return nil +} + +func (m *mockMigratableWriter) Write(metrics.MeasurementEnvelope) error { + return nil +} + +func (m *mockMigratableWriter) Migrate() error { + return m.migrateErr +} + +func (m *mockMigratableWriter) NeedsMigration() (bool, error) { + return m.needsMigration, m.needsMigrationErr +} + +func TestMultiWriterMigrate(t *testing.T) { + tests := []struct { + name string + writers []sinks.Writer + expectError bool + }{ + { + name: "no migratable writers", + writers: []sinks.Writer{ + &MockWriter{}, + }, + expectError: false, + }, + { + name: "single migratable writer success", + writers: []sinks.Writer{ + 
&mockMigratableWriter{}, + }, + expectError: false, + }, + { + name: "single migratable writer error", + writers: []sinks.Writer{ + &mockMigratableWriter{migrateErr: assert.AnError}, + }, + expectError: true, + }, + { + name: "multiple migratable writers success", + writers: []sinks.Writer{ + &mockMigratableWriter{}, + &mockMigratableWriter{}, + }, + expectError: false, + }, + { + name: "multiple writers with one error", + writers: []sinks.Writer{ + &mockMigratableWriter{}, + &mockMigratableWriter{migrateErr: assert.AnError}, + }, + expectError: true, + }, + { + name: "mixed writers with migration error", + writers: []sinks.Writer{ + &MockWriter{}, + &mockMigratableWriter{migrateErr: assert.AnError}, + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mw := &sinks.MultiWriter{} + for _, w := range tt.writers { + mw.AddWriter(w) + } + err := mw.Migrate() + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMultiWriterNeedsMigration(t *testing.T) { + tests := []struct { + name string + writers []sinks.Writer + expectNeedsMigrate bool + expectError bool + }{ + { + name: "no migratable writers", + writers: []sinks.Writer{ + &MockWriter{}, + }, + expectNeedsMigrate: false, + expectError: false, + }, + { + name: "single writer needs migration", + writers: []sinks.Writer{ + &mockMigratableWriter{needsMigration: true}, + }, + expectNeedsMigrate: true, + expectError: false, + }, + { + name: "single writer no migration needed", + writers: []sinks.Writer{ + &mockMigratableWriter{needsMigration: false}, + }, + expectNeedsMigrate: false, + expectError: false, + }, + { + name: "multiple writers one needs migration", + writers: []sinks.Writer{ + &mockMigratableWriter{needsMigration: false}, + &mockMigratableWriter{needsMigration: true}, + }, + expectNeedsMigrate: true, + expectError: false, + }, + { + name: "error checking migration", + writers: []sinks.Writer{ + 
&mockMigratableWriter{needsMigrationErr: assert.AnError}, + }, + expectNeedsMigrate: false, + expectError: true, + }, + { + name: "mixed writers one needs migration", + writers: []sinks.Writer{ + &MockWriter{}, + &mockMigratableWriter{needsMigration: true}, + }, + expectNeedsMigrate: true, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mw := &sinks.MultiWriter{} + for _, w := range tt.writers { + mw.AddWriter(w) + } + needs, err := mw.NeedsMigration() + assert.Equal(t, tt.expectNeedsMigrate, needs) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index 843915f521..ccf42855ef 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -15,6 +15,7 @@ import ( "github.com/cybertec-postgresql/pgwatch/v3/internal/db" "github.com/cybertec-postgresql/pgwatch/v3/internal/log" "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" + migrator "github.com/cybertec-postgresql/pgx-migrator" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" ) @@ -25,6 +26,35 @@ var ( targetColumns = [...]string{"time", "dbname", "data", "tag_data"} ) +//go:embed sql/admin_schema.sql +var sqlMetricAdminSchema string + +//go:embed sql/admin_functions.sql +var sqlMetricAdminFunctions string + +//go:embed sql/ensure_partition_postgres.sql +var sqlMetricEnsurePartitionPostgres string + +//go:embed sql/ensure_partition_timescale.sql +var sqlMetricEnsurePartitionTimescale string + +//go:embed sql/change_chunk_interval.sql +var sqlMetricChangeChunkIntervalTimescale string + +//go:embed sql/change_compression_interval.sql +var sqlMetricChangeCompressionIntervalTimescale string + +var ( + metricSchemaSQLs = []string{ + sqlMetricAdminSchema, + sqlMetricAdminFunctions, + sqlMetricEnsurePartitionPostgres, + sqlMetricEnsurePartitionTimescale, + sqlMetricChangeChunkIntervalTimescale, + 
sqlMetricChangeCompressionIntervalTimescale, + } +) + // PostgresWriter is a sink that writes metric measurements to a Postgres database. // At the moment, it supports both Postgres and TimescaleDB as a storage backend. // However, one is able to use any Postgres-compatible database as a storage backend, @@ -48,6 +78,8 @@ func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw return NewWriterFromPostgresConn(ctx, conn, opts) } +var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command") + func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error) { l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database) ctx = log.WithLogger(ctx, l) @@ -58,11 +90,36 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts * lastError: make(chan error), sinkDb: conn, } - if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error { + l.Info("initialising measurements database...") + if err = pgw.init(); err != nil { + return nil, err + } + if needsMigration, e := pgw.NeedsMigration(); e != nil { + return nil, e + } else if needsMigration { + return nil, ErrNeedsMigration + } + if err = pgw.ReadMetricSchemaType(); err != nil { + return nil, err + } + if err = pgw.EnsureBuiltinMetricDummies(); err != nil { + return nil, err + } + pgw.scheduleJob(pgw.maintenanceInterval, func() { + pgw.DeleteOldPartitions() + pgw.MaintainUniqueSources() + }) + go pgw.poll() + l.Info(`measurements sink is activated`) + return +} + +func (pgw *PostgresWriter) init() (err error) { + return db.Init(pgw.ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error { var isValidPartitionInterval bool if err = conn.QueryRow(ctx, "SELECT extract(epoch from $1::interval), extract(epoch from $2::interval), $3::interval >= '1h'::interval", - 
opts.RetentionInterval, opts.MaintenanceInterval, opts.PartitionInterval, + pgw.opts.RetentionInterval, pgw.opts.MaintenanceInterval, pgw.opts.PartitionInterval, ).Scan(&pgw.retentionInterval, &pgw.maintenanceInterval, &isValidPartitionInterval); err != nil { return err } @@ -72,7 +129,7 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts * pgw.maintenanceInterval *= time.Second if !isValidPartitionInterval { - return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", opts.PartitionInterval) + return fmt.Errorf("--partition-interval must be at least 1 hour, got: %s", pgw.opts.PartitionInterval) } if pgw.maintenanceInterval < 0 { return errors.New("--maintenance-interval must be a positive PostgreSQL interval or 0 to disable it") @@ -81,7 +138,6 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts * return errors.New("--retention must be at least 1 hour PostgreSQL interval or 0 to disable it") } - l.Info("initialising measurements database...") exists, err := db.DoesSchemaExist(ctx, conn, "admin") if err != nil || exists { return err @@ -92,55 +148,9 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts * } } return nil - }); err != nil { - return - } - if err = pgw.ReadMetricSchemaType(); err != nil { - return - } - if err = pgw.EnsureBuiltinMetricDummies(); err != nil { - return - } - - pgw.scheduleJob(pgw.maintenanceInterval, func() { - pgw.DeleteOldPartitions() - pgw.MaintainUniqueSources() }) - - go pgw.poll() - l.Info(`measurements sink is activated`) - return } -//go:embed sql/admin_schema.sql -var sqlMetricAdminSchema string - -//go:embed sql/admin_functions.sql -var sqlMetricAdminFunctions string - -//go:embed sql/ensure_partition_postgres.sql -var sqlMetricEnsurePartitionPostgres string - -//go:embed sql/ensure_partition_timescale.sql -var sqlMetricEnsurePartitionTimescale string - -//go:embed sql/change_chunk_interval.sql -var 
sqlMetricChangeChunkIntervalTimescale string - -//go:embed sql/change_compression_interval.sql -var sqlMetricChangeCompressionIntervalTimescale string - -var ( - metricSchemaSQLs = []string{ - sqlMetricAdminSchema, - sqlMetricAdminFunctions, - sqlMetricEnsurePartitionPostgres, - sqlMetricEnsurePartitionTimescale, - sqlMetricChangeChunkIntervalTimescale, - sqlMetricChangeCompressionIntervalTimescale, - } -) - type ExistingPartitionInfo struct { StartTime time.Time EndTime time.Time @@ -527,3 +537,56 @@ func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric stri _, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric) return err } + +var initMigrator = func(pgw *PostgresWriter) (*migrator.Migrator, error) { + return migrator.New( + migrator.TableName("admin.migration"), + migrator.SetNotice(func(s string) { + log.GetLogger(pgw.ctx).Info(s) + }), + migrations(), + ) +} + +// Migrate upgrades database with all migrations +func (pgw *PostgresWriter) Migrate() error { + m, err := initMigrator(pgw) + if err != nil { + return fmt.Errorf("cannot initialize migration: %w", err) + } + return m.Migrate(pgw.ctx, pgw.sinkDb) +} + +// NeedsMigration checks if database needs migration +func (pgw *PostgresWriter) NeedsMigration() (bool, error) { + m, err := initMigrator(pgw) + if err != nil { + return false, err + } + return m.NeedUpgrade(pgw.ctx, pgw.sinkDb) +} + +// MigrationsCount is the total number of migrations in admin.migration table +const MigrationsCount = 1 + +// migrations holds function returning all upgrade migrations needed +var migrations func() migrator.Option = func() migrator.Option { + return migrator.Migrations( + &migrator.Migration{ + Name: "01110 Apply postgres sink schema migrations", + Func: func(context.Context, pgx.Tx) error { + // "migration" table will be created automatically + return nil + }, + }, + + // adding new migration here, update "admin"."migration" in "admin_schema.sql"! 
+ + // &migrator.Migration{ + // Name: "000XX Short description of a migration", + // Func: func(ctx context.Context, tx pgx.Tx) error { + // return executeMigrationScript(ctx, tx, "000XX.sql") + // }, + // }, + ) +} diff --git a/internal/sinks/postgres_schema_test.go b/internal/sinks/postgres_schema_test.go new file mode 100644 index 0000000000..c10859562c --- /dev/null +++ b/internal/sinks/postgres_schema_test.go @@ -0,0 +1,91 @@ +package sinks + +import ( + "testing" + + migrator "github.com/cybertec-postgresql/pgx-migrator" + "github.com/pashagolub/pgxmock/v4" + "github.com/stretchr/testify/assert" +) + +func TestPostgresWriterMigrate(t *testing.T) { + a := assert.New(t) + conn, err := pgxmock.NewPool() + a.NoError(err) + + // Expect migration table creation and migration execution + conn.ExpectExec(`CREATE TABLE IF NOT EXISTS admin\.migration`).WillReturnResult(pgxmock.NewResult("CREATE", 1)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(0)) + conn.ExpectBegin() + conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1)) + + pgw := &PostgresWriter{ctx: ctx, sinkDb: conn} + err = pgw.Migrate() + a.NoError(err) + a.NoError(conn.ExpectationsWereMet()) +} + +func TestPostgresWriterNeedsMigration(t *testing.T) { + a := assert.New(t) + conn, err := pgxmock.NewPool() + a.NoError(err) + + // Expect checks for migration table existence and pending migrations + conn.ExpectQuery(`SELECT to_regclass`). + WithArgs("admin.migration"). 
+ WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(0)) + + pgw := &PostgresWriter{ctx: ctx, sinkDb: conn} + needs, err := pgw.NeedsMigration() + a.NoError(err) + a.True(needs) + a.NoError(conn.ExpectationsWereMet()) +} + +func TestPostgresWriterNeedsMigrationNoMigrationNeeded(t *testing.T) { + a := assert.New(t) + conn, err := pgxmock.NewPool() + a.NoError(err) + + // Expect checks for migration table existence and no pending migrations + conn.ExpectQuery(`SELECT to_regclass`). + WithArgs("admin.migration"). + WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(1)) + + pgw := &PostgresWriter{ctx: ctx, sinkDb: conn} + needs, err := pgw.NeedsMigration() + a.NoError(err) + a.False(needs) + a.NoError(conn.ExpectationsWereMet()) +} + +func TestPostgresWriterMigrateFail(t *testing.T) { + oldInitMigrator := initMigrator + t.Cleanup(func() { + initMigrator = oldInitMigrator + }) + a := assert.New(t) + pgw := &PostgresWriter{ctx: ctx} + initMigrator = func(*PostgresWriter) (*migrator.Migrator, error) { + return nil, assert.AnError + } + err := pgw.Migrate() + a.Error(err) + a.Contains(err.Error(), "cannot initialize migration") +} + +func TestPostgresWriterNeedsMigrationFail(t *testing.T) { + oldInitMigrator := initMigrator + t.Cleanup(func() { + initMigrator = oldInitMigrator + }) + a := assert.New(t) + pgw := &PostgresWriter{ctx: ctx} + initMigrator = func(*PostgresWriter) (*migrator.Migrator, error) { + return nil, assert.AnError + } + _, err := pgw.NeedsMigration() + a.Error(err) +} diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index 8da15d335a..b0c176698a 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -39,30 +39,124 @@ func TestReadMetricSchemaType(t *testing.T) { } func 
TestNewWriterFromPostgresConn(t *testing.T) { - conn, err := pgxmock.NewPool() - assert.NoError(t, err) - - conn.ExpectPing() - conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows( - pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true), - ) - conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) - conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) - for _, m := range metrics.GetDefaultBuiltInMetrics() { - conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) - } - + a := assert.New(t) opts := &CmdOpts{ BatchingDelay: time.Hour, RetentionInterval: "1 day", MaintenanceInterval: "1 day", PartitionInterval: "1 hour", } - pgw, err := NewWriterFromPostgresConn(ctx, conn, opts) - assert.NoError(t, err) - assert.NotNil(t, pgw) - assert.NoError(t, conn.ExpectationsWereMet()) + t.Run("Success", func(*testing.T) { + conn, err := pgxmock.NewPool() + a.NoError(err) + + conn.ExpectPing() + conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows( + pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true), + ) + conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + // Expect migration check (before ReadMetricSchemaType) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount)) + conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + for _, m := range metrics.GetDefaultBuiltInMetrics() { + 
conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1)) + } + + pgw, err := NewWriterFromPostgresConn(ctx, conn, opts) + a.NoError(err) + a.NotNil(pgw) + a.NoError(conn.ExpectationsWereMet()) + }) + + t.Run("InitFail", func(*testing.T) { + conn, err := pgxmock.NewPool() + a.NoError(err) + + conn.ExpectPing().WillReturnError(assert.AnError) + + pgw, err := NewWriterFromPostgresConn(ctx, conn, opts) + a.Error(err) + a.Nil(pgw) + a.NoError(conn.ExpectationsWereMet()) + }) + + t.Run("MigrationCheckFail", func(*testing.T) { + conn, err := pgxmock.NewPool() + a.NoError(err) + + conn.ExpectPing() + conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows( + pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true), + ) + conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnError(assert.AnError) + + pgw, err := NewWriterFromPostgresConn(ctx, conn, opts) + a.Error(err) + a.Nil(pgw) + a.NoError(conn.ExpectationsWereMet()) + }) + + t.Run("MigrationNeeded", func(*testing.T) { + conn, err := pgxmock.NewPool() + a.NoError(err) + + conn.ExpectPing() + conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows( + pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true), + ) + conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount - 1)) + + pgw, err := NewWriterFromPostgresConn(ctx, 
conn, opts) + a.ErrorIs(err, ErrNeedsMigration) + a.ErrorContains(err, "sink database schema is outdated") + a.ErrorContains(err, "pgwatch config upgrade") + a.Nil(pgw) + a.NoError(conn.ExpectationsWereMet()) + }) + + t.Run("ReadMetricSchemaTypeFail", func(*testing.T) { + conn, err := pgxmock.NewPool() + a.NoError(err) + + conn.ExpectPing() + conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows( + pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true), + ) + conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount)) + conn.ExpectQuery("SELECT schema_type").WillReturnError(assert.AnError) + + pgw, err := NewWriterFromPostgresConn(ctx, conn, opts) + a.Error(err) + a.Nil(pgw) + a.NoError(conn.ExpectationsWereMet()) + }) + + t.Run("EnsureBuiltinMetricDummiesFail", func(*testing.T) { + conn, err := pgxmock.NewPool() + a.NoError(err) + + conn.ExpectPing() + conn.ExpectQuery("SELECT extract").WithArgs("1 day", "1 day", "1 hour").WillReturnRows( + pgxmock.NewRows([]string{"col1", "col2", "col3"}).AddRow((24 * time.Hour).Seconds(), (24 * time.Hour).Seconds(), true), + ) + conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + conn.ExpectQuery(`SELECT to_regclass`).WithArgs("admin.migration").WillReturnRows(pgxmock.NewRows([]string{"to_regclass"}).AddRow(true)) + conn.ExpectQuery(`SELECT count`).WillReturnRows(pgxmock.NewRows([]string{"count"}).AddRow(MigrationsCount)) + conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true)) + conn.ExpectExec("SELECT 
admin.ensure_dummy_metrics_table").WithArgs(pgxmock.AnyArg()).WillReturnError(assert.AnError) + + pgw, err := NewWriterFromPostgresConn(ctx, conn, opts) + a.Error(err) + a.Nil(pgw) + a.NoError(conn.ExpectationsWereMet()) + }) } func TestSyncMetric(t *testing.T) { diff --git a/internal/sinks/rpc_test.go b/internal/sinks/rpc_test.go index c493e8e107..6469321647 100644 --- a/internal/sinks/rpc_test.go +++ b/internal/sinks/rpc_test.go @@ -6,7 +6,6 @@ import ( "os" "testing" - "github.com/cybertec-postgresql/pgwatch/v3/internal/log" "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks" "github.com/cybertec-postgresql/pgwatch/v3/internal/testutil" @@ -15,7 +14,7 @@ import ( "google.golang.org/grpc/status" ) -var ctx = log.WithLogger(context.Background(), log.NewNoopLogger()) +var ctx = testutil.TestContext func TestMain(m *testing.M) { // Setup @@ -40,7 +39,8 @@ func TestCACertParamValidation(t *testing.T) { _, err := sinks.NewRPCWriter(ctx, testutil.TLSConnStr) a.NoError(err) - _, _ = os.Create("badca.crt") + err = os.WriteFile("badca.crt", []byte(""), 0644) + a.NoError(err) defer func() { _ = os.Remove("badca.crt") }() BadRPCParams := map[string]string{ diff --git a/internal/sinks/sql/admin_schema.sql b/internal/sinks/sql/admin_schema.sql index 0366d4bd1c..2427effe26 100644 --- a/internal/sinks/sql/admin_schema.sql +++ b/internal/sinks/sql/admin_schema.sql @@ -106,3 +106,17 @@ comment on table admin.metrics_template IS 'used as a template for all new metri create index on admin.metrics_template (dbname, time); -- create index on admin.metrics_template using brin (dbname, time); /* consider BRIN instead for large data amounts */ -- create index on admin.metrics_template using gin (tag_data) where tag_data notnull; + +-- define migrations you need to apply +-- every change to the database schema should populate this table. 
+-- The version value should contain a zero-padded issue number followed by +-- a short description of the issue/feature/bug implemented/resolved +CREATE TABLE admin.migration( + id bigint PRIMARY KEY, + version text NOT NULL +); + +INSERT INTO + admin.migration (id, version) +VALUES + (0, '01110 Apply postgres sink schema migrations');