Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit fc3ddb2

Browse files
committed
full kafka source integration test working
1 parent 1038c73 commit fc3ddb2

File tree

6 files changed

+276
-41
lines changed

6 files changed

+276
-41
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ require (
3434
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2
3535
github.com/y0ssar1an/q v1.0.7
3636
golang.org/x/sync v0.0.0-20190423024810-112230192c58
37+
gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183 // indirect
3738
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
3839
)
3940

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,8 @@ google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij
436436
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
437437
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
438438
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
439+
gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183 h1:PGIdqvwfpMUyUP+QAlAnKTSWQ671SmYjoou2/5j7HXk=
440+
gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A=
439441
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
440442
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
441443
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

v2/kafka/csrc/csrc.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package csrc
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io/ioutil"
7+
"net/http"
8+
"strings"
9+
10+
"github.com/pkg/errors"
11+
)
12+
13+
type Client struct {
14+
URL string
15+
}
16+
17+
func NewClient(url string) *Client {
18+
if !strings.HasPrefix(url, "http") {
19+
url = "http://" + url
20+
}
21+
return &Client{
22+
URL: url,
23+
}
24+
}
25+
26+
// GetSchema gets the schema with the ID.
27+
// https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id
28+
func (c *Client) GetSchema(id int) (string, error) {
29+
sr := SchemaResponse{}
30+
resp, err := http.Get(fmt.Sprintf("%s/schemas/ids/%d", c.URL, id))
31+
err = unmarshalRespErr(resp, err, &sr)
32+
if err != nil {
33+
return "", errors.Wrap(err, "making http request")
34+
}
35+
return sr.Schema, nil
36+
}
37+
38+
type SchemaResponse struct {
39+
Schema string `json:"schema"` // The actual AVRO schema
40+
Subject string `json:"subject"` // Subject where the schema is registered for
41+
Version int `json:"version"` // Version within this subject
42+
ID int `json:"id"` // Registry's unique id
43+
}
44+
45+
type ErrorResponse struct {
46+
StatusCode int `json:"error_code"`
47+
Body string `json:"message"`
48+
}
49+
50+
func (e *ErrorResponse) Error() string {
51+
return fmt.Sprintf("status %d: %s", e.StatusCode, e.Body)
52+
}
53+
54+
func (c *Client) PostSubjects(subj, schema string) (*SchemaResponse, error) {
55+
schema = strings.Replace(schema, "\t", "", -1)
56+
schema = strings.Replace(schema, "\n", `\n`, -1)
57+
schema = fmt.Sprintf(`{"schema": "%s"}`, strings.Replace(schema, `"`, `\"`, -1)) // this is probably terrible
58+
resp, err := http.Post(fmt.Sprintf("%s/subjects/%s/versions", c.URL, subj), "application/json", strings.NewReader(schema))
59+
sr := &SchemaResponse{}
60+
err = unmarshalRespErr(resp, err, sr)
61+
if err != nil {
62+
return nil, errors.Wrap(err, "unmarshaling resp")
63+
}
64+
return sr, nil
65+
}
66+
67+
func unmarshalRespErr(resp *http.Response, err error, into interface{}) error {
68+
if err != nil {
69+
return errors.Wrap(err, "making http request")
70+
}
71+
if resp.StatusCode != 200 {
72+
bod, err := ioutil.ReadAll(resp.Body)
73+
if err != nil {
74+
return errors.Wrap(err, "reading body")
75+
}
76+
errResp := &ErrorResponse{
77+
StatusCode: resp.StatusCode,
78+
Body: string(bod),
79+
}
80+
return errResp
81+
}
82+
dec := json.NewDecoder(resp.Body)
83+
err = dec.Decode(into)
84+
if err != nil {
85+
return errors.Wrap(err, "unmarshaling body")
86+
}
87+
return nil
88+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package csrc_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/pilosa/pdk/v2/kafka/csrc"
7+
)
8+
9+
func TestPostGet(t *testing.T) {
10+
if testing.Short() {
11+
t.Skip()
12+
}
13+
sr := "localhost:8081"
14+
client := csrc.NewClient(sr)
15+
16+
schemaStr := `{"type":"record","name":"a","fields":[{"name":"blah","type":"string"}]}`
17+
r, err := client.PostSubjects("aname", schemaStr)
18+
if err != nil {
19+
t.Fatalf("postsubjects: %v", err)
20+
}
21+
22+
// Docs indicate that schema and subject should be returned by the
23+
// POST, but they are not.
24+
//
25+
// if r.Schema != schemaStr {
26+
// t.Errorf("wrong schema: %s", r.Schema)
27+
// }
28+
29+
// if r.Subject != "aname" {
30+
// t.Errorf("wrong name: %v", r.Subject)
31+
// }
32+
33+
sch, err := client.GetSchema(r.ID)
34+
if err != nil {
35+
t.Fatalf("getting schema: %v", err)
36+
}
37+
38+
if sch != schemaStr {
39+
t.Errorf("unexpected schema\n%s\n%s", sch, schemaStr)
40+
}
41+
}

