Skip to content

Commit c59fa80

Browse files
committed
feat: add gcs implementation for storage
1 parent c4533be commit c59fa80

File tree

9 files changed

+878
-7
lines changed

9 files changed

+878
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@ build = "build.rs"
1010
[dependencies]
1111
# Arrow and DataFusion ecosystem
1212
arrow = "54.0.0"
13-
arrow-array = "54.0.0"
13+
arrow-array = "54.0.0"
1414
arrow-flight = { version = "54.0.0", features = ["tls"] }
1515
arrow-ipc = { version = "54.0.0", features = ["zstd"] }
1616
arrow-json = "54.0.0"
1717
arrow-schema = { version = "54.0.0", features = ["serde"] }
1818
arrow-select = "54.0.0"
1919
datafusion = "45.0.0"
20-
object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] }
20+
object_store = { version = "0.11.2", features = [
21+
"cloud",
22+
"aws",
23+
"azure",
24+
"gcp",
25+
] }
2126
parquet = "54.0.0"
2227

2328
# Web server and HTTP-related
@@ -34,7 +39,11 @@ tower-http = { version = "0.6.1", features = ["cors"] }
3439
url = "2.4.0"
3540

3641
# Connectors dependencies
37-
rdkafka = { version = "0.37", optional = true, features = ["cmake-build", "tracing", "libz-static"] }
42+
rdkafka = { version = "0.37", optional = true, features = [
43+
"cmake-build",
44+
"tracing",
45+
"libz-static",
46+
] }
3847
sasl2-sys = { version = "0.1.22", optional = true, features = ["vendored"] }
3948

4049
# Authentication and Security
@@ -144,7 +153,14 @@ assets-sha1 = "3e703ef8bedf8ae55fd31713f6267ad14ad3d29d"
144153

145154
[features]
146155
debug = []
147-
kafka = ["rdkafka", "rdkafka/ssl-vendored", "rdkafka/ssl", "rdkafka/sasl", "sasl2-sys", "sasl2-sys/vendored"]
156+
kafka = [
157+
"rdkafka",
158+
"rdkafka/ssl-vendored",
159+
"rdkafka/ssl",
160+
"rdkafka/sasl",
161+
"sasl2-sys",
162+
"sasl2-sys/vendored",
163+
]
148164

149165
[profile.release-lto]
150166
inherits = "release"

Dockerfile.debug

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1515

1616
# build stage
17-
FROM rust:1.84.0-bookworm AS builder
17+
FROM docker.io/rust:1.84.0-bookworm AS builder
1818

1919
LABEL org.opencontainers.image.title="Parseable"
2020
LABEL maintainer="Parseable Team <[email protected]>"
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
networks:
2+
parseable-internal:
3+
4+
services:
5+
# query server
# Built from Dockerfile.debug and started with the "gcs-store" subcommand in
# query mode (P_MODE=query); the API is published on host port 8000.
6+
parseable-query:
7+
container_name: parseable-query
8+
build:
9+
context: .
10+
dockerfile: Dockerfile.debug
11+
platform: linux/amd64
12+
command: ["parseable", "gcs-store"]
13+
ports:
14+
- "8000:8000"
15+
environment:
# NOTE(review): the command above selects "gcs-store", but the storage settings
# below are all P_S3_* variables pointing at a "minio" host that this compose
# file never defines. Presumably copied from an S3 test setup — confirm which
# variables the GCS backend actually reads.
16+
- P_S3_URL=http://minio:9000
17+
- P_S3_ACCESS_KEY=parseable
18+
- P_S3_SECRET_KEY=supersecret
19+
- P_S3_REGION=us-east-1
20+
- P_S3_BUCKET=parseable
21+
- P_STAGING_DIR=/tmp/data
22+
- P_USERNAME=parseableadmin
23+
- P_PASSWORD=parseableadmin
24+
- P_CHECK_UPDATE=false
25+
- P_PARQUET_COMPRESSION_ALGO=snappy
26+
- P_MODE=query
27+
- RUST_LOG=warn
28+
networks:
29+
- parseable-internal
# Other services gate on this check through "condition: service_healthy".
30+
healthcheck:
31+
test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
32+
interval: 15s
33+
timeout: 20s
34+
retries: 5
35+
deploy:
36+
restart_policy:
37+
condition: on-failure
38+
delay: 20s
39+
max_attempts: 3
40+
# ingest server one
# Built from Dockerfile.debug and started with the "gcs-store" subcommand in
# ingest mode (P_MODE=ingest); waits for parseable-query to report healthy.
41+
parseable-ingest-one:
42+
container_name: parseable-ingest-one
43+
build:
44+
context: .
45+
dockerfile: Dockerfile.debug
46+
platform: linux/amd64
47+
command: ["parseable", "gcs-store"]
48+
ports:
# Only the container port is given, so no fixed host port is requested
# (unlike the query server's "8000:8000").
49+
- "8000"
50+
environment:
# NOTE(review): same mismatch as the query server — "gcs-store" is configured
# only through P_S3_* variables that point at an undefined "minio" host.
# Confirm the settings the GCS backend expects.
51+
- P_S3_URL=http://minio:9000
52+
- P_S3_ACCESS_KEY=parseable
53+
- P_S3_SECRET_KEY=supersecret
54+
- P_S3_REGION=us-east-1
55+
- P_S3_BUCKET=parseable
56+
- P_STAGING_DIR=/tmp/data
57+
- P_USERNAME=parseableadmin
58+
- P_PASSWORD=parseableadmin
59+
- P_CHECK_UPDATE=false
60+
- P_PARQUET_COMPRESSION_ALGO=snappy
61+
- P_MODE=ingest
# Uses this service's own name on the shared network as the advertised endpoint.
62+
- P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
63+
- RUST_LOG=warn
64+
networks:
65+
- parseable-internal
66+
healthcheck:
67+
test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
68+
interval: 15s
69+
timeout: 20s
70+
retries: 5
71+
depends_on:
72+
parseable-query:
73+
condition: service_healthy
74+
deploy:
75+
restart_policy:
76+
condition: on-failure
77+
delay: 20s
78+
max_attempts: 3
79+
80+
# Runs the ghcr.io/parseablehq/quest image with the "load" command against the
# query and ingest servers; the image is re-pulled on every start.
quest:
81+
platform: linux/amd64
82+
image: ghcr.io/parseablehq/quest:main
83+
pull_policy: always
84+
command:
85+
[
86+
"load",
87+
"http://parseable-query:8000",
88+
"parseableadmin",
89+
"parseableadmin",
90+
"20",
91+
"10",
92+
"5m",
# NOTE(review): "minio:9000" and the three values after it mirror the P_S3_*
# settings of the parseable services; no "minio" service exists in this file.
93+
"minio:9000",
94+
"parseable",
95+
"supersecret",
96+
"parseable",
97+
"http://parseable-ingest-one:8000",
98+
"parseableadmin",
99+
"parseableadmin",
100+
]
101+
networks:
102+
- parseable-internal
103+
depends_on:
104+
parseable-query:
105+
condition: service_healthy
106+
parseable-ingest-one:
107+
condition: service_healthy
# NOTE(review): this compose file defines only parseable-query,
# parseable-ingest-one and quest, so this dependency names an undefined
# service. Either add a "minio" service (with a healthcheck) or drop this
# entry — confirm which is intended for the GCS test setup.
108+
minio:
109+
condition: service_healthy
110+
deploy:
111+
restart_policy:
112+
condition: on-failure
113+
delay: 20s
114+
max_attempts: 3

