Skip to content

Commit a86d433

Browse files
author
fn500i
committed
adding json support for bq sink
Signed-off-by: fn500i <[email protected]>
1 parent 08dfc26 commit a86d433

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

src/connectors/impls/gbq/writer/sink.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use googapis::google::cloud::bigquery::storage::v1::{
3131
use prost::encoding::WireType;
3232
use prost::Message;
3333
use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto};
34+
use simd_json_derive::Serialize;
3435
use std::collections::hash_map::Entry;
3536
use std::marker::PhantomData;
3637
use std::{collections::HashMap, time::Duration};
@@ -268,7 +269,11 @@ fn encode_field(val: &Value, field: &Field, result: &mut Vec<u8>) -> Result<()>
268269
);
269270
}
270271
TableType::Json => {
271-
warn!("Found a field of type JSON, this is not supported, ignoring.");
272+
prost::encoding::string::encode(
273+
tag,
274+
&val.json_string()?,
275+
result,
276+
);
272277
}
273278
TableType::Interval => {
274279
warn!("Found a field of type Interval, this is not supported, ignoring.");

src/connectors/impls/psql_repl/postgres_replication.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ pub(crate) async fn replication(connection_config:MzConfig, publication : &str,
346346
let source_id = "source_id";
347347
let mut _replication_lsn = PgLsn::from(0);
348348

349-
// println!("======== BEGIN SNA≠PSHOT ==========");
349+
// println!("======== BEGIN SNAPSHOT ==========");
350350

351351
// Validate publication tables against the state snapshot
352352
// dbg!(&publication_tables);

0 commit comments

Comments
 (0)