Skip to content

Commit e96c7b5

Browse files
Function stats, function kill, fcall and fcall_ro (#2486)
1 parent 9aba95a commit e96c7b5

File tree

3 files changed

+447
-55
lines changed

3 files changed

+447
-55
lines changed

command.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3993,6 +3993,231 @@ func (cmd *FunctionListCmd) readFunctions(rd *proto.Reader) ([]Function, error)
39933993
return functions, nil
39943994
}
39953995

3996+
// FunctionStats contains information about the scripts currently executing on the server, and the available engines
3997+
// - Engines:
3998+
// Statistics about the engine like number of functions and number of libraries
3999+
// - RunningScript:
4000+
// The script currently running on the shard we're connecting to.
4001+
// For Redis Enterprise and Redis Cloud, this represents the
4002+
// function with the longest running time, across all the running functions, on all shards
4003+
// - RunningScripts
4004+
// All scripts currently running in a Redis Enterprise clustered database.
4005+
// Only available on Redis Enterprise
4006+
type FunctionStats struct {
4007+
Engines []Engine
4008+
isRunning bool
4009+
rs RunningScript
4010+
allrs []RunningScript
4011+
}
4012+
4013+
func (fs *FunctionStats) Running() bool {
4014+
return fs.isRunning
4015+
}
4016+
4017+
func (fs *FunctionStats) RunningScript() (RunningScript, bool) {
4018+
return fs.rs, fs.isRunning
4019+
}
4020+
4021+
// AllRunningScripts returns all scripts currently running in a Redis Enterprise clustered database.
4022+
// Only available on Redis Enterprise
4023+
func (fs *FunctionStats) AllRunningScripts() []RunningScript {
4024+
return fs.allrs
4025+
}
4026+
4027+
type RunningScript struct {
4028+
Name string
4029+
Command []string
4030+
Duration time.Duration
4031+
}
4032+
4033+
type Engine struct {
4034+
Language string
4035+
LibrariesCount int64
4036+
FunctionsCount int64
4037+
}
4038+
4039+
type FunctionStatsCmd struct {
4040+
baseCmd
4041+
val FunctionStats
4042+
}
4043+
4044+
var _ Cmder = (*FunctionStatsCmd)(nil)
4045+
4046+
func NewFunctionStatsCmd(ctx context.Context, args ...interface{}) *FunctionStatsCmd {
4047+
return &FunctionStatsCmd{
4048+
baseCmd: baseCmd{
4049+
ctx: ctx,
4050+
args: args,
4051+
},
4052+
}
4053+
}
4054+
4055+
func (cmd *FunctionStatsCmd) SetVal(val FunctionStats) {
4056+
cmd.val = val
4057+
}
4058+
4059+
func (cmd *FunctionStatsCmd) String() string {
4060+
return cmdString(cmd, cmd.val)
4061+
}
4062+
4063+
func (cmd *FunctionStatsCmd) Val() FunctionStats {
4064+
return cmd.val
4065+
}
4066+
4067+
func (cmd *FunctionStatsCmd) Result() (FunctionStats, error) {
4068+
return cmd.val, cmd.err
4069+
}
4070+
4071+
func (cmd *FunctionStatsCmd) readReply(rd *proto.Reader) (err error) {
4072+
n, err := rd.ReadMapLen()
4073+
if err != nil {
4074+
return err
4075+
}
4076+
4077+
var key string
4078+
var result FunctionStats
4079+
for f := 0; f < n; f++ {
4080+
key, err = rd.ReadString()
4081+
if err != nil {
4082+
return err
4083+
}
4084+
4085+
switch key {
4086+
case "running_script":
4087+
result.rs, result.isRunning, err = cmd.readRunningScript(rd)
4088+
case "engines":
4089+
result.Engines, err = cmd.readEngines(rd)
4090+
case "all_running_scripts": // Redis Enterprise only
4091+
result.allrs, result.isRunning, err = cmd.readRunningScripts(rd)
4092+
default:
4093+
return fmt.Errorf("redis: function stats unexpected key %s", key)
4094+
}
4095+
4096+
if err != nil {
4097+
return err
4098+
}
4099+
}
4100+
4101+
cmd.val = result
4102+
return nil
4103+
}
4104+
4105+
func (cmd *FunctionStatsCmd) readRunningScript(rd *proto.Reader) (RunningScript, bool, error) {
4106+
err := rd.ReadFixedMapLen(3)
4107+
if err != nil {
4108+
if err == Nil {
4109+
return RunningScript{}, false, nil
4110+
}
4111+
return RunningScript{}, false, err
4112+
}
4113+
4114+
var runningScript RunningScript
4115+
for i := 0; i < 3; i++ {
4116+
key, err := rd.ReadString()
4117+
if err != nil {
4118+
return RunningScript{}, false, err
4119+
}
4120+
4121+
switch key {
4122+
case "name":
4123+
runningScript.Name, err = rd.ReadString()
4124+
case "duration_ms":
4125+
runningScript.Duration, err = cmd.readDuration(rd)
4126+
case "command":
4127+
runningScript.Command, err = cmd.readCommand(rd)
4128+
default:
4129+
return RunningScript{}, false, fmt.Errorf("redis: function stats unexpected running_script key %s", key)
4130+
}
4131+
4132+
if err != nil {
4133+
return RunningScript{}, false, err
4134+
}
4135+
}
4136+
4137+
return runningScript, true, nil
4138+
}
4139+
4140+
func (cmd *FunctionStatsCmd) readEngines(rd *proto.Reader) ([]Engine, error) {
4141+
n, err := rd.ReadMapLen()
4142+
if err != nil {
4143+
return nil, err
4144+
}
4145+
4146+
engines := make([]Engine, 0, n)
4147+
for i := 0; i < n; i++ {
4148+
engine := Engine{}
4149+
engine.Language, err = rd.ReadString()
4150+
if err != nil {
4151+
return nil, err
4152+
}
4153+
4154+
err = rd.ReadFixedMapLen(2)
4155+
if err != nil {
4156+
return nil, fmt.Errorf("redis: function stats unexpected %s engine map length", engine.Language)
4157+
}
4158+
4159+
for i := 0; i < 2; i++ {
4160+
key, err := rd.ReadString()
4161+
switch key {
4162+
case "libraries_count":
4163+
engine.LibrariesCount, err = rd.ReadInt()
4164+
case "functions_count":
4165+
engine.FunctionsCount, err = rd.ReadInt()
4166+
}
4167+
if err != nil {
4168+
return nil, err
4169+
}
4170+
}
4171+
4172+
engines = append(engines, engine)
4173+
}
4174+
return engines, nil
4175+
}
4176+
4177+
func (cmd *FunctionStatsCmd) readDuration(rd *proto.Reader) (time.Duration, error) {
4178+
t, err := rd.ReadInt()
4179+
if err != nil {
4180+
return time.Duration(0), err
4181+
}
4182+
return time.Duration(t) * time.Millisecond, nil
4183+
}
4184+
4185+
func (cmd *FunctionStatsCmd) readCommand(rd *proto.Reader) ([]string, error) {
4186+
4187+
n, err := rd.ReadArrayLen()
4188+
if err != nil {
4189+
return nil, err
4190+
}
4191+
4192+
command := make([]string, 0, n)
4193+
for i := 0; i < n; i++ {
4194+
x, err := rd.ReadString()
4195+
if err != nil {
4196+
return nil, err
4197+
}
4198+
command = append(command, x)
4199+
}
4200+
4201+
return command, nil
4202+
}
4203+
func (cmd *FunctionStatsCmd) readRunningScripts(rd *proto.Reader) ([]RunningScript, bool, error) {
4204+
n, err := rd.ReadArrayLen()
4205+
if err != nil {
4206+
return nil, false, err
4207+
}
4208+
4209+
runningScripts := make([]RunningScript, 0, n)
4210+
for i := 0; i < n; i++ {
4211+
rs, _, err := cmd.readRunningScript(rd)
4212+
if err != nil {
4213+
return nil, false, err
4214+
}
4215+
runningScripts = append(runningScripts, rs)
4216+
}
4217+
4218+
return runningScripts, len(runningScripts) > 0, nil
4219+
}
4220+
39964221
//------------------------------------------------------------------------------
39974222

