Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions state/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,19 +465,43 @@ func (r *StateStore) registerSchemas(ctx context.Context) error {
return nil
}

func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version *string, err error) {
seenData := false
seenVersion := false
for i := 0; i < len(vals); i += 2 {
field, _ := strconv.Unquote(fmt.Sprintf("%q", vals[i]))
switch field {
case "data":
data, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
seenData = true
case "version":
versionVal, _ := strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
version = ptr.Of(versionVal)
seenVersion = true
func (r *StateStore) getKeyVersion(vals []any) (data string, version *string, err error) {
var seenData, seenVersion bool

// step by 2: key, value. we only expect string or byte slice
for i := 0; i+1 < len(vals); i += 2 {
switch key := vals[i].(type) {
case string:
switch key {
case "data":
if s, ok := toString(vals[i+1]); ok {
data = s
seenData = true
}
case "version":
if s, ok := toString(vals[i+1]); ok {
version = &s
seenVersion = true
}
}
case []byte:
switch string(key) {
case "data":
if s, ok := toString(vals[i+1]); ok {
data = s
seenData = true
}
case "version":
if s, ok := toString(vals[i+1]); ok {
version = &s
seenVersion = true
}
}
}

// Early exit once both values have been found
if seenData && seenVersion {
break
}
}
if !seenData || !seenVersion {
Expand All @@ -487,6 +511,17 @@ func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version *st
return data, version, nil
}

func toString(v any) (string, bool) {
switch x := v.(type) {
case string:
return x, true
case []byte:
return string(x), true // some allocation here unless we go to unsafe: return unsafe.String(unsafe.SliceData(x), len(x)), true
default:
return "", false
}
}

func (r *StateStore) parseETag(req *state.SetRequest) (int, error) {
if req.Options.Concurrency == state.LastWrite || req.ETag == nil || *req.ETag == "" {
return 0, nil
Expand Down
40 changes: 40 additions & 0 deletions state/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,43 @@ func setupMiniredis() (*miniredis.Miniredis, rediscomponent.RedisClient) {

return s, rediscomponent.ClientFromV8Client(redis.NewClient(opts))
}

func TestToString(t *testing.T) {
// happy paths
if s, ok := toString("abc"); assert.True(t, ok) {
assert.Equal(t, "abc", s)
}
if s, ok := toString([]byte("def")); assert.True(t, ok) {
assert.Equal(t, "def", s)
}
// unsupported
_, ok := toString(123)
assert.False(t, ok)
}

func BenchmarkGetKeyVersion(b *testing.B) {
/*
On a Mac M1 Pro:
BenchmarkGetKeyVersion-10 13651144 83.84 ns/op 64 B/op 6 allocs/op

// old getkeyversion method
BenchmarkGetKeyVersionOld-10 1631097 729.1 ns/op 96 B/op 10 allocs/op

// ~8x speed - ~1/2 allocations

// unsafe comparison
BenchmarkGetKeyVersion-10 28636363 41.53 ns/op 32 B/op 2 allocs/op
*/
store := newStateStore(logger.NewLogger("bench"))
input1 := []any{[]byte("data"), []byte("payload"), []byte("version"), []byte("42")}
input2 := []any{[]byte("data"), []byte("payload2"), []byte("version"), []byte("43")}
b.ReportAllocs()
for range b.N {
if _, _, err := store.getKeyVersion(input1); err != nil {
b.Fatal(err)
}
if _, _, err := store.getKeyVersion(input2); err != nil {
b.Fatal(err)
}
}
}
Loading