Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit d5bfd2b

Browse files
author
alishakawaguchi
authored
Updates processors to convert go maps to json bits for non json columns (#3473)
1 parent a6e0fbb commit d5bfd2b

File tree

4 files changed

+30
-6
lines changed

4 files changed

+30
-6
lines changed

worker/pkg/benthos/sql/processor_neosync_mysql.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77

88
"github.com/doug-martin/goqu/v9"
9+
"github.com/nucleuscloud/neosync/internal/gotypeutil"
910
neosynctypes "github.com/nucleuscloud/neosync/internal/neosync-types"
1011
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
1112
"github.com/redpanda-data/benthos/v4/public/service"
@@ -154,8 +155,7 @@ func getMysqlValue(
154155
return value, nil
155156
}
156157

157-
switch datatype {
158-
case "json":
158+
if datatype == "json" {
159159
if v, ok := value.([]byte); ok {
160160
validJson, err := getValidJson(v)
161161
if err != nil {
@@ -171,9 +171,17 @@ func getMysqlValue(
171171
return nil, fmt.Errorf("unable to marshal mysql json to bits: %w", err)
172172
}
173173
return bits, nil
174-
default:
175-
return value, nil
176174
}
175+
176+
if gotypeutil.IsMap(value) {
177+
bits, err := json.Marshal(value)
178+
if err != nil {
179+
return nil, fmt.Errorf("unable to marshal go map to json bits: %w", err)
180+
}
181+
return bits, nil
182+
}
183+
184+
return value, nil
177185
}
178186

179187
func getMysqlNeosyncValue(root any) (value any, isNeosyncValue bool, err error) {

worker/pkg/benthos/sql/processor_neosync_pgx.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,15 @@ func getPgxValue(
191191
return string(byteSlice), nil
192192
}
193193
return value, nil
194-
default:
195-
return value, nil
196194
}
195+
if gotypeutil.IsMap(value) {
196+
bits, err := json.Marshal(value)
197+
if err != nil {
198+
return nil, fmt.Errorf("unable to marshal go map to json bits: %w", err)
199+
}
200+
return bits, nil
201+
}
202+
return value, nil
197203
}
198204

199205
func getPgxNeosyncValue(root any) (value any, isNeosyncValue bool, err error) {

worker/pkg/benthos/utils.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func IsCriticalError(errMsg string) bool {
4747
"ON CONFLICT does not support deferrable unique constraints",
4848
"ON CONFLICT",
4949
"SQLSTATE", // any sqlstate error should result in ending
50+
"goqu_encode_error",
5051
"doesn't have a default value",
5152
"column does not allow nulls",
5253
}

worker/pkg/query-builder/query-builder.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package querybuilder
22

33
import (
44
"fmt"
5+
"strings"
56

67
"github.com/doug-martin/goqu/v9"
78
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
@@ -115,6 +116,10 @@ func BuildInsertQuery(
115116

116117
query, args, err := insert.ToSQL()
117118
if err != nil {
119+
// check if it's a goqu encoding error and sanitize it
120+
if strings.Contains(err.Error(), "goqu_encode_error") {
121+
return "", nil, fmt.Errorf("goqu_encode_error: Unable to encode value")
122+
}
118123
return "", nil, err
119124
}
120125
return query, args, nil
@@ -147,6 +152,10 @@ func BuildUpdateQuery(
147152

148153
query, _, err := update.ToSQL()
149154
if err != nil {
155+
// check if it's a goqu encoding error and sanitize it
156+
if strings.Contains(err.Error(), "goqu_encode_error") {
157+
return "", fmt.Errorf("goqu_encode_error: Unable to encode value")
158+
}
150159
return "", err
151160
}
152161
return query, nil

0 commit comments

Comments
 (0)