39984223
// LCSQuery is a parameter used for the LCS command

commands.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,14 @@ type Cmdable interface {
408408
FunctionLoadReplace(ctx context.Context, code string) *StringCmd
409409
FunctionDelete(ctx context.Context, libName string) *StringCmd
410410
FunctionFlush(ctx context.Context) *StringCmd
411+
FunctionKill(ctx context.Context) *StringCmd
411412
FunctionFlushAsync(ctx context.Context) *StringCmd
412413
FunctionList(ctx context.Context, q FunctionListQuery) *FunctionListCmd
413414
FunctionDump(ctx context.Context) *StringCmd
414415
FunctionRestore(ctx context.Context, libDump string) *StringCmd
416+
FunctionStats(ctx context.Context) *FunctionStatsCmd
417+
FCall(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd
418+
FCallRo(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd
415419

416420
Publish(ctx context.Context, channel string, message interface{}) *IntCmd
417421
SPublish(ctx context.Context, channel string, message interface{}) *IntCmd
@@ -3401,6 +3405,12 @@ func (c cmdable) FunctionFlush(ctx context.Context) *StringCmd {
34013405
return cmd
34023406
}
34033407

3408+
func (c cmdable) FunctionKill(ctx context.Context) *StringCmd {
3409+
cmd := NewStringCmd(ctx, "function", "kill")
3410+
_ = c(ctx, cmd)
3411+
return cmd
3412+
}
3413+
34043414
func (c cmdable) FunctionFlushAsync(ctx context.Context) *StringCmd {
34053415
cmd := NewStringCmd(ctx, "function", "flush", "async")
34063416
_ = c(ctx, cmd)
@@ -3434,6 +3444,44 @@ func (c cmdable) FunctionRestore(ctx context.Context, libDump string) *StringCmd
34343444
return cmd
34353445
}
34363446

3447+
func (c cmdable) FunctionStats(ctx context.Context) *FunctionStatsCmd {
3448+
cmd := NewFunctionStatsCmd(ctx, "function", "stats")
3449+
_ = c(ctx, cmd)
3450+
return cmd
3451+
}
3452+
3453+
func (c cmdable) FCall(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd {
3454+
cmdArgs := fcallArgs("fcall", function, keys, args...)
3455+
cmd := NewCmd(ctx, cmdArgs...)
3456+
if len(keys) > 0 {
3457+
cmd.SetFirstKeyPos(3)
3458+
}
3459+
_ = c(ctx, cmd)
3460+
return cmd
3461+
}
3462+
func (c cmdable) FCallRo(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd {
3463+
cmdArgs := fcallArgs("fcall_ro", function, keys, args...)
3464+
cmd := NewCmd(ctx, cmdArgs...)
3465+
if len(keys) > 0 {
3466+
cmd.SetFirstKeyPos(3)
3467+
}
3468+
_ = c(ctx, cmd)
3469+
return cmd
3470+
}
3471+
3472+
func fcallArgs(command string, function string, keys []string, args ...interface{}) []interface{} {
3473+
cmdArgs := make([]interface{}, 3+len(keys), 3+len(keys)+len(args))
3474+
cmdArgs[0] = command
3475+
cmdArgs[1] = function
3476+
cmdArgs[2] = len(keys)
3477+
for i, key := range keys {
3478+
cmdArgs[3+i] = key
3479+
}
3480+
3481+
cmdArgs = append(cmdArgs, args...)
3482+
return cmdArgs
3483+
}
3484+
34373485
//------------------------------------------------------------------------------
34383486

34393487
// Publish posts the message to the channel.

0 commit comments

Comments
 (0)