Skip to content

Commit 0ac809a

Browse files
committed
feat: BigQuery sink
1 parent 0d37dab commit 0ac809a

File tree

12 files changed

+829
-17
lines changed

Cargo.lock

Lines changed: 203 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dozer-cli/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,8 @@ pub enum OrchestrationError {
107107
Aborted,
108108
#[error("Failed to create lambda runtime: {0}")]
109109
CreateLambdaRuntime(#[from] dozer_lambda::Error),
110+
#[error("SinkError: {0}")]
111+
SinkError(#[from] BoxedError),
110112
}
111113

112114
#[derive(Error, Debug)]

dozer-cli/src/pipeline/builder.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,11 +10,13 @@ use dozer_core::app::PipelineEntryPoint;
1010
use dozer_core::node::SinkFactory;
1111
use dozer_core::DEFAULT_PORT_HANDLE;
1212
use dozer_ingestion::{get_connector, get_connector_info_table};
13+
use dozer_sinks::bigquery::BigQuerySinkFactory;
1314
use dozer_sql::builder::statement_to_pipeline;
1415
use dozer_sql::builder::{OutputNodeInfo, QueryContext};
1516
use dozer_tracing::LabelsAndProgress;
1617
use dozer_types::log::debug;
1718
use dozer_types::models::connection::Connection;
19+
use dozer_types::models::endpoint::BigQuery;
1820
use dozer_types::models::flags::Flags;
1921
use dozer_types::models::source::Source;
2022
use dozer_types::models::udf_config::UdfConfig;
@@ -57,6 +59,7 @@ pub struct EndpointLog {
5759
pub enum EndpointLogKind {
5860
Api { log: Arc<Mutex<Log>> },
5961
Dummy,
62+
BigQuery(BigQuery),
6063
}
6164

6265
pub struct PipelineBuilder<'a> {
@@ -284,6 +287,7 @@ impl<'a> PipelineBuilder<'a> {
284287
self.labels.clone(),
285288
)),
286289
EndpointLogKind::Dummy => Box::new(DummySinkFactory),
290+
EndpointLogKind::BigQuery(config) => Box::new(BigQuerySinkFactory::new(config)),
287291
};
288292

289293
match table_info {

dozer-cli/src/simple/executor.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
55
use dozer_cache::dozer_log::replication::Log;
66
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
77
use dozer_tracing::LabelsAndProgress;
8-
use dozer_types::models::endpoint::{Endpoint, EndpointKind};
8+
use dozer_types::models::endpoint::{BigQuery, Endpoint, EndpointKind};
99
use dozer_types::models::flags::Flags;
1010
use tokio::runtime::Runtime;
1111
use tokio::sync::Mutex;
@@ -44,6 +44,7 @@ struct ExecutorEndpoint {
4444
enum ExecutorEndpointKind {
4545
Api { log_endpoint: LogEndpoint },
4646
Dummy,
47+
BigQuery(BigQuery),
4748
}
4849

4950
impl<'a> Executor<'a> {
@@ -85,6 +86,7 @@ impl<'a> Executor<'a> {
8586
ExecutorEndpointKind::Api { log_endpoint }
8687
}
8788
EndpointKind::Dummy => ExecutorEndpointKind::Dummy,
89+
EndpointKind::BigQuery(config) => ExecutorEndpointKind::BigQuery(config.clone()),
8890
};
8991

9092
executor_endpoints.push(ExecutorEndpoint {
@@ -140,6 +142,7 @@ impl<'a> Executor<'a> {
140142
log: log_endpoint.log,
141143
},
142144
ExecutorEndpointKind::Dummy => EndpointLogKind::Dummy,
145+
ExecutorEndpointKind::BigQuery(config) => EndpointLogKind::BigQuery(config),
143146
};
144147
EndpointLog {
145148
table_name: endpoint.table_name,

dozer-sinks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ edition = "2021"
77

88
[dependencies]
99
dozer-sinks-snowflake = { path = "./snowflake", optional = true }
10+
dozer-sinks-bigquery = { path = "./bigquery" }
1011

1112
[features]
1213
snowflake = ["dep:dozer-sinks-snowflake"]

dozer-sinks/bigquery/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "dozer-sinks-bigquery"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
dozer-api = { path = "../../dozer-api" }
10+
dozer-types = { path = "../../dozer-types" }
11+
dozer-log = { path = "../../dozer-log" }
12+
dozer-core = { path = "../../dozer-core" }
13+
dozer-recordstore = { path = "../../dozer-recordstore" }
14+
itertools = "0.10.5"
15+
futures-util = "0.3.28"
16+
chrono = "0.4.31"
17+
gcp-bigquery-client = "0.18.0"
18+
object_store = { version = "0.9.0", features = ["gcp"] }
19+
parquet = "48.0.1"
20+
yup-oauth2 = "8.3.2"
21+
bytes = "1.5.0"
22+
html-escape = "0.2.13"

0 commit comments

Comments
 (0)