Skip to content

Commit 95d38fd

Browse files
authored
fix: RPC event serialization error, dont send empty messages (#1871)
## This PR - adds a test for the notification error that I encountered in #1869 ### Related Issues None that I'm aware of ### Notes vibe coding warning: I don't know go well enough to fix this, but I did use TDD and also did a manual test to verify that my problem went away. If any part of my solution is technicaly or doesn't fit in the codebase for any reason: feel free to comment, I'll do my best to improve it. ### How to test Running the test suite should be sufficient Signed-off-by: Karel Vervaeke <karel@vervaeke.info>
1 parent 9334b5e commit 95d38fd

File tree

3 files changed

+100
-5
lines changed

3 files changed

+100
-5
lines changed

flagd/pkg/service/flag-evaluation/connect_service_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
mock "github.com/open-feature/flagd/core/pkg/evaluator/mock"
1616
"github.com/open-feature/flagd/core/pkg/logger"
1717
"github.com/open-feature/flagd/core/pkg/model"
18-
"github.com/open-feature/flagd/core/pkg/notifications"
1918
iservice "github.com/open-feature/flagd/core/pkg/service"
2019
"github.com/open-feature/flagd/core/pkg/store"
2120
"github.com/open-feature/flagd/core/pkg/telemetry"
@@ -254,8 +253,8 @@ func TestConnectServiceWatcher(t *testing.T) {
254253
select {
255254
case n := <-sChan:
256255
require.Equal(t, ofType, n.Type, "expected notification type: %s, but received %s", ofType, n.Type)
257-
notifications := n.Data["flags"].(notifications.Notifications)
258-
flag1, ok := notifications["flag1"].(map[string]interface{})
256+
flags := n.Data["flags"].(map[string]interface{})
257+
flag1, ok := flags["flag1"].(map[string]interface{})
259258
require.True(t, ok, "flag1 notification should be a map[string]interface{}")
260259
require.Equal(t, flag1["type"], string(model.NotificationCreate), "expected notification type: %s, but received %s", model.NotificationCreate, flag1["type"])
261260
case <-timeout.Done():

flagd/pkg/service/flag-evaluation/eventing.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,23 @@ func (eventing *eventingConfiguration) Subscribe(ctx context.Context, id any, se
4141
for result := range watcher {
4242
newFlags := make(map[string]model.Flag)
4343
for _, flag := range result.Flags {
44-
// we should be either selecting on a flag set here, or using the source-priority - duplicates are already handled, so we don't have to worry about overwrites
44+
// we should be either selecting on a flag set here, or using the source-priority - duplicates are already handled, so we don't have to worry about overwrites
4545
newFlags[flag.Key] = flag
4646
}
4747

4848
// ignore the first notification (nil old flags), the watcher emits on initialization, but for RPC we don't care until there's a change
4949
if oldFlags != nil {
5050
notifications := notifications.NewFromFlags(oldFlags, newFlags)
51+
// if there are no changes, don't emit a notification
52+
if len(notifications) == 0 {
53+
oldFlags = newFlags
54+
continue
55+
}
5156
notifier <- iservice.Notification{
5257
Type: iservice.ConfigurationChange,
5358
Data: map[string]interface{}{
54-
"flags": notifications,
59+
// don't use our custom type or it cannot be serialized, convert to map
60+
"flags": map[string]interface{}(notifications),
5561
},
5662
}
5763
}

flagd/pkg/service/flag-evaluation/eventing_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"sync"
66
"testing"
7+
"time"
78

89
"github.com/open-feature/flagd/core/pkg/logger"
10+
"github.com/open-feature/flagd/core/pkg/model"
911
iservice "github.com/open-feature/flagd/core/pkg/service"
1012
"github.com/open-feature/flagd/core/pkg/store"
1113
"github.com/stretchr/testify/require"
14+
"google.golang.org/protobuf/types/known/structpb"
1215
)
1316

1417
func TestSubscribe(t *testing.T) {
@@ -71,3 +74,90 @@ func TestUnsubscribe(t *testing.T) {
7174
"expected subscription cleared, but value present: %v", eventing.subs[idA])
7275
require.Equal(t, chanB, eventing.subs[idB], "incorrect subscription association")
7376
}
77+
78+
// TestNotificationCompatibleWithStructpb verifies that notification data from
79+
// flag change events can be converted to protobuf structs, as required by the
80+
// EventStream handlers. This is a regression test for
81+
// https://github.com/open-feature/flagd/discussions/1869
82+
func TestNotificationCompatibleWithStructpb(t *testing.T) {
83+
sources := []string{"source1"}
84+
log := logger.NewLogger(nil, false)
85+
s, err := store.NewStore(log, sources)
86+
require.NoError(t, err)
87+
88+
eventing := &eventingConfiguration{
89+
subs: make(map[interface{}]chan iservice.Notification),
90+
mu: &sync.RWMutex{},
91+
store: s,
92+
logger: log,
93+
}
94+
95+
notifyChan := make(chan iservice.Notification, 1)
96+
eventing.Subscribe(context.Background(), "test", nil, notifyChan)
97+
// allow the subscription goroutine to process the initial watch result
98+
time.Sleep(100 * time.Millisecond)
99+
100+
// first update sets up oldFlags.
101+
s.Update(sources[0], []model.Flag{
102+
{Key: "flag1", DefaultVariant: "off"},
103+
}, model.Metadata{})
104+
105+
// second update triggers a ConfigurationChange with a real diff.
106+
s.Update(sources[0], []model.Flag{
107+
{Key: "flag1", DefaultVariant: "on"},
108+
}, model.Metadata{})
109+
110+
select {
111+
case n := <-notifyChan:
112+
require.Equal(t, iservice.ConfigurationChange, n.Type)
113+
// contains a named map type instead of plain map[string]interface{}.
114+
_, err := structpb.NewStruct(n.Data)
115+
require.NoError(t, err, "notification data must be compatible with structpb.NewStruct")
116+
case <-time.After(2 * time.Second):
117+
t.Fatal("timeout waiting for notification")
118+
}
119+
}
120+
121+
// TestNoNotificationWhenFlagsUnchanged verifies that no ConfigurationChange
122+
// notification is sent when a store update contains the same flags as before.
123+
func TestNoNotificationWhenFlagsUnchanged(t *testing.T) {
124+
sources := []string{"source1"}
125+
log := logger.NewLogger(nil, false)
126+
s, err := store.NewStore(log, sources)
127+
require.NoError(t, err)
128+
129+
eventing := &eventingConfiguration{
130+
subs: make(map[interface{}]chan iservice.Notification),
131+
mu: &sync.RWMutex{},
132+
store: s,
133+
logger: log,
134+
}
135+
136+
notifyChan := make(chan iservice.Notification, 1)
137+
eventing.Subscribe(context.Background(), "test", nil, notifyChan)
138+
time.Sleep(100 * time.Millisecond)
139+
140+
// first update creates flag1 — this produces a notification (create).
141+
s.Update(sources[0], []model.Flag{
142+
{Key: "flag1", DefaultVariant: "off"},
143+
}, model.Metadata{})
144+
145+
// drain the first notification (flag creation).
146+
select {
147+
case <-notifyChan:
148+
case <-time.After(2 * time.Second):
149+
t.Fatal("timeout waiting for first notification")
150+
}
151+
152+
// second update with the same flags — should not produce a notification.
153+
s.Update(sources[0], []model.Flag{
154+
{Key: "flag1", DefaultVariant: "off"},
155+
}, model.Metadata{})
156+
157+
select {
158+
case n := <-notifyChan:
159+
t.Fatalf("unexpected notification received: %v", n)
160+
case <-time.After(500 * time.Millisecond):
161+
// expected: no notification sent
162+
}
163+
}

0 commit comments

Comments
 (0)