Skip to content

Commit 303a346

Browse files
authored
[ENH] Add GCS client to storage (#5869)
## Description of changes

_Summarize the changes made by this PR._

- Improvements & Bug fixes
  - N/A
- New functionality
  - Add GCS support with `object_store` crate

## Test plan

_How are these changes tested?_

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_

## Observability plan

_What is the plan to instrument and monitor this change?_

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 5ce5ea0 commit 303a346

File tree

9 files changed

+1280
-150
lines changed

9 files changed

+1280
-150
lines changed

Cargo.lock

Lines changed: 94 additions & 100 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "ru
77
anyhow = "1.0"
88
arrow = "55.1"
99
async-trait = "0.1"
10+
aws-config = { version = "1.5", features = ["behavior-version-latest"] }
11+
aws-sdk-s3 = "1.63"
12+
aws-smithy-types = "1.2"
1013
axum = { version = "0.8", features = ["macros"] }
1114
bytes = "1.10"
1215
chrono = { version = "0.4", features = ["serde"] }
@@ -16,6 +19,7 @@ flatbuffers = "25.2.10"
1619
futures = "0.3"
1720
futures-core = "0.3"
1821
http-body-util = "0.1.3"
22+
object_store = { version = "0.12", features = ["gcp"] }
1923
lazy_static = { version = "1.4" }
2024
lexical-core = "1.0"
2125
murmur3 = "0.5.2"

rust/storage/Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@ edition = "2021"
77
path = "src/lib.rs"
88

99
[dependencies]
10-
bytes = "1.8.0"
11-
aws-sdk-s3 = "1.63"
12-
aws-smithy-types = "1.2"
13-
aws-config = { version = "1.5", features = ["behavior-version-latest"] }
14-
object_store = { version = "0.11", features = ["aws"] }
15-
1610
async-trait = { workspace = true }
11+
aws-sdk-s3 = { workspace = true }
12+
aws-smithy-types = { workspace = true }
13+
aws-config = { workspace = true }
14+
bytes = { workspace = true }
1715
futures = { workspace = true }
16+
object_store = { workspace = true }
1817
rand = { workspace = true }
1918
serde = { workspace = true }
19+
serde_json = { workspace = true }
2020
tempfile = { workspace = true }
21-
thiserror.workspace = true
21+
thiserror = { workspace = true }
2222
tokio = { workspace = true }
2323
tracing = { workspace = true }
2424
opentelemetry = { workspace = true }

rust/storage/src/config.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@ use serde::{Deserialize, Serialize};
44
/// The configuration for the chosen storage.
55
/// # Options
66
/// - S3: The configuration for the s3 storage.
7+
/// - Object: The configuration for the object storage.
8+
/// - Local: The configuration for local filesystem storage.
9+
/// - AdmissionControlledS3: S3 with rate limiting and request coalescing.
710
/// # Notes
811
/// See config.rs in the root of the worker crate for an example of how to use
912
/// config files to configure the worker.
1013
pub enum StorageConfig {
1114
#[serde(alias = "s3")]
1215
S3(S3StorageConfig),
16+
#[serde(alias = "object")]
17+
Object(ObjectStorageConfig),
1318
#[serde(alias = "local")]
1419
Local(LocalStorageConfig),
1520
#[serde(alias = "admissioncontrolleds3")]
@@ -156,3 +161,80 @@ impl Default for RateLimitingConfig {
156161
RateLimitingConfig::CountBasedPolicy(CountBasedPolicyConfig::default())
157162
}
158163
}
164+
165+
#[derive(Deserialize, Debug, Clone, Serialize)]
/// The backend selected for the generic object storage.
///
/// This is the type of the `provider` field on `ObjectStorageConfig`.
/// `GCS` is the only variant declared here.
166+
pub enum ObjectStorageProvider {
167+
/// GCS uses Application Default Credentials (ADC) automatically
168+
GCS,
169+
}
170+
171+
#[derive(Deserialize, Debug, Clone, Serialize)]
172+
/// The configuration for the ObjectStorage type
///
/// Every field carries a `#[serde(default = "...")]` attribute, so a field
/// that is omitted from the serialized config is filled in by the matching
/// `ObjectStorageConfig::default_*` function instead of failing to parse.
173+
/// # Fields
174+
/// - bucket: The name of the bucket to use. Defaults to "chroma-storage".
175+
/// - connect_timeout_ms: Connection timeout in milliseconds. Defaults to 5000.
176+
/// - download_part_size_bytes: Size of each part for parallel range downloads.
///   Defaults to 8 * 1024 * 1024 bytes.
177+
/// - provider: Which backend to use for storage. Defaults to GCS.
178+
/// - request_retry_count: Number of retry attempts for failed requests. Defaults to 3.
179+
/// - request_timeout_ms: Request timeout in milliseconds. Defaults to 60000.
180+
/// - upload_part_size_bytes: Size of each part in multipart uploads.
///   Defaults to 512 * 1024 * 1024 bytes.
181+
pub struct ObjectStorageConfig {
182+
#[serde(default = "ObjectStorageConfig::default_bucket")]
183+
pub bucket: String,
184+
#[serde(default = "ObjectStorageConfig::default_connect_timeout_ms")]
185+
pub connect_timeout_ms: u64,
186+
#[serde(default = "ObjectStorageConfig::default_download_part_size_bytes")]
187+
pub download_part_size_bytes: u64,
188+
#[serde(default = "ObjectStorageConfig::default_provider")]
189+
pub provider: ObjectStorageProvider,
190+
#[serde(default = "ObjectStorageConfig::default_request_retry_count")]
191+
pub request_retry_count: usize,
192+
#[serde(default = "ObjectStorageConfig::default_request_timeout_ms")]
193+
pub request_timeout_ms: u64,
194+
#[serde(default = "ObjectStorageConfig::default_upload_part_size_bytes")]
195+
pub upload_part_size_bytes: u64,
196+
}
197+
198+
// Per-field default providers. Each function is named by a
// `#[serde(default = "...")]` attribute on `ObjectStorageConfig` and is also
// called by its `Default` impl, so both paths yield the same values.
impl ObjectStorageConfig {
199+
/// Default bucket name.
fn default_bucket() -> String {
200+
"chroma-storage".to_string()
201+
}
202+
203+
/// Default connection timeout, in milliseconds (5 seconds).
fn default_connect_timeout_ms() -> u64 {
204+
5000
205+
}
206+
207+
/// Default size, in bytes, of each part of a ranged download.
fn default_download_part_size_bytes() -> u64 {
208+
8 * 1024 * 1024 // 8 MB
209+
}
210+
211+
/// Default storage provider; GCS is the only provider variant.
fn default_provider() -> ObjectStorageProvider {
212+
ObjectStorageProvider::GCS
213+
}
214+
215+
/// Default number of retry attempts for a failed request.
fn default_request_retry_count() -> usize {
216+
3
217+
}
218+
219+
/// Default request timeout, in milliseconds (60 seconds).
fn default_request_timeout_ms() -> u64 {
220+
60000
221+
}
222+
223+
/// Default size, in bytes, of each part of a multipart upload.
fn default_upload_part_size_bytes() -> u64 {
224+
512 * 1024 * 1024 // 512 MB
225+
}
226+
}
227+
228+
// Builds a config from the same `default_*` functions that serde uses for
// omitted fields, so `ObjectStorageConfig::default()` has the same field
// values as a config deserialized with every field left out.
impl Default for ObjectStorageConfig {
229+
/// Returns a config with every field set to its per-field default.
fn default() -> Self {
230+
ObjectStorageConfig {
231+
bucket: Self::default_bucket(),
232+
connect_timeout_ms: Self::default_connect_timeout_ms(),
233+
download_part_size_bytes: Self::default_download_part_size_bytes(),
234+
provider: Self::default_provider(),
235+
request_retry_count: Self::default_request_retry_count(),
236+
request_timeout_ms: Self::default_request_timeout_ms(),
237+
upload_part_size_bytes: Self::default_upload_part_size_bytes(),
238+
}
239+
}
240+
}

rust/storage/src/lib.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use chroma_error::{ChromaError, ErrorCodes};
99
pub mod admissioncontrolleds3;
1010
pub mod config;
1111
pub mod local;
12+
pub mod object_storage;
1213
pub mod s3;
1314
pub mod stream;
1415
use local::LocalStorage;
@@ -173,7 +174,7 @@ impl ChromaError for StorageError {
173174
StorageError::PermissionDenied { .. } => ErrorCodes::PermissionDenied,
174175
StorageError::Unauthenticated { .. } => ErrorCodes::Unauthenticated,
175176
StorageError::UnknownConfigurationKey { .. } => ErrorCodes::InvalidArgument,
176-
StorageError::Backoff { .. } => ErrorCodes::ResourceExhausted,
177+
StorageError::Backoff => ErrorCodes::ResourceExhausted,
177178
StorageError::CallbackError { .. } => ErrorCodes::Internal,
178179
}
179180
}
@@ -193,8 +194,6 @@ pub enum PathError {
193194
},
194195
}
195196

