Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions caddyDeploymentFiles/Caddyfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
(common_web_config) {
handle_path / {
reverse_proxy index:8080
}
handle /composite/* {
reverse_proxy index:8080
}
handle /ui {
reverse_proxy index:8080
}
handle_path /ui/* {
reverse_proxy ui:8080
}
handle_path /api/* {
reverse_proxy api:8585
}
handle_path /uat/* {
reverse_proxy uat:9999
}
handle_path /jobs/* {
reverse_proxy jobs:8686
}
}

localhost {
import common_web_config

}
17 changes: 17 additions & 0 deletions caddyDeploymentFiles/services.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"index": {
"url": "http://reportportal-index:8080/"
},
"jobs": {
"url": "http://reportportal-jobs:8686/"
},
"uat": {
"url": "http://reportportal-uat:9999/"
},
"ui": {
"url": "http://reportportal-ui:8080/"
},
"api": {
"url": "http://reportportal-api:8585/"
}
}
144 changes: 144 additions & 0 deletions jsonFile/jsonFile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package jsonFile

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"sync"
"time"

"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
)

// Aggregator holds the services URLs from Json file.
type Aggregator struct {
services map[string]*NodeInfo
r *resty.Client
}

// NodeInfo embeds node-related information
type NodeInfo struct {
URL string `json:"url"`
}

// GetInfoEndpoint returns info endpoint URL
func (ni *NodeInfo) GetInfoEndpoint() string {
infoEndpoint, err := url.JoinPath(ni.URL, "/info")
if nil != err {
log.Errorf("Unable to join URL: %v", err)
}

return infoEndpoint
}

// GetHealthEndpoint returns health check URL
func (ni *NodeInfo) GetHealthEndpoint() string {
healthEndpoint, err := url.JoinPath(ni.URL, "/health")
if nil != err {
log.Errorf("Unable to join URL: %v", err)
}

return healthEndpoint
}

// NewAggregator creates new aggregator from Json file
func NewAggregator(configFile string, timeout time.Duration) (*Aggregator, error) {
services, err := loadProperties(configFile)
if err != nil {
return nil, err
}

return &Aggregator{
r: resty.NewWithClient(&http.Client{
Timeout: timeout,
}),
services: services,
}, nil

}

// LoadServices URLs from a Json file
func loadProperties(filename string) (map[string]*NodeInfo, error) {
byteValue, err := os.ReadFile(filename)
if err != nil {
log.Errorf("Failed to read file: %v", err)
}
var nodesInfo map[string]*NodeInfo
json.Unmarshal(byteValue, &nodesInfo)

if len(nodesInfo) == 0 {
log.Errorf("Couldn't read any service from Json file")
}

log.Infof("Loaded services URLs:")
for service, info := range nodesInfo {
log.Infof("Service: %s, URL: %s", service, info.URL)
}

return nodesInfo, nil
}

// AggregateHealth aggregates health info
func (a *Aggregator) AggregateHealth() map[string]interface{} {
return a.aggregate(func(ni *NodeInfo) (interface{}, error) {
var rs map[string]interface{}
if ni.GetHealthEndpoint() != "" {
_, e := a.r.R().SetResult(&rs).SetError(&rs).Get(ni.GetHealthEndpoint())
if nil != e {
rs = map[string]interface{}{"status": "DOWN"}
}
} else {
rs = map[string]interface{}{"status": "UNKNOWN"}
}

return rs, nil
})
}

// AggregateInfo aggregates info
func (a *Aggregator) AggregateInfo() map[string]interface{} {
return a.aggregate(func(info *NodeInfo) (interface{}, error) {
var rs map[string]interface{}
_, e := a.r.R().SetResult(&rs).Get(info.GetInfoEndpoint())
if nil != e {
log.Errorf("Unable to aggregate info: %v", e)

return nil, fmt.Errorf("unable to aggregate nodes info: %w", e)
}
if nil == rs {
log.Error("Unable to collect info endpoint response")

return nil, errors.New("response is empty")
}

return rs, nil
})
}

func (a *Aggregator) aggregate(f func(ni *NodeInfo) (interface{}, error)) map[string]interface{} {

nodeLen := len(a.services)
aggregated := make(map[string]interface{}, nodeLen)
var wg sync.WaitGroup

wg.Add(nodeLen)
var mu sync.Mutex
for node, info := range a.services {
go func(node string, info *NodeInfo) {
defer wg.Done()
res, err := f(info)
if nil == err {
mu.Lock()
aggregated[node] = res
mu.Unlock()
}
}(node, info)
}
wg.Wait()

return aggregated
}
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/reportportal/service-index/aggregator"
"github.com/reportportal/service-index/jsonFile"
"github.com/reportportal/service-index/k8s"
"github.com/reportportal/service-index/traefik"
)
Expand All @@ -30,6 +31,8 @@ func main() {
TraefikLbURL string `env:"LB_URL" envDefault:"http://localhost:8081"`
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
Path string `env:"RESOURCE_PATH" envDefault:""`
FileMode bool `env:"FILE_MODE" envDefault:"false"`
FilePath string `env:"FILE_PATH" envDefault:"services.json"`
}{
ServerConfig: cfg,
}
Expand All @@ -50,12 +53,18 @@ func main() {
srv := server.New(rpCfg.ServerConfig, info)

log.Infof("K8S mode enabled: %t", rpCfg.K8sMode)
log.Infof("File mode enabled: %t", rpCfg.FileMode)
var aggreg aggregator.Aggregator
if rpCfg.K8sMode {
aggreg, err = k8s.NewAggregator(httpClientTimeout)
if nil != err {
log.Fatalf("Incorrect K8S config %s", err.Error())
}
} else if rpCfg.FileMode {
aggreg, err = jsonFile.NewAggregator(rpCfg.FilePath, httpClientTimeout)
if nil != err {
log.Fatalf("Incorrect Json file config %s", err.Error())
}
} else {
aggreg = traefik.NewAggregator(
rpCfg.TraefikLbURL,
Expand Down