Skip to content

Commit da7f913

Browse files
committed
add flag to allow broker (and for on-prem the consumers) to avoid actual store health checks by returning healthy for the checks
1 parent b9aed4e commit da7f913

File tree

5 files changed

+32
-18
lines changed

5 files changed

+32
-18
lines changed

.github/workflows/ci-workflow.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
- uses: actions/setup-go@v4
3939
with:
4040
go-version-file: 'go.mod'
41-
- run: sudo apt install -y protobuf-compiler librocksdb-dev libsqlite3-dev etcd-server
41+
- run: sudo apt-get update && sudo apt-get install -y protobuf-compiler librocksdb-dev libsqlite3-dev etcd-server
4242
- run: |
4343
sha=${{ github.sha }}
4444
if [[ '${{ github.ref }}' == 'refs/heads/arize' ]]; then

broker/stores/health_check.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func runCheck(ctx context.Context, s *ActiveStore) error {
9090
testContent = "health-check\n"
9191
)
9292

93+
if ForceStoreHealthCheckToHealthy {
94+
return nil
95+
}
96+
9397
// 1. PUT test file
9498
var content = strings.NewReader(testContent)
9599
if err := s.Store.Put(ctx, testPath, content, int64(len(testContent)), ""); err != nil {

broker/stores/stores.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ var (
1717

1818
// Whether to return an unsigned URL when a signed URL is requested. Useful when clients do not require the signing.
1919
DisableSignedUrls bool = false
20+
21+
// Whether to force the health check of fragment stores to healthy.
22+
ForceStoreHealthCheckToHealthy bool = false
2023
)
2124

2225
// RegisterProviders registers store constructors for different storage schemes.

cmd/gazette/main.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,17 @@ const iniFilename = "gazette.ini"
3939
var Config = new(struct {
4040
Broker struct {
4141
mbp.ServiceConfig
42-
Limit uint32 `long:"limit" env:"LIMIT" default:"1024" description:"Maximum number of Journals the broker will allocate"`
43-
FileRoot string `long:"file-root" env:"FILE_ROOT" description:"Local path which roots file:// fragment stores (optional)"`
44-
FileOnly bool `long:"file-only" env:"FILE_ONLY" description:"Use the local file:// store for all journal fragments, ignoring cloud bucket storage configuration (for example, S3)"`
45-
MaxAppendRate uint32 `long:"max-append-rate" env:"MAX_APPEND_RATE" default:"0" description:"Max rate (in bytes-per-sec) that any one journal may be appended to. If zero, there is no max rate"`
46-
MaxReplication uint32 `long:"max-replication" env:"MAX_REPLICATION" default:"9" description:"Maximum effective replication of any one journal, which upper-bounds its stated replication."`
47-
MinAppendRate uint32 `long:"min-append-rate" env:"MIN_APPEND_RATE" default:"65536" description:"Min rate (in bytes-per-sec) at which a client may stream Append RPC content. RPCs unable to sustain this rate are aborted"`
48-
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
49-
AuthKeys string `long:"auth-keys" env:"AUTH_KEYS" description:"Whitespace or comma separated, base64-encoded keys used to sign (first key) and verify (all keys) Authorization tokens." json:"-"`
50-
AutoSuspend bool `long:"auto-suspend" env:"AUTO_SUSPEND" description:"Automatically suspend journals which have persisted all fragments"`
51-
DisableSignedUrls bool `long:"disable-signed-urls" env:"DISABLE_SIGNED_URLS" description:"When a signed URL is requested, return an unsigned URL instead. This is useful when clients do not require the signing."`
42+
Limit uint32 `long:"limit" env:"LIMIT" default:"1024" description:"Maximum number of Journals the broker will allocate"`
43+
FileRoot string `long:"file-root" env:"FILE_ROOT" description:"Local path which roots file:// fragment stores (optional)"`
44+
FileOnly bool `long:"file-only" env:"FILE_ONLY" description:"Use the local file:// store for all journal fragments, ignoring cloud bucket storage configuration (for example, S3)"`
45+
MaxAppendRate uint32 `long:"max-append-rate" env:"MAX_APPEND_RATE" default:"0" description:"Max rate (in bytes-per-sec) that any one journal may be appended to. If zero, there is no max rate"`
46+
MaxReplication uint32 `long:"max-replication" env:"MAX_REPLICATION" default:"9" description:"Maximum effective replication of any one journal, which upper-bounds its stated replication."`
47+
MinAppendRate uint32 `long:"min-append-rate" env:"MIN_APPEND_RATE" default:"65536" description:"Min rate (in bytes-per-sec) at which a client may stream Append RPC content. RPCs unable to sustain this rate are aborted"`
48+
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
49+
AuthKeys string `long:"auth-keys" env:"AUTH_KEYS" description:"Whitespace or comma separated, base64-encoded keys used to sign (first key) and verify (all keys) Authorization tokens." json:"-"`
50+
AutoSuspend bool `long:"auto-suspend" env:"AUTO_SUSPEND" description:"Automatically suspend journals which have persisted all fragments"`
51+
DisableSignedUrls bool `long:"disable-signed-urls" env:"DISABLE_SIGNED_URLS" description:"When a signed URL is requested, return an unsigned URL instead. This is useful when clients do not require the signing."`
52+
ForceStoreHealthCheckToHealthy bool `long:"force-store-health-check-to-healthy" env:"FORCE_STORE_HEALTH_CHECK_TO_HEALTHY" description:"Force the health check of fragment stores to healthy"`
5253
} `group:"Broker" namespace:"broker" env-namespace:"BROKER"`
5354

5455
Etcd struct {
@@ -151,6 +152,7 @@ func (cmdServe) Execute(args []string) error {
151152
broker.MinAppendRate = int64(Config.Broker.MinAppendRate)
152153
pb.MaxReplication = int32(Config.Broker.MaxReplication)
153154
stores.DisableSignedUrls = Config.Broker.DisableSignedUrls
155+
stores.ForceStoreHealthCheckToHealthy = Config.Broker.ForceStoreHealthCheckToHealthy
154156

155157
var (
156158
lo = pb.NewAuthJournalClient(pb.NewJournalClient(srv.GRPCLoopback), authorizer)

mainboilerplate/runconsumer/run_consumer.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,14 @@ type Config interface {
9292
type BaseConfig struct {
9393
Consumer struct {
9494
mbp.ServiceConfig
95-
Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"`
96-
MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."`
97-
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
98-
SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."`
99-
AuthKeys string `long:"auth-keys" env:"AUTH_KEYS" description:"Whitespace or comma separated, base64-encoded keys used to sign (first key) and verify (all keys) Authorization tokens." json:"-"`
100-
AWSAccessKeyIDPath string `long:"aws-access-key-id-path" env:"AWS_ACCESS_KEY_ID_PATH" default:"" description:"file path to the aws access key id secret"`
101-
AWSSecretAccessKeyPath string `long:"aws-secret-access-key-path" env:"AWS_SECRET_ACCESS_KEY_PATH" default:"" description:"file path to the aws secret access key secret"`
95+
Limit uint32 `long:"limit" env:"LIMIT" default:"32" description:"Maximum number of Shards this consumer process will allocate"`
96+
MaxHotStandbys uint32 `long:"max-hot-standbys" env:"MAX_HOT_STANDBYS" default:"3" description:"Maximum effective hot standbys of any one shard, which upper-bounds its stated hot-standbys."`
97+
WatchDelay time.Duration `long:"watch-delay" env:"WATCH_DELAY" default:"30ms" description:"Delay applied to the application of watched Etcd events. Larger values amortize the processing of fast-changing Etcd keys."`
98+
SkipSignedURLs bool `long:"skip-signed-urls" env:"SKIP_SIGNED_URLS" description:"When a signed URL is received, use fragment info instead to retrieve data with auth header. This is useful when clients do not wish/require the signing."`
99+
AuthKeys string `long:"auth-keys" env:"AUTH_KEYS" description:"Whitespace or comma separated, base64-encoded keys used to sign (first key) and verify (all keys) Authorization tokens." json:"-"`
100+
AWSAccessKeyIDPath string `long:"aws-access-key-id-path" env:"AWS_ACCESS_KEY_ID_PATH" default:"" description:"file path to the aws access key id secret"`
101+
AWSSecretAccessKeyPath string `long:"aws-secret-access-key-path" env:"AWS_SECRET_ACCESS_KEY_PATH" default:"" description:"file path to the aws secret access key secret"`
102+
ForceStoreHealthCheckToHealthy bool `long:"force-store-health-check-to-healthy" env:"FORCE_STORE_HEALTH_CHECK_TO_HEALTHY" description:"Force the health check of fragment stores to healthy"`
102103
} `group:"Consumer" namespace:"consumer" env-namespace:"CONSUMER"`
103104

104105
Broker struct {
@@ -196,6 +197,10 @@ func (sc Cmd) Execute(args []string) error {
196197
stores.DisableSignedUrls = true
197198
}
198199

200+
if bc.Consumer.ForceStoreHealthCheckToHealthy {
201+
stores.ForceStoreHealthCheckToHealthy = true
202+
}
203+
199204
// Load AWS credentials from file paths if both are configured.
200205
if bc.Consumer.AWSAccessKeyIDPath != "" && bc.Consumer.AWSSecretAccessKeyPath != "" {
201206
accessKeyID, err := os.ReadFile(bc.Consumer.AWSAccessKeyIDPath)

0 commit comments

Comments
 (0)