Skip to content

Commit d457442

Browse files
authored
[Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
Fixes #1132 ### Motivation Fix issue #1132 - using JSONSchema with TableView ### Modifications - Set a concrete type in the `payload` variable before JSON-unmarshalling into that variable. This allows the JSON package to identify and use the type rather than seeing it as `interface{}`. - Use `reflect.Indirect(payload).Interface()` when storing the payload and passing it to listeners to remove the pointer from `reflect.New`. - Add test coverage for `TableView.Get` covering all supported schema types. - Add test coverage for `TableView.ForEachAndListen` for JSONSchema. Additional minor changes. They didn't seem worth their own MRs but I'm happy to split them out if that's better. - Correct typo in comments on `TableView.ForEach` and `TableView.ForEachAndListen` interface methods. - Correct `TableView.ForEachAndListen` comment to clarify that it continues to call the given action on future messages. - Correct formatting directive (`%w` -> `%v`) in error log `tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)`. (This indirectly calls `fmt.Sprintf` in logrus which doesn't support `%w`).
1 parent ec846ff commit d457442

File tree

3 files changed

+219
-9
lines changed

3 files changed

+219
-9
lines changed

pulsar/table_view.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ type TableView interface {
6565
// Keys returns a slice of the keys contained in this TableView.
6666
Keys() []string
6767

68-
// ForEach performs the give action for each entry in this map until all entries have been processed or the action
68+
// ForEach performs the given action for each entry in this map until all entries have been processed or the action
6969
// returns an error.
7070
ForEach(func(string, interface{}) error) error
7171

72-
// ForEachAndListen performs the give action for each entry in this map until all entries have been processed or
73-
// the action returns an error.
72+
// ForEachAndListen performs the given action for each entry in this map until all entries have been processed or
73+
// the action returns an error. The given action will then be performed on each new entry in this map.
7474
ForEachAndListen(func(string, interface{}) error) error
7575

7676
// Close closes the table view and releases resources allocated.

pulsar/table_view_impl.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -245,19 +245,18 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
245245
tv.dataMu.Lock()
246246
defer tv.dataMu.Unlock()
247247

248-
var payload interface{}
248+
payload := reflect.New(tv.options.SchemaValueType)
249249
if len(msg.Payload()) == 0 {
250250
delete(tv.data, msg.Key())
251251
} else {
252-
payload = reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
253-
if err := msg.GetSchemaValue(&payload); err != nil {
254-
tv.logger.Errorf("msg.GetSchemaValue() failed with %w; msg is %v", err, msg)
252+
if err := msg.GetSchemaValue(payload.Interface()); err != nil {
253+
tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
255254
}
256-
tv.data[msg.Key()] = payload
255+
tv.data[msg.Key()] = reflect.Indirect(payload).Interface()
257256
}
258257

259258
for _, listener := range tv.listeners {
260-
if err := listener(msg.Key(), payload); err != nil {
259+
if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
261260
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
262261
}
263262
}

pulsar/table_view_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27+
pb "github.com/apache/pulsar-client-go/integration-tests/pb"
2728
"github.com/stretchr/testify/assert"
2829
"github.com/stretchr/testify/require"
2930
)
@@ -80,6 +81,157 @@ func TestTableView(t *testing.T) {
8081
}
8182
}
8283

84+
func TestTableViewSchemas(t *testing.T) {
85+
var tests = []struct {
86+
name string
87+
schema Schema
88+
schemaType interface{}
89+
producerValue interface{}
90+
expValueOut interface{}
91+
valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks
92+
}{
93+
{
94+
name: "StringSchema",
95+
schema: NewStringSchema(nil),
96+
schemaType: strPointer("hello pulsar"),
97+
producerValue: "hello pulsar",
98+
expValueOut: strPointer("hello pulsar"),
99+
},
100+
{
101+
name: "JSONSchema",
102+
schema: NewJSONSchema(exampleSchemaDef, nil),
103+
schemaType: testJSON{},
104+
producerValue: testJSON{ID: 1, Name: "Pulsar"},
105+
expValueOut: testJSON{ID: 1, Name: "Pulsar"},
106+
},
107+
{
108+
name: "JSONSchema pointer type",
109+
schema: NewJSONSchema(exampleSchemaDef, nil),
110+
schemaType: &testJSON{ID: 1, Name: "Pulsar"},
111+
producerValue: testJSON{ID: 1, Name: "Pulsar"},
112+
expValueOut: &testJSON{ID: 1, Name: "Pulsar"},
113+
},
114+
{
115+
name: "AvroSchema",
116+
schema: NewAvroSchema(exampleSchemaDef, nil),
117+
schemaType: testAvro{ID: 1, Name: "Pulsar"},
118+
producerValue: testAvro{ID: 1, Name: "Pulsar"},
119+
expValueOut: testAvro{ID: 1, Name: "Pulsar"},
120+
},
121+
{
122+
name: "Int8Schema",
123+
schema: NewInt8Schema(nil),
124+
schemaType: int8(0),
125+
producerValue: int8(1),
126+
expValueOut: int8(1),
127+
},
128+
{
129+
name: "Int16Schema",
130+
schema: NewInt16Schema(nil),
131+
schemaType: int16(0),
132+
producerValue: int16(1),
133+
expValueOut: int16(1),
134+
},
135+
{
136+
name: "Int32Schema",
137+
schema: NewInt32Schema(nil),
138+
schemaType: int32(0),
139+
producerValue: int32(1),
140+
expValueOut: int32(1),
141+
},
142+
{
143+
name: "Int64Schema",
144+
schema: NewInt64Schema(nil),
145+
schemaType: int64(0),
146+
producerValue: int64(1),
147+
expValueOut: int64(1),
148+
},
149+
{
150+
name: "FloatSchema",
151+
schema: NewFloatSchema(nil),
152+
schemaType: float32(0),
153+
producerValue: float32(1),
154+
expValueOut: float32(1),
155+
},
156+
{
157+
name: "DoubleSchema",
158+
schema: NewDoubleSchema(nil),
159+
schemaType: float64(0),
160+
producerValue: float64(1),
161+
expValueOut: float64(1),
162+
},
163+
{
164+
name: "ProtoSchema",
165+
schema: NewProtoSchema(protoSchemaDef, nil),
166+
schemaType: pb.Test{},
167+
producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
168+
valueCheck: func(t *testing.T, got interface{}) {
169+
assert.IsType(t, pb.Test{}, got)
170+
assert.Equal(t, int32(1), got.(pb.Test).Num)
171+
assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
172+
},
173+
},
174+
{
175+
name: "ProtoNativeSchema",
176+
schema: NewProtoNativeSchemaWithMessage(&pb.Test{}, nil),
177+
schemaType: pb.Test{},
178+
producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
179+
valueCheck: func(t *testing.T, got interface{}) {
180+
assert.IsType(t, pb.Test{}, got)
181+
assert.Equal(t, int32(1), got.(pb.Test).Num)
182+
assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
183+
},
184+
},
185+
}
186+
for _, test := range tests {
187+
t.Run(test.name, func(t *testing.T) {
188+
client, err := NewClient(ClientOptions{
189+
URL: lookupURL,
190+
})
191+
192+
assert.NoError(t, err)
193+
defer client.Close()
194+
195+
topic := newTopicName()
196+
197+
// create producer
198+
producer, err := client.CreateProducer(ProducerOptions{
199+
Topic: topic,
200+
Schema: test.schema,
201+
})
202+
assert.NoError(t, err)
203+
defer producer.Close()
204+
205+
_, err = producer.Send(context.Background(), &ProducerMessage{
206+
Key: "testKey",
207+
Value: test.producerValue,
208+
})
209+
assert.NoError(t, err)
210+
211+
// create table view
212+
tv, err := client.CreateTableView(TableViewOptions{
213+
Topic: topic,
214+
Schema: test.schema,
215+
SchemaValueType: reflect.TypeOf(test.schemaType),
216+
})
217+
assert.NoError(t, err)
218+
defer tv.Close()
219+
220+
value := tv.Get("testKey")
221+
if test.valueCheck != nil {
222+
test.valueCheck(t, value)
223+
} else {
224+
assert.IsType(t, test.expValueOut, value)
225+
assert.Equal(t, test.expValueOut, value)
226+
}
227+
})
228+
}
229+
}
230+
231+
func strPointer(s string) *string {
232+
return &s
233+
}
234+
83235
func TestPublishNilValue(t *testing.T) {
84236
client, err := NewClient(ClientOptions{
85237
URL: lookupURL,
@@ -143,3 +295,62 @@ func TestPublishNilValue(t *testing.T) {
143295

144296
assert.Equal(t, *(tv.Get("key-2").(*string)), "value-2")
145297
}
298+
299+
func TestForEachAndListenJSONSchema(t *testing.T) {
300+
client, err := NewClient(ClientOptions{
301+
URL: lookupURL,
302+
})
303+
304+
assert.NoError(t, err)
305+
defer client.Close()
306+
307+
topic := newTopicName()
308+
schema := NewJSONSchema(exampleSchemaDef, nil)
309+
310+
// create table view
311+
tv, err := client.CreateTableView(TableViewOptions{
312+
Topic: topic,
313+
Schema: schema,
314+
SchemaValueType: reflect.TypeOf(testJSON{}),
315+
})
316+
assert.NoError(t, err)
317+
defer tv.Close()
318+
319+
// create listener
320+
valuePrefix := "hello pulsar: "
321+
tv.ForEachAndListen(func(key string, value interface{}) error {
322+
t.Log("foreach" + key)
323+
s, ok := value.(testJSON)
324+
assert.Truef(t, ok, "expected value to be testJSON type got %T", value)
325+
assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name)
326+
return nil
327+
})
328+
329+
// create producer
330+
producer, err := client.CreateProducer(ProducerOptions{
331+
Topic: topic,
332+
Schema: schema,
333+
})
334+
assert.NoError(t, err)
335+
defer producer.Close()
336+
337+
numMsg := 10
338+
for i := 0; i < numMsg; i++ {
339+
key := fmt.Sprintf("%d", i)
340+
t.Log("producing" + key)
341+
_, err = producer.Send(context.Background(), &ProducerMessage{
342+
Key: key,
343+
Value: testJSON{
344+
ID: i,
345+
Name: fmt.Sprintf(valuePrefix + key),
346+
},
347+
})
348+
assert.NoError(t, err)
349+
}
350+
351+
// Wait until tv receives all messages
352+
for tv.Size() < 10 {
353+
time.Sleep(time.Second * 1)
354+
t.Logf("TableView number of elements: %d", tv.Size())
355+
}
356+
}

0 commit comments

Comments
 (0)