Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ FROM alpine:3.18 AS dockerize
# appears to require `docker buildx` or an explicit `--platform` at build time
ARG TARGETARCH

RUN apk add --no-cache openssl
RUN apk add --no-cache openssl ca-certificates && update-ca-certificates

ENV DOCKERIZE_VERSION=v0.9.3
RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-$TARGETARCH-$DOCKERIZE_VERSION.tar.gz \
Expand Down Expand Up @@ -91,8 +91,8 @@ CMD /start-cadence.sh
# All-in-one Cadence server (~450mb)
FROM cadence-server AS cadence-auto-setup

RUN apk add --update --no-cache ca-certificates py3-pip mysql-client
RUN pip3 install cqlsh && cqlsh --version
RUN apk add --update --no-cache ca-certificates py3-pip py3-setuptools py3-wheel mysql-client
RUN pip3 install --no-build-isolation cqlsh && cqlsh --version

COPY docker/start.sh /start.sh
COPY docker/domain /etc/cadence/domain
Expand Down
34 changes: 31 additions & 3 deletions simulation/replication/replication_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"reflect"
"sort"
Expand Down Expand Up @@ -654,11 +655,27 @@ func waitUntilWorkersReady(t *testing.T) {
"http://cadence-worker1:6060/health",
}

client := &http.Client{Timeout: 1 * time.Second}

for {
allHealthy := true
lastStatus := make(map[string]int, len(workerEndpoints))
lastBody := make(map[string]string, len(workerEndpoints))
lastErr := make(map[string]error, len(workerEndpoints))
for _, endpoint := range workerEndpoints {
resp, err := http.Get(endpoint)
if err != nil || resp.StatusCode != http.StatusOK {
resp, err := client.Get(endpoint)
if err != nil {
lastErr[endpoint] = err
allHealthy = false
break
}
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
_ = resp.Body.Close()
lastStatus[endpoint] = resp.StatusCode
if len(body) > 0 {
lastBody[endpoint] = strings.TrimSpace(string(body))
}
if resp.StatusCode != http.StatusOK {
allHealthy = false
break
}
Expand All @@ -668,7 +685,18 @@ func waitUntilWorkersReady(t *testing.T) {
break
}

simTypes.Logf(t, "Workers are not reporting healthy yet. Sleep for 2s and try again")
// Include details so CI logs are actionable (DNS vs connection vs 503 readiness).
for _, endpoint := range workerEndpoints {
if err := lastErr[endpoint]; err != nil {
simTypes.Logf(t, "Workers are not reporting healthy yet (%s error: %v). Sleep for 2s and try again", endpoint, err)
} else if status, ok := lastStatus[endpoint]; ok {
if body, ok := lastBody[endpoint]; ok && body != "" {
simTypes.Logf(t, "Workers are not reporting healthy yet (%s status: %d body: %s). Sleep for 2s and try again", endpoint, status, body)
} else {
simTypes.Logf(t, "Workers are not reporting healthy yet (%s status: %d). Sleep for 2s and try again", endpoint, status)
}
}
}
time.Sleep(2 * time.Second)
}

Expand Down
39 changes: 38 additions & 1 deletion simulation/replication/worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
Expand Down Expand Up @@ -54,6 +55,9 @@ var (
clusterName = flag.String("cluster", "", "cluster name")

ready int32

domainErrMu sync.Mutex
domainLastErr = map[string]string{}
)

func main() {
Expand Down Expand Up @@ -101,12 +105,41 @@ func main() {
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)

type healthStatus struct {
Cluster string `json:"cluster"`
GRPCEndpoint string `json:"grpcEndpoint"`
ReadyDomains int32 `json:"readyDomains"`
TotalDomains int `json:"totalDomains"`
LastErrors map[string]string `json:"lastErrors,omitempty"`
}

http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if atomic.LoadInt32(&ready) == int32(len(simCfg.Domains)) {
readyDomains := atomic.LoadInt32(&ready)
isReady := readyDomains == int32(len(simCfg.Domains))

domainErrMu.Lock()
lastErrors := make(map[string]string, len(domainLastErr))
for k, v := range domainLastErr {
lastErrors[k] = v
}
domainErrMu.Unlock()

w.Header().Set("Content-Type", "application/json")
status := healthStatus{
Cluster: *clusterName,
GRPCEndpoint: cluster.GRPCEndpoint,
ReadyDomains: readyDomains,
TotalDomains: len(simCfg.Domains),
LastErrors: lastErrors,
}
b, _ := json.Marshal(status) // best-effort diagnostics

if isReady {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
_, _ = w.Write(b)
})
go http.ListenAndServe(":6060", nil)

Expand Down Expand Up @@ -186,6 +219,10 @@ func waitUntilDomainReady(logger *zap.Logger, client workflowserviceclient.Inter
return
}

domainErrMu.Lock()
domainLastErr[domainName] = err.Error()
domainErrMu.Unlock()

logger.Info("Domains not ready", zap.String("domain", domainName), zap.Error(err))
time.Sleep(2 * time.Second)
}
Expand Down
Loading