Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions util/rpcclient/rpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ClientConfig struct {
RetryErrors string `json:"retry-errors,omitempty" koanf:"retry-errors"`
RetryDelay time.Duration `json:"retry-delay,omitempty" koanf:"retry-delay"`
WebsocketMessageSizeLimit int64 `json:"websocket-message-size-limit,omitempty" koanf:"websocket-message-size-limit"`
ExecKeepAliveInterval time.Duration `json:"exec-keep-alive-interval,omitempty" koanf:"exec-keep-alive-interval"`

retryErrors *regexp.Regexp
}
Expand Down Expand Up @@ -58,6 +59,7 @@ var TestClientConfig = ClientConfig{
URL: "self",
JWTSecret: "",
WebsocketMessageSizeLimit: 256 * 1024 * 1024,
ExecKeepAliveInterval: time.Minute,
}

var DefaultClientConfig = ClientConfig{
Expand All @@ -68,6 +70,7 @@ var DefaultClientConfig = ClientConfig{
RetryErrors: "websocket: close.*|dial tcp .*|.*i/o timeout|.*connection reset by peer|.*connection refused",
ArgLogLimit: 2048,
WebsocketMessageSizeLimit: 256 * 1024 * 1024,
ExecKeepAliveInterval: time.Minute,
}

func RPCClientAddOptions(prefix string, f *pflag.FlagSet, defaultConfig *ClientConfig) {
Expand All @@ -80,6 +83,7 @@ func RPCClientAddOptions(prefix string, f *pflag.FlagSet, defaultConfig *ClientC
f.String(prefix+".retry-errors", defaultConfig.RetryErrors, "Errors matching this regular expression are automatically retried")
f.Duration(prefix+".retry-delay", defaultConfig.RetryDelay, "delay between retries")
f.Int64(prefix+".websocket-message-size-limit", defaultConfig.WebsocketMessageSizeLimit, "websocket message size limit used by the RPC client. 0 means no limit")
f.Duration(prefix+".exec-keep-alive-interval", defaultConfig.ExecKeepAliveInterval, "interval for sending keep-alive messages to execution client runs")
}

type RpcClient struct {
Expand All @@ -100,6 +104,14 @@ func (c *RpcClient) Timeout() time.Duration {
return c.config().Timeout
}

func (c *RpcClient) ExecKeepAliveInterval() time.Duration {
interval := c.config().ExecKeepAliveInterval
if interval == 0 {
return time.Minute
}
return interval
}

func (c *RpcClient) Close() {
if c.client != nil {
c.client.Close()
Expand Down
3 changes: 1 addition & 2 deletions validator/client/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ func (r *ExecutionClientRun) SendKeepAlive(ctx context.Context) time.Duration {
if err != nil {
log.Error("execution run keepalive failed", "err", err)
}
return time.Minute // TODO: configurable
}
return r.client.client.ExecKeepAliveInterval()

func (r *ExecutionClientRun) CheckAlive(ctx context.Context) error {
return r.client.client.CallContext(ctx, nil, server_api.Namespace+"_checkAlive", r.id)
Expand Down