From 149565b4a060bc37699018a5ac2d5634adbb8a89 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 18 Jul 2023 15:53:44 +0900 Subject: [PATCH 1/7] output: Handle Fluent Bit V2 metadata format Signed-off-by: Hiroshi Hatake --- output/decoder.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/output/decoder.go b/output/decoder.go index 0242990..ffdf5b3 100644 --- a/output/decoder.go +++ b/output/decoder.go @@ -82,7 +82,22 @@ func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]in return -2, 0, nil } - t := slice.Index(0).Interface() + var t interface{} + ts = slice.Index(0).Interface() + switch ty := ts.(type) { + case FLBTime: + t = ty + case uint64: + t = ty + case []interface{}: // for Fluent Bit V2 metadata type of format + s := reflect.ValueOf(ty) + if s.Kind() != reflect.Slice || s.Len() < 2 { + return -4, 0, nil + } + t = s.Index(0).Interface() + default: + return -5, 0, nil + } data := slice.Index(1) map_data, ok := data.Interface().(map[interface{}]interface{}) From 4c65c5a7d30cdec51c9bd1a0e15601fa8cb1b73b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 24 Jul 2023 14:58:18 +0900 Subject: [PATCH 2/7] decoder: Add a decoder test case for Fluent Bit V2 format Signed-off-by: Hiroshi Hatake --- output/decoder_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/output/decoder_test.go b/output/decoder_test.go index 53e3cb8..24d0f93 100644 --- a/output/decoder_test.go +++ b/output/decoder_test.go @@ -33,6 +33,19 @@ var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */ 0x01, /* fix int 1 */ } +// dummyV2Record should be byte Array, not Slice to be able to Cast c array. +var dummyV2Record [39]byte = [39]byte{0xdd, /* array 32 */ 0x00, 0x00, 0x00, + 0x02, /* count of array elements */ + 0xdd, /* array 32 */ 0x00, 0x00, 0x00, + 0x02, /* count of array elements */ + 0xd7, 0x00, 0x64, 0xbe, 0x0e, 0xeb, 0x16, 0x36, 0xe1, 0x28, 0x80, /* 2023/07/24 14:40:59 */ + 0x82, /* fix map 2 */ + 0xa7, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, /* fix str 7 "compact" */ + 0xc3, /* true */ + 0xa6, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, /* fix str 6 "schema" */ + 0x01, /* fix int 1 */ +} + func TestGetRecord(t *testing.T) { dec := NewDecoder(unsafe.Pointer(&dummyRecord), len(dummyRecord)) if dec == nil { @@ -63,3 +76,34 @@ func TestGetRecord(t *testing.T) { t.Errorf(`record["schema"] is not 1 %d`, v) } } + +func TestGetV2Record(t *testing.T) { + dec := NewDecoder(unsafe.Pointer(&dummyV2Record), len(dummyV2Record)) + if dec == nil { + t.Fatal("dec is nil") + } + + ret, timestamp, record := GetRecord(dec) + if ret < 0 { + t.Fatalf("ret is negative: code %v", ret) + } + + // test timestamp + ts, ok := timestamp.(FLBTime) + if !ok { + t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp)) + } + + if ts.Unix() != int64(0x64be0eeb) { + t.Errorf("ts.Unix() error. given %d", ts.Unix()) + } + + // test record + v, ok := record["schema"].(int64) + if !ok { + t.Fatalf("cast error. Type is %s", reflect.TypeOf(record["schema"])) + } + if v != 1 { + t.Errorf(`record["schema"] is not 1 %d`, v) + } +} From edc8893f1939c01f98b19ec8854119ab3d4247ce Mon Sep 17 00:00:00 2001 From: "dave.seddon" Date: Tue, 25 Jul 2023 14:58:32 -0700 Subject: [PATCH 3/7] idiomatic err --- output/decoder.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/output/decoder.go b/output/decoder.go index ffdf5b3..51d1379 100644 --- a/output/decoder.go +++ b/output/decoder.go @@ -69,11 +69,11 @@ func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder { } func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) { - var check error + var m interface{} - check = dec.mpdec.Decode(&m) - if check != nil { + err := dec.mpdec.Decode(&m) + if err != nil { return -1, 0, nil } From cd399bccf8fea3387a94045b333747f59f7096ad Mon Sep 17 00:00:00 2001 From: "dave.seddon" Date: Tue, 25 Jul 2023 14:59:23 -0700 Subject: [PATCH 4/7] update dependancies --- go.mod | 4 ++-- go.sum | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index d0fa84d..0b387cd 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/fluent/fluent-bit-go -go 1.14 +go 1.20 -require github.com/ugorji/go/codec v1.1.7 +require github.com/ugorji/go/codec v1.2.11 diff --git a/go.sum b/go.sum index f43e45a..8881ca7 100644 --- a/go.sum +++ b/go.sum @@ -2,3 +2,5 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= From 340f63a685c8885652b96b75cf30e369ff105438 Mon Sep 17 00:00:00 2001 From: "dave.seddon" Date: Tue, 25 Jul 2023 15:21:07 -0700 Subject: [PATCH 5/7] extractTimeStamp --- output/decoder_test.go | 62 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/output/decoder_test.go b/output/decoder_test.go index 24d0f93..7bf4bd8 100644 --- a/output/decoder_test.go +++ b/output/decoder_test.go @@ -18,8 +18,10 @@ package output import ( + "encoding/binary" "reflect" "testing" + "time" "unsafe" ) @@ -34,16 +36,16 @@ var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */ } // dummyV2Record should be byte Array, not Slice to be able to Cast c array. -var dummyV2Record [39]byte = [39]byte{0xdd, /* array 32 */ 0x00, 0x00, 0x00, +var dummyV2Record [39]byte = [39]byte{0xdd /* array 32 */, 0x00, 0x00, 0x00, 0x02, /* count of array elements */ - 0xdd, /* array 32 */ 0x00, 0x00, 0x00, - 0x02, /* count of array elements */ + 0xdd /* array 32 */, 0x00, 0x00, 0x00, + 0x02, /* count of array elements */ 0xd7, 0x00, 0x64, 0xbe, 0x0e, 0xeb, 0x16, 0x36, 0xe1, 0x28, 0x80, /* 2023/07/24 14:40:59 */ 0x82, /* fix map 2 */ 0xa7, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, /* fix str 7 "compact" */ - 0xc3, /* true */ - 0xa6, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, /* fix str 6 "schema" */ - 0x01, /* fix int 1 */ + 0xc3, /* true */ + 0xa6, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, /* fix str 6 "schema" */ + 0x01, /* fix int 1 */ } func TestGetRecord(t *testing.T) { @@ -94,6 +96,11 @@ func TestGetV2Record(t *testing.T) { t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp)) } + // test + if ts.Time != extractTimeStamp() { + t.Fatalf("GetRecord timestamp extraction does not match manual extractTimeStamp") + } + if ts.Unix() != int64(0x64be0eeb) { t.Errorf("ts.Unix() error. given %d", ts.Unix()) } @@ -107,3 +114,46 @@ func TestGetV2Record(t *testing.T) { t.Errorf(`record["schema"] is not 1 %d`, v) } } + +// extractTimeStamp extracts the time out of the MsgPack data without using refection +func extractTimeStamp() (ts time.Time) { + + data := unsafe.Slice((*byte)(unsafe.Pointer(&dummyV2Record)), int(len(dummyV2Record))) + + // Manually find the time. + // + // The first headers of the data coming from Fluentbit looks like this + //~/Downloads/msgpack-inspect ./data + // --- + // - format: "array32" + // header: "0xdd" + // length: 2 + // children: + // - format: "array32" + // header: "0xdd" + // length: 2 + // children: + // - format: "fixext8" + // header: "0xd7" + // exttype: 0 + // length: 8 + // data: "0x64a46baa019bfcc0" + // - format: "fixmap" + // header: "0x80" + // length: 0 + // children: [] + // Array32 is 5 bytes long, and the fixext8 has x2 byte header, so that's where the time field starts + // fixext8 has 8 bytes of data + // Therefore, time bits start at: 5 + 5 + 2 = 12 + // Time bits are 8 long, so 12 + 8 = 20 + // See also: https://github.com/msgpack/msgpack/blob/master/spec.md + // + timeEightBytes := data[12:20] // extract bytes 12 through 20 + sec := binary.BigEndian.Uint32(timeEightBytes) + usec := binary.BigEndian.Uint32(timeEightBytes[4:]) + ts = time.Unix(int64(sec), int64(usec)) + + //log.Println("timestamp:", timestamp.Format(time.RFC850) + + return ts +} From 404c51f49ce19cc4ee96d24ee4da9c18a6cf2aee Mon Sep 17 00:00:00 2001 From: "dave.seddon" Date: Tue, 25 Jul 2023 15:32:45 -0700 Subject: [PATCH 6/7] testdata data --- output/decoder_test.go | 76 +++++++++++++++++++++++++++-------------- output/testdata/data | Bin 0 -> 39 bytes 2 files changed, 50 insertions(+), 26 deletions(-) create mode 100644 output/testdata/data diff --git a/output/decoder_test.go b/output/decoder_test.go index 7bf4bd8..66c6589 100644 --- a/output/decoder_test.go +++ b/output/decoder_test.go @@ -19,12 +19,18 @@ package output import ( "encoding/binary" + "log" + "os" "reflect" "testing" "time" "unsafe" ) +const ( + testDataFile = "./testdata/data" +) + // dummyRecord should be byte Array, not Slice to be able to Cast c array. var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */ 0xd7, 0x00, 0x5e, 0xa9, 0x17, 0xe0, 0x00, 0x00, 0x00, 0x00, /* 2020/04/29 06:00:00*/ @@ -80,38 +86,56 @@ func TestGetRecord(t *testing.T) { } func TestGetV2Record(t *testing.T) { - dec := NewDecoder(unsafe.Pointer(&dummyV2Record), len(dummyV2Record)) - if dec == nil { - t.Fatal("dec is nil") - } - ret, timestamp, record := GetRecord(dec) - if ret < 0 { - t.Fatalf("ret is negative: code %v", ret) - } - - // test timestamp - ts, ok := timestamp.(FLBTime) - if !ok { - t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp)) + anotherDummyV2RecordBytes, err := os.ReadFile(testDataFile) + if err != nil { + log.Fatal(err) } + anotherDummyV2Record := (*[39]byte)(anotherDummyV2RecordBytes) - // test - if ts.Time != extractTimeStamp() { - t.Fatalf("GetRecord timestamp extraction does not match manual extractTimeStamp") - } + t.Log(dummyV2Record) + t.Log(*anotherDummyV2Record) - if ts.Unix() != int64(0x64be0eeb) { - t.Errorf("ts.Unix() error. given %d", ts.Unix()) + dummyV2Records := [][39]byte{ + dummyV2Record, + *anotherDummyV2Record, } - // test record - v, ok := record["schema"].(int64) - if !ok { - t.Fatalf("cast error. Type is %s", reflect.TypeOf(record["schema"])) - } - if v != 1 { - t.Errorf(`record["schema"] is not 1 %d`, v) + for i, record := range dummyV2Records { + + dec := NewDecoder(unsafe.Pointer(&record), len(record)) + if dec == nil { + t.Fatal("dec is nil, i", i) + } + + ret, timestamp, record := GetRecord(dec) + if ret < 0 { + t.Fatalf("ret is negative: code %v", ret) + } + + // test timestamp + ts, ok := timestamp.(FLBTime) + if !ok { + t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp)) + } + + // test + if ts.Time != extractTimeStamp() { + t.Fatalf("GetRecord timestamp extraction does not match manual extractTimeStamp") + } + + if ts.Unix() != int64(0x64be0eeb) { + t.Errorf("ts.Unix() error. given %d", ts.Unix()) + } + + // test record + v, ok := record["schema"].(int64) + if !ok { + t.Fatalf("cast error. Type is %s", reflect.TypeOf(record["schema"])) + } + if v != 1 { + t.Errorf(`record["schema"] is not 1 %d`, v) + } } } diff --git a/output/testdata/data b/output/testdata/data new file mode 100644 index 0000000000000000000000000000000000000000..a78acb59c95e73cb05ae07c4955721f0aa615494 GIT binary patch literal 39 scmcc1z`($C7eriVNZDHXS7mz^L&H4=?Ip>DC57t@%`7a<3{5Od0RJZp-2eap literal 0 HcmV?d00001 From dd971f6beeba9c27327fb79dfec7f451960bbfbd Mon Sep 17 00:00:00 2001 From: "dave.seddon" Date: Fri, 28 Jul 2023 17:52:48 -0700 Subject: [PATCH 7/7] more testdata --- output/testdata/data1 | Bin 0 -> 8048 bytes output/testdata/data1.yaml | 4201 ++++++++++++++++++++++++++++++++++++ output/testdata/data2 | Bin 0 -> 8048 bytes output/testdata/data3 | Bin 0 -> 8048 bytes 4 files changed, 4201 insertions(+) create mode 100644 output/testdata/data1 create mode 100644 output/testdata/data1.yaml create mode 100644 output/testdata/data2 create mode 100644 output/testdata/data3 diff --git a/output/testdata/data1 b/output/testdata/data1 new file mode 100644 index 0000000000000000000000000000000000000000..e1c304737423d9a3df788e9bdc235e995ae52def GIT binary patch literal 8048 zcmeI%&ui0A902glgW^TJ=uHghNzL!HN$P2v*)M2UW>CRlOkdJ9n5JodOotcIlY&e= zE7GRZ$#9pQ6hwq6_@h$~B0X%EdDvP10pE*WRyMTOc0DAA?@Jze;rqTXpR^>a7>4ay zMd2%^FQ0sO;QZo|Yiro?8Yuha_x%lK-}j5l_tBAm5JmerD^g>DMX@x?rC2saCsQo*gp@Jy0@1l%An z^~NzEaXCRVSc2y?k>K?_AViwc2sUpd7@o^(LM}%{L*|@l#L>SQ0ZUM}K72z7TJ_xw zY|B=C?*^Qc`GlcKNg`>Ifk#gL!i~^@&EdOnh%-H3@O_73NCwsLO5-6+8b@7TilV&B zB+zYAH>OF?)D4?7OBA4@U83ZgQ>Klo%TRzpMTQ$SIG$1Dj6w{_@+o$43+;i?W;p zWzVxJbkw@P+vy4s9Y+h0_dqx_nt=s?J1ks#I4^i&2+Bd@UhmBeCLRQ!aSLSzOKz2b zW)(`FpAu=2-jP~I85I^cXHpHc1rWFvC6FRR(p6Ac+T!l00=GhhkPeIvid9Ej!TR=a zRjc^otf8O486F=CY#bTI86m~YMp)X&2hno@`xvvP0UR#oA--J+FOzy-EA^ z*~vd>XaCn*IeRXhI=Z|OExnzY|Mx4&E3rFON8t!o!Mq`e+% pkG1!@_Q^wGllD9K>n>@R;D?;{zQ9SXYnM8t{bQ^>*4|^0q-!uule}cwn@91c z2XCTnGCvpyG7u5Oi+HIMJdHB8iyqv;ix9dMVgqGbJ?6cJtGCsK`_YA<+DC4m7n4fd0{LUKnM|mqpmIs zNiG8d=<4b~U?J$P7Iaf_s&oaqnoA6XCXW}Wik{)0o=3JuCMzRZcg39Q>>+DCD*lRK;Z@2?ycq{oOB`_LLyD0 zn-o+{O4XJqq^p`q=@lB%(J5JR^hMnas|#od!Xo2?o|wx^*}OzfX0w;F3*_aAg_(JY zh85G*Oc&AP`l3AU!5cW3W|dAM4TfXkP&6W^kc{-QWoRz* zWoRR0)R?Gs{vGF_n`d=sUEJJZ{ncu+|9ovKQLfPPK#fwSz8c z--xyE@}wTndo0$zhjwbDMSF1dWUsVyooZ*>*UolH`X(5d!R``T08(Jl|h+GFi~uRS-~ lqW#10=04iF|Mdkf-Kln=eeFV*v>%JL$J%?UJ-P1}xZl7ZFdqN_ literal 0 HcmV?d00001 diff --git a/output/testdata/data3 b/output/testdata/data3 new file mode 100644 index 0000000000000000000000000000000000000000..ded97d432cc28d9ad18c591113f7aa8451aeca8f GIT binary patch literal 8048 zcmeI!O=#0l902f4K@q`=Ab1dS@B`E&?cgG~m9UtKa5H&p$w-{g9Wq@xbDEzza#9Pl9+7 z2s8umRrCE^mcp^%QB~xkMP+DGO{dc8=$KMNu7_%wiA<`B@`VLFqD~E@YX~8a!qL?0 ziI^bEHRPLmU(NB5zmzp?&7I?l$Txh(^IbzL;8hG%X7E#gno_<{9Z8YJ$pRuwNVkz2 z1g73Nfn*_@&U3&~nI4)Nn^ZVd zw0*<&J#NsPQ3nHb5eM6z=B zR=()@NeM_`TWTF%nR8W5DA8$tmxn-t`>EP&)SaGx$RNwAc z#VR~LXXvMBDAMDBO%tOuOe94yDf0A4S_z?V(!TtlN&Bn%M{<^(*DjKmrakR)>)PcG zX+Icg|HqrUO}}H2_ARtOI@_fEt$n>)+VQ8aE$tF$S-S)}r2S~5J<>k6<^QN2`L?gD zH);QH>}$8Q^X+Pft!sxJ(tbSB9%=7+?avmPw156_vs>DQcD3`ZYv((por|}Qzdv2>mUgjS?LzC?g$`-&kF-bHdtUqNl_u>Q^Z9ORkF~2^Y+bw9A?;@(?UDAL h*Zyv!N&Bz!hq|Rb-mdmo>)K--(mow&kF@ug_TSvhGnW7W literal 0 HcmV?d00001