196-
// END BORROWED CODE
197-
198197
#[derive(Error, Debug)]
199198
pub enum StorageConfigError {
200199
#[error("Invalid storage config")]
@@ -207,6 +206,7 @@ pub enum StorageConfigError {
207206
#[allow(clippy::large_enum_variant)]
208207
pub enum Storage {
209208
S3(s3::S3Storage),
209+
Object(object_storage::ObjectStorage),
210210
Local(local::LocalStorage),
211211
AdmissionControlledS3(admissioncontrolleds3::AdmissionControlledS3Storage),
212212
}
@@ -215,6 +215,7 @@ impl std::fmt::Debug for Storage {
215215
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216216
match self {
217217
Storage::S3(_) => f.debug_tuple("S3").finish(),
218+
Storage::Object(_) => f.debug_tuple("Object").finish(),
218219
Storage::Local(_) => f.debug_tuple("Local").finish(),
219220
Storage::AdmissionControlledS3(_) => f.debug_tuple("AdmissionControlledS3").finish(),
220221
}
@@ -235,6 +236,7 @@ impl Storage {
235236
pub fn bucket_name(&self) -> Option<&str> {
236237
match self {
237238
Storage::S3(s3) => Some(&s3.bucket),
239+
Storage::Object(obj) => Some(&obj.bucket),
238240
Storage::AdmissionControlledS3(ac_s3) => Some(&ac_s3.storage.bucket),
239241
Storage::Local(_) => None,
240242
}
@@ -243,6 +245,10 @@ impl Storage {
243245
pub async fn get(&self, key: &str, options: GetOptions) -> Result<Arc<Vec<u8>>, StorageError> {
244246
match self {
245247
Storage::S3(s3) => s3.get(key, options).await,
248+
Storage::Object(obj) => obj
249+
.get(key, options)
250+
.await
251+
.map(|(bytes, _)| Vec::from(bytes).into()),
246252
Storage::Local(local) => local.get(key).await,
247253
Storage::AdmissionControlledS3(admission_controlled_storage) => {
248254
admission_controlled_storage.get(key, options).await
@@ -267,6 +273,11 @@ impl Storage {
267273
let fetch_result = fetch_fn(Ok(res.0)).await?;
268274
Ok((fetch_result, res.1))
269275
}
276+
Storage::Object(obj) => {
277+
let (bytes, etag) = obj.get(key, options).await?;
278+
let fetch_result = fetch_fn(Ok(Vec::from(bytes).into())).await?;
279+
Ok((fetch_result, Some(etag)))
280+
}
270281
Storage::Local(local) => {
271282
let res = local.get_with_e_tag(key).await?;
272283
let fetch_result = fetch_fn(Ok(res.0)).await?;
@@ -318,6 +329,7 @@ impl Storage {
318329
{
319330
match self {
320331
Storage::S3(_) => self.fetch_batch_generic(keys, options, fetch_fn).await,
332+
Storage::Object(_) => self.fetch_batch_generic(keys, options, fetch_fn).await,
321333
Storage::Local(_) => self.fetch_batch_generic(keys, options, fetch_fn).await,
322334
Storage::AdmissionControlledS3(admission_controlled_storage) => {
323335
admission_controlled_storage
@@ -334,6 +346,10 @@ impl Storage {
334346
) -> Result<(Arc<Vec<u8>>, Option<ETag>), StorageError> {
335347
match self {
336348
Storage::S3(s3) => s3.get_with_e_tag(key).await,
349+
Storage::Object(obj) => {
350+
let (bytes, etag) = obj.get(key, options).await?;
351+
Ok((Vec::from(bytes).into(), Some(etag)))
352+
}
337353
Storage::Local(local) => local.get_with_e_tag(key).await,
338354
Storage::AdmissionControlledS3(admission_controlled_storage) => {
339355
admission_controlled_storage
@@ -350,6 +366,7 @@ impl Storage {
350366
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
351367
match self {
352368
Storage::S3(s3) => s3.confirm_same(key, e_tag).await,
369+
Storage::Object(obj) => obj.confirm_same(key, e_tag).await,
353370
Storage::Local(local) => local.confirm_same(key, e_tag).await,
354371
Storage::AdmissionControlledS3(as3) => as3.confirm_same(key, e_tag).await,
355372
}
@@ -363,6 +380,7 @@ impl Storage {
363380
) -> Result<Option<ETag>, StorageError> {
364381
match self {
365382
Storage::S3(s3) => s3.put_file(key, path, options).await,
383+
Storage::Object(obj) => obj.put_file(key, path, options).await.map(Some),
366384
Storage::Local(local) => local.put_file(key, path, options).await,
367385
Storage::AdmissionControlledS3(as3) => as3.put_file(key, path, options).await,
368386
}
@@ -376,6 +394,7 @@ impl Storage {
376394
) -> Result<Option<ETag>, StorageError> {
377395
match self {
378396
Storage::S3(s3) => s3.put_bytes(key, bytes, options).await,
397+
Storage::Object(obj) => obj.put(key, bytes.into(), options).await.map(Some),
379398
Storage::Local(local) => local.put_bytes(key, &bytes, options).await,
380399
Storage::AdmissionControlledS3(as3) => as3.put_bytes(key, bytes, options).await,
381400
}
@@ -384,12 +403,20 @@ impl Storage {
384403
pub async fn delete(&self, key: &str, options: DeleteOptions) -> Result<(), StorageError> {
385404
match self {
386405
Storage::S3(s3) => s3.delete(key, options).await,
387-
Storage::Local(local) => {
406+
Storage::Object(obj) => {
388407
if options.if_match.is_some() {
389408
return Err(StorageError::Message {
390409
message: "if match not supported for object store backend".to_string(),
391410
});
392411
}
412+
obj.delete(key).await
413+
}
414+
Storage::Local(local) => {
415+
if options.if_match.is_some() {
416+
return Err(StorageError::Message {
417+
message: "if match not supported for local backend".to_string(),
418+
});
419+
}
393420
local.delete(key).await
394421
}
395422
Storage::AdmissionControlledS3(ac) => ac.delete(key, options).await,
@@ -402,6 +429,7 @@ impl Storage {
402429
) -> Result<crate::s3::DeletedObjects, StorageError> {
403430
match self {
404431
Storage::S3(s3) => s3.delete_many(keys).await,
432+
Storage::Object(obj) => obj.delete_many(keys).await,
405433
Storage::Local(local) => local.delete_many(keys).await,
406434
Storage::AdmissionControlledS3(ac) => ac.delete_many(keys).await,
407435
}
@@ -410,6 +438,7 @@ impl Storage {
410438
pub async fn rename(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
411439
match self {
412440
Storage::S3(s3) => s3.rename(src_key, dst_key).await,
441+
Storage::Object(obj) => obj.rename(src_key, dst_key).await,
413442
Storage::Local(local) => local.rename(src_key, dst_key).await,
414443
Storage::AdmissionControlledS3(_) => Err(StorageError::NotImplemented),
415444
}
@@ -418,6 +447,7 @@ impl Storage {
418447
pub async fn copy(&self, src_key: &str, dst_key: &str) -> Result<(), StorageError> {
419448
match self {
420449
Storage::S3(s3) => s3.copy(src_key, dst_key).await,
450+
Storage::Object(obj) => obj.copy(src_key, dst_key).await,
421451
Storage::Local(local) => local.copy(src_key, dst_key).await,
422452
Storage::AdmissionControlledS3(ac) => ac.copy(src_key, dst_key).await,
423453
}
@@ -431,6 +461,7 @@ impl Storage {
431461
match self {
432462
Storage::Local(local) => local.list_prefix(prefix).await,
433463
Storage::S3(s3) => s3.list_prefix(prefix).await,
464+
Storage::Object(obj) => obj.list_prefix(prefix).await,
434465
Storage::AdmissionControlledS3(acs3) => acs3.list_prefix(prefix, options).await,
435466
}
436467
}
@@ -446,6 +477,9 @@ impl Configurable<StorageConfig> for Storage {
446477
StorageConfig::S3(_) => Ok(Storage::S3(
447478
s3::S3Storage::try_from_config(config, registry).await?,
448479
)),
480+
StorageConfig::Object(_) => Ok(Storage::Object(
481+
object_storage::ObjectStorage::try_from_config(config, registry).await?,
482+
)),
449483
StorageConfig::Local(_) => Ok(Storage::Local(
450484
local::LocalStorage::try_from_config(config, registry).await?,
451485
)),

0 commit comments

Comments
 (0)