v2/kafka/source.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,10 @@ type Source struct {
114114
// NewSource gets a new Source
115115
func NewSource() *Source {
116116
src := &Source{
117-
Hosts: []string{"localhost:9092"},
118-
Topics: []string{"test"},
119-
Group: "group0",
117+
Hosts: []string{"localhost:9092"},
118+
Topics: []string{"test"},
119+
Group: "group0",
120+
RegistryURL: "localhost:8081",
120121

121122
lastSchemaID: -1,
122123
cache: make(map[int32]avro.Schema),

v2/kafka/source_test.go

Lines changed: 140 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,20 @@ package kafka
22

33
import (
44
"encoding/binary"
5+
"fmt"
56
"io/ioutil"
67
"math/big"
8+
"math/rand"
79
"reflect"
810
"strings"
911
"testing"
12+
"time"
1013

1114
"github.com/Shopify/sarama"
1215
"github.com/go-avro/avro"
1316
liavro "github.com/linkedin/goavro/v2"
1417
pdk "github.com/pilosa/pdk/v2"
18+
"github.com/pilosa/pdk/v2/kafka/csrc"
1519
)
1620

1721
func TestAvroToPDKSchema(t *testing.T) {
@@ -103,47 +107,47 @@ func liDecodeTestSchema(t *testing.T, filename string) *liavro.Codec {
103107
return codec
104108
}
105109

106-
func TestKafkaSource(t *testing.T) {
110+
var tests = []struct {
111+
data []map[string]interface{}
112+
schemaFile string
113+
exp [][]interface{}
114+
}{
115+
{
116+
schemaFile: "simple.json",
117+
data: []map[string]interface{}{{"first": "hello", "last": "goodbye"}, {"first": "one", "last": "two"}},
118+
exp: [][]interface{}{{"hello", "goodbye"}, {"one", "two"}},
119+
},
120+
{
121+
schemaFile: "stringtypes.json",
122+
data: []map[string]interface{}{{"first": "blah", "last": "goodbye", "middle": "123456789"}},
123+
exp: [][]interface{}{{"blah", []byte("goodbye"), []byte("123456789")}},
124+
},
125+
{
126+
schemaFile: "decimal.json",
127+
data: []map[string]interface{}{{"somenum": &big.Rat{}}, {"somenum": big.NewRat(10, 1)}, {"somenum": big.NewRat(1, 1)}, {"somenum": big.NewRat(5, 2)}, {"somenum": big.NewRat(1234567890, 1)}},
128+
exp: [][]interface{}{{uint64(0)}, {uint64(1000)}, {uint64(100)}, {uint64(250)}, {uint64(123456789000)}},
129+
},
130+
{
131+
schemaFile: "othertypes.json",
132+
data: []map[string]interface{}{{"first": "a", "second": []string{"b", "c"}, "third": -8, "fourth": 99, "fifth": 99.9, "sixth": 101.1, "seventh": true}},
133+
exp: [][]interface{}{{"a", []interface{}{"b", "c"}, int32(-8), int64(99), float32(99.9), float64(101.1), true}},
134+
},
135+
{
136+
schemaFile: "unions.json",
137+
data: []map[string]interface{}{
138+
{"first": map[string]interface{}{"string": "a"}, "second": map[string]interface{}{"boolean": true}, "third": map[string]interface{}{"long": 101}, "fourth": map[string]interface{}{"bytes.decimal": big.NewRat(5, 2)}},
139+
{"first": nil, "second": nil, "third": map[string]interface{}{"null": nil}, "fourth": nil},
140+
},
141+
exp: [][]interface{}{
142+
{"a", true, int64(101), uint64(2500)},
143+
{nil, nil, nil, nil}},
144+
},
145+
}
146+
147+
func TestKafkaSourceLocal(t *testing.T) {
107148
// this is not an integration test, so we'll take steps to avoid
108149
// actually connecting to Kafka or Schema Registry.
109150

110-
tests := []struct {
111-
data []map[string]interface{}
112-
schemaFile string
113-
exp [][]interface{}
114-
}{
115-
{
116-
schemaFile: "simple.json",
117-
data: []map[string]interface{}{{"first": "hello", "last": "goodbye"}, {"first": "one", "last": "two"}},
118-
exp: [][]interface{}{{"hello", "goodbye"}, {"one", "two"}},
119-
},
120-
{
121-
schemaFile: "stringtypes.json",
122-
data: []map[string]interface{}{{"first": "blah", "last": "goodbye", "middle": "123456789"}},
123-
exp: [][]interface{}{{"blah", []byte("goodbye"), []byte("123456789")}},
124-
},
125-
{
126-
schemaFile: "decimal.json",
127-
data: []map[string]interface{}{{"somenum": &big.Rat{}}, {"somenum": big.NewRat(10, 1)}, {"somenum": big.NewRat(1, 1)}, {"somenum": big.NewRat(5, 2)}, {"somenum": big.NewRat(1234567890, 1)}},
128-
exp: [][]interface{}{{uint64(0)}, {uint64(1000)}, {uint64(100)}, {uint64(250)}, {uint64(123456789000)}},
129-
},
130-
{
131-
schemaFile: "othertypes.json",
132-
data: []map[string]interface{}{{"first": "a", "second": []string{"b", "c"}, "third": -8, "fourth": 99, "fifth": 99.9, "sixth": 101.1, "seventh": true}},
133-
exp: [][]interface{}{{"a", []interface{}{"b", "c"}, int32(-8), int64(99), float32(99.9), float64(101.1), true}},
134-
},
135-
{
136-
schemaFile: "unions.json",
137-
data: []map[string]interface{}{
138-
{"first": map[string]interface{}{"string": "a"}, "second": map[string]interface{}{"boolean": true}, "third": map[string]interface{}{"long": 101}, "fourth": map[string]interface{}{"bytes.decimal": big.NewRat(5, 2)}},
139-
{"first": nil, "second": nil, "third": map[string]interface{}{"null": nil}, "fourth": nil},
140-
},
141-
exp: [][]interface{}{
142-
{"a", true, int64(101), uint64(2500)},
143-
{nil, nil, nil, nil}},
144-
},
145-
}
146-
147151
src := NewSource()
148152
// note: we will not call Open on the source which would connect
149153
// to Kafka. Instead, we'll set the src.messages manually so we
@@ -203,6 +207,104 @@ func TestKafkaSource(t *testing.T) {
203207

204208
}
205209

210+
// TestKafkaSource uses a real Kafka and Schema Registry. I downloaded
211+
// the tar archive of the Confluent Platform (self managed software)
212+
// from confluent.io/download (I got version 5.3.1). I ran `tar xzf`
213+
// on the file, changed into the directory, ran `curl -L
214+
// https://cnfl.io/cli | sh -s -- -b /Users/jaffee/bin` (that
215+
// directory is on my PATH), then ran `confluent local start
216+
// schema-registry`.
217+
//
218+
// I find that this test runs much faster after a `confluent local
219+
// destroy` followed by `confluent local start schema-registry`. The
220+
// difference is stark—10s of seconds—and I don't know why this should
221+
// be, but I think it has something to do with kafka rebalancing
222+
// itself when a new client joins.
223+
func TestKafkaSourceIntegration(t *testing.T) {
224+
if testing.Short() {
225+
t.Skip()
226+
}
227+
src := NewSource()
228+
src.Topics = []string{"test"}
229+
src.Group = "group0"
230+
err := src.Open()
231+
if err != nil {
232+
t.Fatalf("opening source: %v", err)
233+
}
234+
235+
schemaClient := csrc.NewClient("localhost:8081")
236+
237+
conf := sarama.NewConfig()
238+
conf.Version = sarama.V0_10_0_0
239+
conf.Producer.Return.Successes = true
240+
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, conf)
241+
if err != nil {
242+
t.Fatalf("getting new producer: %v", err)
243+
}
244+
defer producer.Close()
245+
246+
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
247+
248+
key := fmt.Sprintf("%d", rnd.Int())
249+
for i, test := range tests {
250+
schemaStr := readTestSchema(t, test.schemaFile)
251+
resp, err := schemaClient.PostSubjects(fmt.Sprintf("schema%d", i), schemaStr)
252+
if err != nil {
253+
t.Fatalf("posting schema: %v", err)
254+
}
255+
schemaID := resp.ID
256+
schema := liDecodeTestSchema(t, test.schemaFile)
257+
t.Run(test.schemaFile, func(t *testing.T) {
258+
259+
for j, record := range test.data {
260+
buf := make([]byte, 5, 1000)
261+
buf[0] = 0
262+
binary.BigEndian.PutUint32(buf[1:], uint32(schemaID))
263+
buf, err := schema.BinaryFromNative(buf, record)
264+
if err != nil {
265+
t.Errorf("encoding:\n%+v\nerr: %v", record, err)
266+
}
267+
268+
// post buf to kafka
269+
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: "test", Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(buf)})
270+
if err != nil {
271+
t.Fatalf("sending message to kafka: %v", err)
272+
}
273+
274+
pdkRec, err := src.Record()
275+
if j == 0 {
276+
if err != pdk.ErrSchemaChange {
277+
t.Errorf("expected schema changed signal, got: %v", err)
278+
}
279+
gotSchema := src.Schema()
280+
if !reflect.DeepEqual(gotSchema, expectedSchemas[test.schemaFile]) {
281+
t.Errorf("unexpected schema got/exp:\n%+v\n%+v", gotSchema, expectedSchemas[test.schemaFile])
282+
}
283+
} else if err != nil {
284+
t.Fatalf("unexpected error getting record: %v", err)
285+
}
286+
if pdkRec == nil {
287+
t.Fatalf("should have a record")
288+
}
289+
data := pdkRec.Data()
290+
if !reflect.DeepEqual(data, test.exp[j]) {
291+
t.Errorf("data mismatch exp/got:\n%+v\n%+v", test.exp[j], data)
292+
if len(data) != len(test.exp[j]) {
293+
t.Fatalf("mismatched lengths exp/got %d/%d", len(test.exp[j]), len(data))
294+
}
295+
for k := range test.exp[j] {
296+
if !reflect.DeepEqual(test.exp[j][k], data[k]) {
297+
t.Errorf("Mismatch at %d, exp/got\n%v of %[2]T\n%v of %[3]T", k, test.exp[j][k], data[k])
298+
}
299+
}
300+
301+
}
302+
}
303+
})
304+
}
305+
306+
}
307+
206308
var expectedSchemas = map[string][]pdk.Field{
207309
"simple.json": []pdk.Field{pdk.StringField{NameVal: "first"}, pdk.StringField{NameVal: "last"}},
208310
"stringtypes.json": []pdk.Field{pdk.StringField{NameVal: "first"}, pdk.StringField{NameVal: "last"}, pdk.StringField{NameVal: "middle"}},

0 commit comments

Comments
 (0)