@@ -29,8 +29,9 @@ import (
2929 "github.com/ClickHouse/ch-go/proto"
3030)
3131
32- const JSONObjectSerializationVersion uint64 = 0
32+ const JSONDeprecatedObjectSerializationVersion uint64 = 0
3333const JSONStringSerializationVersion uint64 = 1
34+ const JSONObjectSerializationVersion uint64 = 3
3435const JSONUnsetSerializationVersion uint64 = math .MaxUint64
3536const DefaultMaxDynamicPaths = 1024
3637
@@ -51,13 +52,13 @@ type JSON struct {
5152 skipPaths []string
5253 skipPathsIndex map [string ]int
5354
55+ totalDynamicPaths int
5456 dynamicPaths []string
5557 dynamicPathsIndex map [string ]int
5658 dynamicColumns []* Dynamic
5759
58- maxDynamicPaths int
59- maxDynamicTypes int
60- totalDynamicPaths int
60+ maxDynamicPaths int
61+ maxDynamicTypes int
6162}
6263
6364func (c * JSON ) parse (t Type , tz * time.Location ) (_ * JSON , err error ) {
@@ -544,17 +545,29 @@ func (c *JSON) appendRowString(v any) error {
544545 return nil
545546}
546547
547- func (c * JSON ) encodeObjectHeader (buffer * proto.Buffer ) {
548- buffer .PutUVarInt (uint64 (c .maxDynamicPaths ))
548+ func (c * JSON ) encodeObjectHeader (buffer * proto.Buffer ) error {
549549 buffer .PutUVarInt (uint64 (c .totalDynamicPaths ))
550550
551551 for _ , dynamicPath := range c .dynamicPaths {
552552 buffer .PutString (dynamicPath )
553553 }
554554
555- for _ , col := range c .dynamicColumns {
556- col .encodeHeader (buffer )
555+ for i , col := range c .typedColumns {
556+ if serialize , ok := col .(CustomSerialization ); ok {
557+ if err := serialize .WriteStatePrefix (buffer ); err != nil {
558+ return fmt .Errorf ("failed to write prefix for typed path \" %s\" in json with type %s: %w" , c .typedPaths [i ], string (col .Type ()), err )
559+ }
560+ }
561+ }
562+
563+ for i , col := range c .dynamicColumns {
564+ err := col .encodeHeader (buffer )
565+ if err != nil {
566+ return fmt .Errorf ("failed to encode header for json dynamic path \" %s\" : %w" , c .dynamicPaths [i ], err )
567+ }
557568 }
569+
570+ return nil
558571}
559572
560573func (c * JSON ) encodeObjectData (buffer * proto.Buffer ) {
@@ -565,11 +578,6 @@ func (c *JSON) encodeObjectData(buffer *proto.Buffer) {
565578 for _ , col := range c .dynamicColumns {
566579 col .encodeData (buffer )
567580 }
568-
569- // SharedData per row, empty for now.
570- for i := 0 ; i < c .rows ; i ++ {
571- buffer .PutUInt64 (0 )
572- }
573581}
574582
575583func (c * JSON ) encodeStringData (buffer * proto.Buffer ) {
@@ -580,9 +588,7 @@ func (c *JSON) WriteStatePrefix(buffer *proto.Buffer) error {
580588 switch c .serializationVersion {
581589 case JSONObjectSerializationVersion :
582590 buffer .PutUInt64 (JSONObjectSerializationVersion )
583- c .encodeObjectHeader (buffer )
584-
585- return nil
591+ return c .encodeObjectHeader (buffer )
586592 case JSONStringSerializationVersion :
587593 buffer .PutUInt64 (JSONStringSerializationVersion )
588594
@@ -632,20 +638,14 @@ func (c *JSON) Reset() {
632638}
633639
634640func (c * JSON ) decodeObjectHeader (reader * proto.Reader ) error {
635- maxDynamicPaths , err := reader .UVarInt ()
636- if err != nil {
637- return fmt .Errorf ("failed to read max dynamic paths for json column: %w" , err )
638- }
639- c .maxDynamicPaths = int (maxDynamicPaths )
640-
641641 totalDynamicPaths , err := reader .UVarInt ()
642642 if err != nil {
643643 return fmt .Errorf ("failed to read total dynamic paths for json column: %w" , err )
644644 }
645645 c .totalDynamicPaths = int (totalDynamicPaths )
646646
647- c .dynamicPaths = make ([]string , 0 , totalDynamicPaths )
648- for i := 0 ; i < int ( totalDynamicPaths ) ; i ++ {
647+ c .dynamicPaths = make ([]string , 0 , c . totalDynamicPaths )
648+ for i := 0 ; i < c . totalDynamicPaths ; i ++ {
649649 dynamicPath , err := reader .Str ()
650650 if err != nil {
651651 return fmt .Errorf ("failed to read dynamic path name bytes at index %d for json column: %w" , i , err )
@@ -655,14 +655,22 @@ func (c *JSON) decodeObjectHeader(reader *proto.Reader) error {
655655 c .dynamicPathsIndex [dynamicPath ] = len (c .dynamicPaths ) - 1
656656 }
657657
658- c .dynamicColumns = make ([]* Dynamic , 0 , totalDynamicPaths )
658+ for i , col := range c .typedColumns {
659+ if serialize , ok := col .(CustomSerialization ); ok {
660+ if err := serialize .ReadStatePrefix (reader ); err != nil {
661+ return fmt .Errorf ("failed to read prefix for typed path \" %s\" with type %s in json: %w" , c .typedPaths [i ], string (col .Type ()), err )
662+ }
663+ }
664+ }
665+
666+ c .dynamicColumns = make ([]* Dynamic , 0 , c .totalDynamicPaths )
659667 for _ , dynamicPath := range c .dynamicPaths {
660668 parsedColDynamic , _ := Type ("Dynamic" ).Column ("" , c .tz )
661669 colDynamic := parsedColDynamic .(* Dynamic )
662670
663671 err := colDynamic .decodeHeader (reader )
664672 if err != nil {
665- return fmt .Errorf ("failed to decode dynamic header at path %s for json column: %w" , dynamicPath , err )
673+ return fmt .Errorf ("failed to decode dynamic header at path \" %s \" for json column: %w" , dynamicPath , err )
666674 }
667675
668676 c .dynamicColumns = append (c .dynamicColumns , colDynamic )
@@ -677,7 +685,7 @@ func (c *JSON) decodeObjectData(reader *proto.Reader, rows int) error {
677685
678686 err := col .Decode (reader , rows )
679687 if err != nil {
680- return fmt .Errorf ("failed to decode %s typed path %s for json column: %w" , col .Type (), typedPath , err )
688+ return fmt .Errorf ("failed to decode %s typed path \" %s \" for json column: %w" , col .Type (), typedPath , err )
681689 }
682690 }
683691
@@ -686,16 +694,10 @@ func (c *JSON) decodeObjectData(reader *proto.Reader, rows int) error {
686694
687695 err := col .decodeData (reader , rows )
688696 if err != nil {
689- return fmt .Errorf ("failed to decode dynamic path %s for json column: %w" , dynamicPath , err )
697+ return fmt .Errorf ("failed to decode dynamic path \" %s \" for json column: %w" , dynamicPath , err )
690698 }
691699 }
692700
693- // SharedData per row, ignored for now. May cause stream offset issues if present
694- _ , err := reader .ReadRaw (8 * rows ) // one UInt64 per row
695- if err != nil {
696- return fmt .Errorf ("failed to read shared data for json column: %w" , err )
697- }
698-
699701 return nil
700702}
701703
@@ -712,6 +714,8 @@ func (c *JSON) ReadStatePrefix(reader *proto.Reader) error {
712714 c .serializationVersion = jsonSerializationVersion
713715
714716 switch jsonSerializationVersion {
717+ case JSONDeprecatedObjectSerializationVersion :
718+ return fmt .Errorf ("deprecated json serialization version: %d, enable \" output_format_native_use_flattened_dynamic_and_json_serialization\" in your settings" , jsonSerializationVersion )
715719 case JSONObjectSerializationVersion :
716720 err := c .decodeObjectHeader (reader )
717721 if err != nil {
0 commit comments