Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions Exesh/cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
executeAPI "exesh/internal/api/execute"
heartbeatAPI "exesh/internal/api/heartbeat"
"exesh/internal/config"
Expand All @@ -26,6 +27,9 @@ import (

"github.com/DIvanCode/filestorage/pkg/filestorage"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand Down Expand Up @@ -73,11 +77,24 @@ func main() {
messageFactory := factory.NewMessageFactory(log)
messageSender := sender.NewKafkaSender(log, cfg.Sender)

promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
promCoordReg := prometheus.WrapRegistererWithPrefix("coduels_exesh_coordinator_", promRegistry)

jobScheduler := schedule.NewJobScheduler(log)
exectuionScheduler := schedule.NewExecutionScheduler(log, cfg.ExecutionScheduler, unitOfWork, executionStorage,
executionScheduler := schedule.NewExecutionScheduler(log, cfg.ExecutionScheduler, unitOfWork, executionStorage,
jobFactory, jobScheduler, messageFactory, messageSender)

exectuionScheduler.Start(ctx)
err = executionScheduler.RegisterMetrics(promCoordReg)
if err != nil {
log.Error("could not register metrics from execution scheduler", slog.Any("err", err))
return
}

executionScheduler.Start(ctx)

executeUseCase := executeUC.NewUseCase(log, unitOfWork, executionStorage)
executeAPI.NewHandler(log, executeUseCase).Register(mux)
Expand All @@ -95,16 +112,27 @@ func main() {
Handler: mux,
}

msrv := &http.Server{
Addr: cfg.HttpServer.MetricsAddr,
Handler: promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}),
}

go func() {
_ = srv.ListenAndServe()
}()

go func() {
if cfg.HttpServer.MetricsAddr != "" {
_ = msrv.ListenAndServe()
}
}()

log.Info("server started")

<-stop
log.Info("stopping server")

if err := srv.Shutdown(ctx); err != nil {
if err := errors.Join(srv.Shutdown(ctx), msrv.Shutdown(ctx)); err != nil {
log.Error("failed to stop server", slog.Any("error", err))
return
}
Expand All @@ -122,7 +150,7 @@ func setupLogger(env string) (log *slog.Logger, err error) {
err = fmt.Errorf("failed setup logger for env %s", env)
}

return
return log, err
}

func setupStorage(log *slog.Logger, cfg config.StorageConfig) (
Expand All @@ -136,7 +164,7 @@ func setupStorage(log *slog.Logger, cfg config.StorageConfig) (
unitOfWork, err = postgres.NewUnitOfWork(cfg)
if err != nil {
err = fmt.Errorf("failed to create unit of work: %w", err)
return
return unitOfWork, executionStorage, err
}

err = unitOfWork.Do(ctx, func(ctx context.Context) error {
Expand All @@ -146,7 +174,7 @@ func setupStorage(log *slog.Logger, cfg config.StorageConfig) (
return nil
})

return
return unitOfWork, executionStorage, err
}

func setupInputProvider(cfg config.InputProviderConfig, filestorageAdapter *adapter.FilestorageAdapter) *provider.InputProvider {
Expand Down
39 changes: 30 additions & 9 deletions Exesh/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,7 @@ package main

import (
"context"
"fmt"
flog "log"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"

"errors"
"exesh/internal/config"
"exesh/internal/executor"
"exesh/internal/executor/executors"
Expand All @@ -18,9 +11,20 @@ import (
"exesh/internal/provider/providers/adapter"
"exesh/internal/runtime/docker"
"exesh/internal/worker"
"fmt"
flog "log"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/DIvanCode/filestorage/pkg/filestorage"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)

func main() {
Expand Down Expand Up @@ -60,6 +64,12 @@ func main() {

worker.NewWorker(log, cfg.Worker, jobExecutor).Start(ctx)

promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

log.Info("starting server", slog.String("address", cfg.HttpServer.Addr))

stop := make(chan os.Signal, 1)
Expand All @@ -70,16 +80,27 @@ func main() {
Handler: mux,
}

msrv := &http.Server{
Addr: cfg.HttpServer.MetricsAddr,
Handler: promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}),
}

go func() {
_ = srv.ListenAndServe()
}()

go func() {
if cfg.HttpServer.MetricsAddr != "" {
_ = msrv.ListenAndServe()
}
}()

log.Info("server started")

<-stop
log.Info("stopping server")

if err := srv.Shutdown(ctx); err != nil {
if err := errors.Join(srv.Shutdown(ctx), msrv.Shutdown(ctx)); err != nil {
log.Error("failed to stop server", slog.Any("error", err))
return
}
Expand Down
3 changes: 2 additions & 1 deletion Exesh/config/coordinator/docker.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
env: docker
http_server:
addr: 0.0.0.0:5253
metrics_addr: 0.0.0.0:9090
storage:
connection_string: host=exesh_postgres port=5432 database=exesh user=coordinator password=secret
init_timeout: 60s
Expand Down Expand Up @@ -30,4 +31,4 @@ artifact_registry:
sender:
brokers:
- kafka:9092
topic: exesh.step-updates
topic: exesh.step-updates
1 change: 1 addition & 0 deletions Exesh/config/worker-1/docker.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
env: docker
http_server:
addr: 0.0.0.0:5254
metrics_addr: 0.0.0.0:9091
filestorage:
root_dir: file_storage/worker-1
trasher:
Expand Down
1 change: 1 addition & 0 deletions Exesh/config/worker-2/docker.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
env: docker
http_server:
addr: 0.0.0.0:5255
metrics_addr: 0.0.0.0:9092
filestorage:
root_dir: file_storage/worker-2
trasher:
Expand Down
3 changes: 3 additions & 0 deletions Exesh/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ services:
- coduels
ports:
- "5253:5253"
- "9080:9090"
environment:
CONFIG_PATH: /config/coordinator/docker.yml
depends_on:
Expand All @@ -57,6 +58,7 @@ services:
- coduels
ports:
- "5254:5254"
- "9081:9091"
environment:
CONFIG_PATH: /config/worker-1/docker.yml
volumes:
Expand All @@ -74,6 +76,7 @@ services:
- coduels
ports:
- "5255:5255"
- "9082:9092"
environment:
CONFIG_PATH: /config/worker-2/docker.yml
volumes:
Expand Down
12 changes: 10 additions & 2 deletions Exesh/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ require (
github.com/ilyakaznacheev/cleanenv v1.5.0
github.com/jackc/pgx/v5 v5.7.6
github.com/opencontainers/image-spec v1.1.1
github.com/prometheus/client_golang v1.23.2
github.com/segmentio/kafka-go v0.4.49
)

require (
github.com/BurntSushi/toml v1.2.1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand All @@ -31,26 +34,31 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/atomicwriter v0.1.0 // indirect
github.com/moby/term v0.5.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.14.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.2 // indirect
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect
Expand Down
24 changes: 22 additions & 2 deletions Exesh/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M=
github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE=
Expand Down Expand Up @@ -56,12 +60,14 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw=
Expand All @@ -72,6 +78,8 @@ github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ=
github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
Expand All @@ -82,6 +90,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk=
Expand Down Expand Up @@ -121,6 +137,10 @@ go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJr
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4=
go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
Expand Down
3 changes: 2 additions & 1 deletion Exesh/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import "time"

type (
HttpServerConfig struct {
Addr string `yaml:"addr"`
Addr string `yaml:"addr"`
MetricsAddr string `yaml:"metrics_addr"`
}

InputProviderConfig struct {
Expand Down
Loading
Loading