src/cli.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::connectors::kafka::config::KafkaConfig;
2727
use crate::{
2828
oidc::{self, OpenidConfig},
2929
option::{validation, Compression, Mode},
30-
storage::{AzureBlobConfig, FSConfig, S3Config},
30+
storage::{AzureBlobConfig, FSConfig, GCSConfig, S3Config},
3131
};
3232

3333
/// Default username and password for Parseable server, used by default for local mode.
@@ -80,6 +80,9 @@ pub enum StorageOptions {
8080

8181
#[command(name = "blob-store")]
8282
Blob(BlobStoreArgs),
83+
84+
#[command(name = "gcs-store")]
85+
GCS(GCSStoreArgs),
8386
}
8487

8588
#[derive(Parser)]
@@ -115,6 +118,17 @@ pub struct BlobStoreArgs {
115118
pub kafka: KafkaConfig,
116119
}
117120

121+
// Command-line arguments for the `gcs-store` subcommand (the payload of
// `StorageOptions::GCS`). Plain `//` comments are used on purpose: clap's
// derive can turn `///` doc comments into help text.
#[derive(Parser)]
122+
pub struct GCSStoreArgs {
123+
// General server options, flattened into this subcommand's arguments.
#[command(flatten)]
124+
pub options: Options,
125+
// GCS storage settings, flattened into this subcommand's arguments.
#[command(flatten)]
126+
pub storage: GCSConfig,
127+
// Kafka connector settings; the field exists only when the `kafka`
// feature is enabled.
#[cfg(feature = "kafka")]
128+
#[command(flatten)]
129+
pub kafka: KafkaConfig,
130+
}
131+
118132
#[derive(Parser, Debug, Default)]
119133
pub struct Options {
120134
// Authentication
@@ -338,7 +352,7 @@ pub struct Options {
338352

339353
#[arg(
340354
long,
341-
env = "P_MEMORY_THRESHOLD",
355+
env = "P_MEMORY_THRESHOLD",
342356
default_value = "80.0",
343357
value_parser = validation::validate_percentage,
344358
help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring"

src/metrics/storage.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,42 @@ pub mod azureblob {
125125
}
126126
}
127127
}
128+
129+
pub mod gcs {
130+
use crate::{metrics::METRICS_NAMESPACE, storage::GCSConfig};
131+
use once_cell::sync::Lazy;
132+
use prometheus::{HistogramOpts, HistogramVec};
133+
134+
use super::StorageMetrics;
135+
136+
pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
137+
HistogramVec::new(
138+
HistogramOpts::new("gcs_response_time", "gcs Request Latency")
139+
.namespace(METRICS_NAMESPACE),
140+
&["method", "status"],
141+
)
142+
.expect("metric can be created")
143+
});
144+
145+
pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
146+
HistogramVec::new(
147+
HistogramOpts::new("query_gcs_response_time", "GCS Request Latency")
148+
.namespace(METRICS_NAMESPACE),
149+
&["method", "status"],
150+
)
151+
.expect("metric can be created")
152+
});
153+
154+
impl StorageMetrics for GCSConfig {
155+
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
156+
handler
157+
.registry
158+
.register(Box::new(REQUEST_RESPONSE_TIME.clone()))
159+
.expect("metric can be registered");
160+
handler
161+
.registry
162+
.register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone()))
163+
.expect("metric can be registered");
164+
}
165+
}
166+
}

src/parseable/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ pub static PARSEABLE: Lazy<Parseable> = Lazy::new(|| match Cli::parse().storage
117117
args.kafka,
118118
Arc::new(args.storage),
119119
),
120+
StorageOptions::GCS(args) => Parseable::new(
121+
args.options,
122+
#[cfg(feature = "kafka")]
123+
args.kafka,
124+
Arc::new(args.storage),
125+
),
120126
});
121127

122128
/// All state related to parseable, in one place.

0 commit comments

Comments
 (0)