-
Notifications
You must be signed in to change notification settings - Fork 22
Open
Description
Hi,
I have this problem when deserializing from avro file, which returned duplicate values and also always continue running (not stopping).
regards
main.go
package main
import (
"bufio"
"encoding/json"
"fmt"
"gopkg.in/avro.v0"
"log"
"os"
)
type ViewJSON struct {
Timestamp string `json:"timestamp"`
UserID int `json:"user_id"`
ProductID int `json:"product_id"`
}
type ViewAVRO struct {
Timestamp string
UserID int32
ProductID int32
}
func main() {
view2avro()
avro2view()
}
func avro2view() {
specificReader, err := avro.NewDataFileReader("view.avro", avro.NewSpecificDatumReader())
if err != nil {
log.Fatalln("Cannot create data file reader.", err)
}
for {
obj := new(ViewAVRO)
hasNext, err := specificReader.Next(obj)
if err != nil {
log.Println("err", err)
}
fmt.Println(obj)
if !hasNext {
break
}
}
}
func view2avro() {
in, err := os.Open("view.txt")
if err != nil {
log.Fatalln("Cannot open file:", err)
}
defer in.Close()
out, err := os.Create("view.avro")
if err != nil {
log.Fatalln("Cannot open file:", err)
}
defer out.Close()
schema, err := avro.ParseSchemaFile("view.avsc")
if err != nil {
log.Fatalln("Cannot read schema file:", err)
}
dWriter := avro.NewSpecificDatumWriter()
dWriter.SetSchema(schema)
dfw, err := avro.NewDataFileWriter(out, schema, dWriter)
if err != nil {
log.Fatalln("Cannot read schema file:", err)
}
// defer dfw.Close()
scanner := bufio.NewScanner(in)
for scanner.Scan() {
viewRecord := ViewJSON{}
json.Unmarshal(scanner.Bytes(), &viewRecord)
// log.Println(viewRecord)
dfw.Write(&ViewAVRO{
Timestamp: viewRecord.Timestamp,
UserID: int32(viewRecord.UserID),
ProductID: int32(viewRecord.ProductID),
}) // must be in int32 (avro "typed"), and then pass it as reference; if not, its skipped.
// dfw.Flush()
}
dfw.Flush()
if err = dfw.Close(); err != nil {
log.Fatal(err)
}
}
view.txt
{"user_id":1,"product_id":100,"timestamp":"2019-03-10T23:59:58.953775421+07:00"}
{"user_id":2,"product_id":101,"timestamp":"2019-03-10T23:59:59.026548024+07:00"}
{"user_id":3,"product_id":102,"timestamp":"2019-03-10T23:59:58.953455719+07:00"}
{"user_id":4,"product_id":501,"timestamp":"2019-03-10T23:59:59.034088025+07:00"}
{"user_id":5,"product_id":30,"timestamp":"2019-03-10T23:59:58.941772276+07:00"}
view.avsc
{
"namespace": "shop",
"type": "record",
"name": "View",
"fields": [
{"name": "timestamp", "type": "string"},
{"name": "userID", "type": "int"},
{"name": "productID", "type": "int"}
]
}
Output:
go build main.go
./main
&{2019-03-10T23:59:58.953775421+07:00 1 100}
&{2019-03-10T23:59:59.026548024+07:00 2 101}
&{2019-03-10T23:59:58.953455719+07:00 3 102}
&{2019-03-10T23:59:59.034088025+07:00 4 501}
&{2019-03-10T23:59:58.941772276+07:00 5 30}
&{2019-03-10T23:59:58.953775421+07:00 1 100}
&{2019-03-10T23:59:59.026548024+07:00 2 101}
&{2019-03-10T23:59:58.953455719+07:00 3 102}
&{2019-03-10T23:59:59.034088025+07:00 4 501}
&{2019-03-10T23:59:58.941772276+07:00 5 30}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
&{ 0 0}
...
...
..
..
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels