From 9b837c63e320492c1ef9b57df0339d889325bb51 Mon Sep 17 00:00:00 2001 From: dcfocus Date: Sun, 28 Jun 2026 00:37:43 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20external=20media=20references=20?= =?UTF-8?q?=E2=80=94=20store=20large=20media=20by=20typed=20payload=20URI?= =?UTF-8?q?=20(#115)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a typed external-reference payload so large media (images/audio/video) can live as objects in the configured object store and be referenced from a ContextRecord by `payload_uri` (plus optional `payload_size`/`payload_checksum`), instead of inlining bytes in `binary_payload`. `list`/`search` return the reference without materializing the bytes; an opt-in fetch resolves them on demand using the context's `storage_options`. - core: add `payload_uri`/`payload_size`/`payload_checksum` to `ContextRecord` and `RecordPatch`; new backward-compatible schema columns gated by `include_external_reference` (old datasets read them as `None`); `ContextStore::{fetch_payload,put_payload}` resolve/offload bytes through the context's object store (works for gs://, s3://, local). - api/server/client/python: thread the fields through the DTOs; add `GET /contexts/{name}/records/{id}/payload`, client `fetch_record_payload`, and Python `Context.fetch_payload`/`put_payload`. - tests: Rust roundtrip + missing-record/missing-reference cases, server route test, api serde round-trip, and a Python end-to-end suite. Also realign the shared add/upsert/update test doubles in test_search.py/test_embeddings.py with the current binding signature (they were already out of sync with tenant/source/run_id/created_at) and extend them for the new payload params. Signed-URL resolution is deferred (TODO left in `fetch_payload`). Closes #115 Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 21 ++ crates/lance-context-api/src/lib.rs | 78 ++++++ crates/lance-context-client/src/lib.rs | 25 ++ crates/lance-context-core/src/api_impl.rs | 9 + crates/lance-context-core/src/eval.rs | 3 + crates/lance-context-core/src/export.rs | 3 + crates/lance-context-core/src/namespace.rs | 3 + crates/lance-context-core/src/record.rs | 21 ++ crates/lance-context-core/src/store.rs | 227 +++++++++++++++++- crates/lance-context-server/src/routes/mod.rs | 4 + .../src/routes/records.rs | 132 ++++++++++ python/python/lance_context/api.py | 99 ++++++++ python/src/lib.rs | 115 ++++++++- python/tests/test_external_payload.py | 91 +++++++ python/tests/test_search.py | 34 +++ 15 files changed, 846 insertions(+), 19 deletions(-) create mode 100644 python/tests/test_external_payload.py diff --git a/README.md b/README.md index ef7ee7a..c3ee8b2 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,23 @@ ctx.add("assistant", image) print("Current version:", ctx.version()) +# External media references: keep large media in object storage (GCS/S3/local) +# and reference it from a record by a typed URI, instead of inlining the bytes. +object_uri = "gs://my-bucket/media/diagram-001.png" +ctx.put_payload(object_uri, image_bytes) # offload bytes via the context's storage_options +ctx.add( + "assistant", + "incident timeline diagram", # inline caption; the media stays in the bucket + content_type="image/png", + external_id="diagram-001", + payload_uri=object_uri, + payload_size=len(image_bytes), +) +# list/search/get return the reference without fetching the bytes; resolve on demand: +record = ctx.get(external_id="diagram-001") +media_bytes = ctx.fetch_payload(record["id"]) + + # Batch append source chunks in one storage operation ctx.add_many([ { @@ -424,6 +441,9 @@ let record = ContextRecord { content_type: "text/plain".into(), text_payload: Some("hello world".into()), binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: None, }; store.add(&[record]).await?; @@ -446,6 +466,7 @@ We are tracking future enhancements as GitHub issues: - ~~[Support standard storage_options / GCS](https://github.com/lance-format/lance-context/issues/45)~~ ✅ **Implemented** - [Add relationship column for GraphRAG workflows](https://github.com/lance-format/lance-context/issues/15) - ~~[Background compaction for Lance fragments](https://github.com/lance-format/lance-context/issues/16)~~ ✅ **Implemented** +- ~~[External media references — store large media in object storage by typed URI](https://github.com/lance-format/lance-context/issues/115)~~ ✅ **Implemented** Contributions are welcome—feel free to comment on the issues above or open your own proposals. diff --git a/crates/lance-context-api/src/lib.rs b/crates/lance-context-api/src/lib.rs index e0a0328..979d07d 100644 --- a/crates/lance-context-api/src/lib.rs +++ b/crates/lance-context-api/src/lib.rs @@ -176,6 +176,14 @@ pub struct AddRecordRequest { deserialize_with = "deserialize_base64_opt" )] pub binary_payload: Option>, + /// Typed reference to a payload object stored outside the dataset + /// (e.g. `gs://bucket/prefix/`). Distinct from inline `binary_payload`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_uri: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_size: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_checksum: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub embedding: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -280,6 +288,12 @@ pub struct RecordPatchDto { pub retired_reason: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub embedding: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_uri: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_size: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_checksum: Option, } impl RecordPatchDto { @@ -298,6 +312,9 @@ impl RecordPatchDto { && self.retired_at.is_none() && self.retired_reason.is_none() && self.embedding.is_none() + && self.payload_uri.is_none() + && self.payload_size.is_none() + && self.payload_checksum.is_none() } } @@ -348,6 +365,12 @@ pub struct RecordDto { )] pub binary_payload: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_uri: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_size: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub payload_checksum: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub embedding: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub state_metadata: Option, @@ -602,4 +625,59 @@ mod tests { assert!(req.include_expired); assert!(req.include_retired); } + + #[test] + fn add_request_omits_payload_reference_when_absent() { + // Records without an external reference must not emit the new keys, so + // older servers/clients keep round-tripping unchanged. + let req = AddRecordRequest { + role: "user".to_string(), + content_type: "text/plain".to_string(), + text_payload: Some("hi".to_string()), + ..Default::default() + }; + let json = serde_json::to_string(&req).unwrap(); + assert!(!json.contains("payload_uri")); + assert!(!json.contains("payload_size")); + assert!(!json.contains("payload_checksum")); + } + + #[test] + fn add_request_roundtrips_payload_reference() { + let req = AddRecordRequest { + role: "user".to_string(), + content_type: "image/png".to_string(), + payload_uri: Some("gs://bucket/prefix/obj.png".to_string()), + payload_size: Some(2048), + payload_checksum: Some("sha256:abc".to_string()), + ..Default::default() + }; + let json = serde_json::to_string(&req).unwrap(); + let back: AddRecordRequest = serde_json::from_str(&json).unwrap(); + assert_eq!( + back.payload_uri.as_deref(), + Some("gs://bucket/prefix/obj.png") + ); + assert_eq!(back.payload_size, Some(2048)); + assert_eq!(back.payload_checksum.as_deref(), Some("sha256:abc")); + } + + #[test] + fn record_dto_decodes_payload_reference_and_legacy_shape() { + // New shape with a reference. + let dto: RecordDto = serde_json::from_str( + r#"{"id":"r1","run_id":"run","created_at":"2026-06-27T00:00:00Z","role":"user","content_type":"image/png","lifecycle_status":"active","payload_uri":"s3://b/obj","payload_size":10}"#, + ) + .unwrap(); + assert_eq!(dto.payload_uri.as_deref(), Some("s3://b/obj")); + assert_eq!(dto.payload_size, Some(10)); + assert_eq!(dto.payload_checksum, None); + + // Legacy shape lacking the reference fields still decodes. + let legacy: RecordDto = serde_json::from_str( + r#"{"id":"r1","run_id":"run","created_at":"2026-06-27T00:00:00Z","role":"user","content_type":"text/plain","lifecycle_status":"active"}"#, + ) + .unwrap(); + assert_eq!(legacy.payload_uri, None); + } } diff --git a/crates/lance-context-client/src/lib.rs b/crates/lance-context-client/src/lib.rs index 02b6a7c..3bbd88d 100644 --- a/crates/lance-context-client/src/lib.rs +++ b/crates/lance-context-client/src/lib.rs @@ -42,6 +42,15 @@ impl RemoteContextStore { cached_version: info.version, }) } + + /// Resolve a record's external payload reference to its raw bytes via the + /// server. Errors if no such record or the record has no external reference. + pub async fn fetch_payload(&self, id: &str) -> ContextResult> { + self.client + .fetch_record_payload(&self.context_name, id) + .await + .map_err(to_ctx_err) + } } impl ContextStoreApi for RemoteContextStore { @@ -403,6 +412,22 @@ impl ContextClient { Self::handle_response(resp).await } + /// Resolve a record's external payload reference to its raw bytes via the + /// server, which fetches from object storage using the context's + /// `storage_options`. Returns the raw payload bytes on success. + pub async fn fetch_record_payload(&self, name: &str, id: &str) -> Result, ClientError> { + let resp = self + .http + .get(self.url(&format!("/contexts/{}/records/{}/payload", name, id))) + .send() + .await?; + if resp.status().is_success() { + Ok(resp.bytes().await?.to_vec()) + } else { + Err(Self::extract_error(resp).await) + } + } + pub async fn delete_record( &self, name: &str, diff --git a/crates/lance-context-core/src/api_impl.rs b/crates/lance-context-core/src/api_impl.rs index 32a1317..be55240 100644 --- a/crates/lance-context-core/src/api_impl.rs +++ b/crates/lance-context-core/src/api_impl.rs @@ -408,6 +408,9 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { retired_at: patch.retired_at, retired_reason: patch.retired_reason.clone(), embedding: patch.embedding.clone(), + payload_uri: patch.payload_uri.clone(), + payload_size: patch.payload_size, + payload_checksum: patch.payload_checksum.clone(), } } @@ -445,6 +448,9 @@ fn record_from_add_request(r: &AddRecordRequest, id: String, run_id: String) -> content_type: r.content_type.clone(), text_payload: r.text_payload.clone(), binary_payload: r.binary_payload.clone(), + payload_uri: r.payload_uri.clone(), + payload_size: r.payload_size, + payload_checksum: r.payload_checksum.clone(), embedding: r.embedding.clone(), } } @@ -463,6 +469,9 @@ fn record_to_dto(r: ContextRecord) -> RecordDto { content_type: r.content_type, text_payload: r.text_payload, binary_payload: r.binary_payload, + payload_uri: r.payload_uri, + payload_size: r.payload_size, + payload_checksum: r.payload_checksum, embedding: r.embedding, state_metadata: r.state_metadata.map(|sm| StateMetadataDto { step: sm.step, diff --git a/crates/lance-context-core/src/eval.rs b/crates/lance-context-core/src/eval.rs index 5152798..7b51c92 100644 --- a/crates/lance-context-core/src/eval.rs +++ b/crates/lance-context-core/src/eval.rs @@ -516,6 +516,9 @@ mod tests { content_type: "text/plain".to_string(), text_payload: Some(text.to_string()), binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: Some(embedding), } } diff --git a/crates/lance-context-core/src/export.rs b/crates/lance-context-core/src/export.rs index 339053f..3532d8d 100644 --- a/crates/lance-context-core/src/export.rs +++ b/crates/lance-context-core/src/export.rs @@ -1249,6 +1249,9 @@ mod tests { content_type: "text/plain".to_string(), text_payload: Some(text.to_string()), binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: None, } } diff --git a/crates/lance-context-core/src/namespace.rs b/crates/lance-context-core/src/namespace.rs index f022371..50d678a 100644 --- a/crates/lance-context-core/src/namespace.rs +++ b/crates/lance-context-core/src/namespace.rs @@ -483,6 +483,9 @@ mod tests { content_type: CONTENT_TYPE_TEXT.to_string(), text_payload: Some("hello".to_string()), binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: None, } } diff --git a/crates/lance-context-core/src/record.rs b/crates/lance-context-core/src/record.rs index 5629e19..6d649cc 100644 --- a/crates/lance-context-core/src/record.rs +++ b/crates/lance-context-core/src/record.rs @@ -51,6 +51,16 @@ pub struct ContextRecord { pub content_type: String, pub text_payload: Option, pub binary_payload: Option>, + /// Typed reference to a payload object stored outside the Lance dataset + /// (e.g. `gs://bucket/prefix/`). Large media lives in object storage and + /// is fetched on demand via [`ContextStore::fetch_payload`], leaving the + /// dataset to hold only metadata, embeddings, and the reference. Distinct + /// from inline [`Self::binary_payload`], which stays the small-payload path. + pub payload_uri: Option, + /// Size in bytes of the externally-referenced payload, when known. + pub payload_size: Option, + /// Caller-supplied checksum of the externally-referenced payload, when known. + pub payload_checksum: Option, pub embedding: Option>, } @@ -172,6 +182,11 @@ pub struct RecordPatch { /// Vector embedding to attach to the record. Enables deferred embedding /// workflows: append raw text first, then enrich with an embedding later. pub embedding: Option>, + /// External payload reference to attach to the record. Enables attaching a + /// `gs://…`/`s3://…` media reference after the record was first created. + pub payload_uri: Option, + pub payload_size: Option, + pub payload_checksum: Option, } impl RecordPatch { @@ -190,6 +205,9 @@ impl RecordPatch { && self.retired_at.is_none() && self.retired_reason.is_none() && self.embedding.is_none() + && self.payload_uri.is_none() + && self.payload_size.is_none() + && self.payload_checksum.is_none() } } @@ -439,6 +457,9 @@ mod tests { content_type: "text/plain".to_string(), text_payload: Some("hello".to_string()), binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: None, } } diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 1d44b4f..8edf25c 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -4,13 +4,13 @@ use std::sync::Arc; use std::time::Duration; use arrow_array::builder::{ - FixedSizeListBuilder, Float32Builder, Int32Builder, LargeBinaryBuilder, LargeStringBuilder, - ListBuilder, StringBuilder, StringDictionaryBuilder, StructBuilder, + FixedSizeListBuilder, Float32Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, + LargeStringBuilder, ListBuilder, StringBuilder, StringDictionaryBuilder, StructBuilder, TimestampMicrosecondBuilder, }; use arrow_array::types::Int8Type; use arrow_array::{ - Array, ArrayRef, DictionaryArray, FixedSizeListArray, Float32Array, Int32Array, + Array, ArrayRef, DictionaryArray, FixedSizeListArray, Float32Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, ListArray, RecordBatch, RecordBatchIterator, StringArray, StructArray, TimestampMicrosecondArray, }; @@ -24,7 +24,7 @@ use lance::dataset::optimize::{compact_files, CompactionMetrics, CompactionOptio use lance::dataset::NewColumnTransform; use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams}; use lance::index::DatasetIndexExt; -use lance::io::{ObjectStoreParams, StorageOptionsAccessor}; +use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor}; use lance::{Error as LanceError, Result as LanceResult}; use lance_index::mem_wal::MEM_WAL_INDEX_NAME; use lance_index::scalar::ScalarIndexParams; @@ -196,6 +196,10 @@ pub struct ContextStore { id_index_type: IdIndexType, embedding_dim: i32, distance_metric: DistanceMetric, + /// Object-store configuration used to resolve external payload references + /// (see [`ContextStore::fetch_payload`]). Reuses the same options the + /// dataset was opened with so referenced media can live in the same bucket. + storage_options: Option>, } /// Additional configuration when opening a [`ContextStore`]. @@ -358,7 +362,7 @@ impl ContextStore { Err(LanceError::DatasetNotFound { .. }) => { let dataset = Self::create_with_options( uri, - storage_options, + storage_options.clone(), &blob_columns, requested_embedding_dim, options.distance_metric.unwrap_or_default(), @@ -403,6 +407,7 @@ impl ContextStore { id_index_type: options.id_index_type, embedding_dim, distance_metric, + storage_options, }; // Ensure id index if configured @@ -494,6 +499,63 @@ impl ContextStore { Ok(self.dataset.manifest.version) } + /// Resolve a record's external payload reference to its bytes on demand. + /// + /// Records may carry a typed [`ContextRecord::payload_uri`] pointing at media + /// stored outside the dataset (e.g. `gs://bucket/prefix/`); `list`/`search` + /// return that reference without materializing the bytes. This opt-in fetch + /// resolves them using the context's configured `storage_options`, so it works + /// for `gs://`, `s3://`, and local paths through the same object-store path the + /// dataset itself uses. + /// + /// Returns `Ok(None)` if no record with `id` exists. Returns an error if the + /// record exists but carries no external payload reference. + // TODO(#115): offer a signed-URL variant (`fetch_payload_url`) where the + // backend supports presigning, instead of always streaming the bytes back. + pub async fn fetch_payload(&self, id: &str) -> LanceResult>> { + // Use the list-backed accessor so freshly written (MemWAL-buffered) rows + // and lifecycle visibility are handled exactly like every other read. + let Some(record) = self.get_by_id(id).await? else { + return Ok(None); + }; + let Some(uri) = record.payload_uri.as_deref() else { + return Err(LanceError::from(ArrowError::InvalidArgumentError(format!( + "record '{id}' has no external payload reference to fetch" + )))); + }; + let registry = Arc::new(ObjectStoreRegistry::default()); + let (store, path) = + ObjectStore::from_uri_and_params(registry, uri, &self.payload_store_params()).await?; + let bytes = store.read_one_all(&path).await?; + Ok(Some(bytes.to_vec())) + } + + /// Offload caller-provided bytes to an object at `uri` using the context's + /// configured `storage_options`, returning the number of bytes written. + /// + /// Pairs with [`ContextStore::fetch_payload`]: write the media object here, + /// then `add` a record whose `payload_uri` points at `uri`. Inline + /// [`ContextRecord::binary_payload`] remains the small-payload path. + pub async fn put_payload(&self, uri: &str, bytes: &[u8]) -> LanceResult { + let registry = Arc::new(ObjectStoreRegistry::default()); + let (store, path) = + ObjectStore::from_uri_and_params(registry, uri, &self.payload_store_params()).await?; + store.put(&path, bytes).await?; + Ok(bytes.len() as u64) + } + + /// Object-store parameters threading the context's `storage_options` so the + /// same credentials/endpoint apply when resolving external payload URIs. + fn payload_store_params(&self) -> ObjectStoreParams { + let mut params = ObjectStoreParams::default(); + if let Some(options) = &self.storage_options { + params.storage_options_accessor = Some(Arc::new( + StorageOptionsAccessor::with_static_options(options.clone()), + )); + } + params + } + /// Logically forget a record by internal storage id. /// /// This writes a tombstone with the same primary key, preserving prior @@ -909,6 +971,15 @@ impl ContextStore { if let Some(embedding) = patch.embedding { record.embedding = Some(embedding); } + if let Some(payload_uri) = patch.payload_uri { + record.payload_uri = Some(payload_uri); + } + if let Some(payload_size) = patch.payload_size { + record.payload_size = Some(payload_size); + } + if let Some(payload_checksum) = patch.payload_checksum { + record.payload_checksum = Some(payload_checksum); + } self.validate_new_record_id(&record).await?; let version = self.write_entries(std::slice::from_ref(&record)).await?; @@ -943,6 +1014,9 @@ impl ContextStore { content_type: CONTENT_TYPE_TOMBSTONE.to_string(), text_payload: None, binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: None, }; self.write_entries(std::slice::from_ref(&tombstone)).await @@ -1821,17 +1895,20 @@ impl ContextStore { true, true, true, + true, embedding_dim, DistanceMetric::default(), ) } + #[allow(clippy::too_many_arguments)] fn schema_with_options( blob_columns: &HashSet, include_external_id: bool, include_metadata: bool, include_relationships: bool, include_lifecycle: bool, + include_external_reference: bool, embedding_dim: i32, distance_metric: DistanceMetric, ) -> Schema { @@ -1920,15 +1997,22 @@ impl ContextStore { Field::new("content_type", DataType::Utf8, false), text_field, binary_field, - Field::new( - "embedding", - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - embedding_dim, - ), - true, - ), ]); + if include_external_reference { + fields.extend([ + Field::new("payload_uri", DataType::Utf8, true), + Field::new("payload_size", DataType::Int64, true), + Field::new("payload_checksum", DataType::Utf8, true), + ]); + } + fields.push(Field::new( + "embedding", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + embedding_dim, + ), + true, + )); let schema_metadata = HashMap::from([( DISTANCE_METRIC_METADATA_KEY.to_string(), @@ -1965,6 +2049,7 @@ impl ContextStore { true, true, true, + true, embedding_dim, distance_metric, )); @@ -2023,6 +2108,12 @@ impl ContextStore { .field_paths() .iter() .any(|path| path == "source"); + let include_external_reference = self + .dataset + .schema() + .field_paths() + .iter() + .any(|path| path == "payload_uri"); let include_relationships = self.has_relationships_column(); if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) { return Err(ArrowError::InvalidArgumentError( @@ -2057,6 +2148,18 @@ impl ContextStore { ) .into()); } + if !include_external_reference + && entries.iter().any(|entry| { + entry.payload_uri.is_some() + || entry.payload_size.is_some() + || entry.payload_checksum.is_some() + }) + { + return Err(ArrowError::InvalidArgumentError( + "external payload references require a context dataset created with external-reference support".to_string(), + ) + .into()); + } if !include_lifecycle && entries.iter().any(ContextRecord::has_non_default_lifecycle) { return Err(ArrowError::InvalidArgumentError( "lifecycle fields require a context dataset created with lifecycle support" @@ -2086,6 +2189,9 @@ impl ContextStore { let mut superseded_by_id_builder = StringBuilder::new(); let mut content_type_builder = StringBuilder::new(); let mut binary_builder = LargeBinaryBuilder::new(); + let mut payload_uri_builder = StringBuilder::new(); + let mut payload_size_builder = Int64Builder::new(); + let mut payload_checksum_builder = StringBuilder::new(); let text_is_blob = self.blob_columns.contains("text_payload"); let mut text_string_builder = if !text_is_blob { @@ -2180,6 +2286,10 @@ impl ContextStore { None => binary_builder.append_null(), } + payload_uri_builder.append_option(entry.payload_uri.as_deref()); + payload_size_builder.append_option(entry.payload_size); + payload_checksum_builder.append_option(entry.payload_checksum.as_deref()); + if let Some(metadata) = &entry.state_metadata { state_builder .field_builder::(0) @@ -2269,6 +2379,9 @@ impl ContextStore { Arc::new(text_string_builder.unwrap().finish()) }; let binary_array: ArrayRef = Arc::new(binary_builder.finish()); + let payload_uri_array: ArrayRef = Arc::new(payload_uri_builder.finish()); + let payload_size_array: ArrayRef = Arc::new(payload_size_builder.finish()); + let payload_checksum_array: ArrayRef = Arc::new(payload_checksum_builder.finish()); let state_array: ArrayRef = Arc::new(state_builder.finish()); let embedding_array: ArrayRef = Arc::new(embedding_builder.finish()); @@ -2313,6 +2426,13 @@ impl ContextStore { ("binary_payload".to_string(), binary_array), ("embedding".to_string(), embedding_array), ]); + if include_external_reference { + arrays_by_name.extend([ + ("payload_uri".to_string(), payload_uri_array), + ("payload_size".to_string(), payload_size_array), + ("payload_checksum".to_string(), payload_checksum_array), + ]); + } let schema: Arc = Arc::new(self.dataset.schema().into()); let arrays = schema @@ -2367,6 +2487,9 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let superseded_by_id_array = column_as_optional::(batch, "superseded_by_id"); let content_type_array = column_as::(batch, "content_type")?; let binary_array = column_as_optional::(batch, "binary_payload"); + let payload_uri_array = column_as_optional::(batch, "payload_uri"); + let payload_size_array = column_as_optional::(batch, "payload_size"); + let payload_checksum_array = column_as_optional::(batch, "payload_checksum"); let embedding_array = column_as_optional::(batch, "embedding"); // `text_payload` may be projected out, or stored as LargeBinary (blob) or LargeUtf8. @@ -2559,6 +2682,15 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let retired_reason = optional_string_from_array(retired_reason_array, row); let supersedes_id = optional_string_from_array(supersedes_id_array, row); let superseded_by_id = optional_string_from_array(superseded_by_id_array, row); + let payload_uri = optional_string_from_array(payload_uri_array, row); + let payload_size = payload_size_array.and_then(|arr| { + if arr.is_null(row) { + None + } else { + Some(arr.value(row)) + } + }); + let payload_checksum = optional_string_from_array(payload_checksum_array, row); results.push(ContextRecord { id: id_array.value(row).to_string(), @@ -2589,6 +2721,9 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { content_type: content_type_array.value(row).to_string(), text_payload, binary_payload, + payload_uri, + payload_size, + payload_checksum, embedding, }); } @@ -3001,10 +3136,75 @@ mod tests { content_type: CONTENT_TYPE_TEXT.to_string(), text_payload: Some(format!("payload-{id}")), binary_payload: None, + payload_uri: None, + payload_size: None, + payload_checksum: None, embedding: Some(make_embedding(embedding_pivot)), } } + #[test] + fn external_payload_reference_roundtrips_add_list_and_fetch() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let media_dir = TempDir::new().unwrap(); + let object_uri = media_dir + .path() + .join("media-001.bin") + .to_string_lossy() + .to_string(); + let payload = b"\x89PNG\r\n\x1a\n external media bytes".to_vec(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + + // Offload bytes to the object store, then reference them by URI. + let written = store.put_payload(&object_uri, &payload).await.unwrap(); + assert_eq!(written, payload.len() as u64); + + let mut record = text_record("media-001", 0.5); + record.content_type = "image/png".to_string(); + record.text_payload = None; + record.payload_uri = Some(object_uri.clone()); + record.payload_size = Some(payload.len() as i64); + record.payload_checksum = Some("sha256:deadbeef".to_string()); + store.add(std::slice::from_ref(&record)).await.unwrap(); + + // list returns the reference without materializing the bytes. + let listed = store.list(None, None).await.unwrap(); + assert_eq!(listed.len(), 1); + let listed = &listed[0]; + assert_eq!(listed.payload_uri.as_deref(), Some(object_uri.as_str())); + assert_eq!(listed.payload_size, Some(payload.len() as i64)); + assert_eq!(listed.payload_checksum.as_deref(), Some("sha256:deadbeef")); + assert_eq!(listed.binary_payload, None); + + // opt-in fetch resolves the bytes via the context's storage path. + let fetched = store.fetch_payload(&record.id).await.unwrap(); + assert_eq!(fetched, Some(payload.clone())); + }); + } + + #[test] + fn fetch_payload_handles_missing_record_and_missing_reference() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + + // Unknown id resolves to None rather than erroring. + assert_eq!(store.fetch_payload("does-not-exist").await.unwrap(), None); + + // A record without an external reference is an error to fetch. + let record = text_record("inline-1", 0.1); + store.add(std::slice::from_ref(&record)).await.unwrap(); + let err = store.fetch_payload(&record.id).await.unwrap_err(); + assert!(err.to_string().contains("no external payload reference")); + }); + } + #[test] fn search_orders_by_distance() { let dir = TempDir::new().unwrap(); @@ -3981,6 +4181,7 @@ mod tests { true, false, true, + true, DEFAULT_EMBEDDING_DIM, DistanceMetric::default(), )); diff --git a/crates/lance-context-server/src/routes/mod.rs b/crates/lance-context-server/src/routes/mod.rs index 2e33035..e255c2a 100644 --- a/crates/lance-context-server/src/routes/mod.rs +++ b/crates/lance-context-server/src/routes/mod.rs @@ -59,6 +59,10 @@ pub fn router() -> Router> { "/api/v1/contexts/{name}/records/{id}", delete(records::delete_record), ) + .route( + "/api/v1/contexts/{name}/records/{id}/payload", + get(records::fetch_payload), + ) .route("/api/v1/contexts/{name}/search", post(search::search)) .route("/api/v1/contexts/{name}/retrieve", post(search::retrieve)) .route( diff --git a/crates/lance-context-server/src/routes/records.rs b/crates/lance-context-server/src/routes/records.rs index 24c52b2..4037497 100644 --- a/crates/lance-context-server/src/routes/records.rs +++ b/crates/lance-context-server/src/routes/records.rs @@ -1,6 +1,9 @@ use std::sync::Arc; +use axum::body::Body; use axum::extract::{Path, Query, State}; +use axum::http::header; +use axum::response::Response; use axum::Json; use chrono::Utc; use lance_context_api::{ @@ -248,6 +251,51 @@ pub async fn get_record( })) } +/// Resolve a record's external payload reference to its raw bytes. +/// +/// Returns the bytes with the record's `content_type` (defaulting to +/// `application/octet-stream`). `404` if no such record; `400` if the record +/// carries no external payload reference. +pub async fn fetch_payload( + State(state): State>, + Path((name, id)): Path<(String, String)>, +) -> Result { + let stores = state.stores.read().await; + let store_lock = stores + .get(&name) + .ok_or_else(|| AppError::NotFound(format!("Context '{}' does not exist", name)))? + .clone(); + drop(stores); + + let store = store_lock.read().await; + let record = store + .get_by_id(&id) + .await + .map_err(AppError::from_lance)? + .ok_or_else(|| AppError::NotFound(format!("Record '{}' does not exist", id)))?; + if record.payload_uri.is_none() { + return Err(AppError::InvalidRequest(format!( + "record '{}' has no external payload reference to fetch", + id + ))); + } + let bytes = store + .fetch_payload(&id) + .await + .map_err(AppError::from_lance)? + .ok_or_else(|| AppError::NotFound(format!("Record '{}' does not exist", id)))?; + + let content_type = if record.content_type.is_empty() { + "application/octet-stream".to_string() + } else { + record.content_type + }; + Response::builder() + .header(header::CONTENT_TYPE, content_type) + .body(Body::from(bytes)) + .map_err(|err| AppError::Internal(err.to_string())) +} + #[derive(serde::Deserialize)] pub struct ExternalIdParams { pub external_id: String, @@ -424,6 +472,9 @@ pub fn record_to_dto(r: ContextRecord) -> RecordDto { content_type: r.content_type, text_payload: r.text_payload, binary_payload: r.binary_payload, + payload_uri: r.payload_uri, + payload_size: r.payload_size, + payload_checksum: r.payload_checksum, embedding: r.embedding, state_metadata: r.state_metadata.map(|sm| StateMetadataDto { step: sm.step, @@ -489,6 +540,9 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { retired_at: patch.retired_at, retired_reason: patch.retired_reason.clone(), embedding: patch.embedding.clone(), + payload_uri: patch.payload_uri.clone(), + payload_size: patch.payload_size, + payload_checksum: patch.payload_checksum.clone(), } } @@ -530,6 +584,9 @@ fn record_from_add_request( content_type: r.content_type.clone(), text_payload: r.text_payload.clone(), binary_payload: r.binary_payload.clone(), + payload_uri: r.payload_uri.clone(), + payload_size: r.payload_size, + payload_checksum: r.payload_checksum.clone(), embedding: r.embedding.clone(), } } @@ -579,6 +636,81 @@ mod tests { } } + #[tokio::test] + async fn fetch_payload_returns_bytes_404_and_400() { + let context_name = "ctx"; + let (state, dir) = test_state(context_name).await; + let object_uri = dir.path().join("media.bin").to_string_lossy().to_string(); + let payload = b"external media bytes".to_vec(); + + // Offload the object through the store's object-store path. + { + let stores = state.stores.read().await; + let store = stores.get(context_name).unwrap().read().await; + store.put_payload(&object_uri, &payload).await.unwrap(); + } + + // Add a record that references the object instead of inlining bytes. + let record = AddRecordRequest { + role: "user".to_string(), + content_type: "image/png".to_string(), + payload_uri: Some(object_uri.clone()), + payload_size: Some(payload.len() as i64), + ..Default::default() + }; + let (_, Json(add_response)) = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![record], + }), + ) + .await + .unwrap(); + let id = add_response.ids[0].clone(); + + // The payload endpoint streams the resolved bytes with the content type. + let resp = fetch_payload( + State(state.clone()), + Path((context_name.to_string(), id.clone())), + ) + .await + .unwrap(); + assert_eq!( + resp.headers().get(header::CONTENT_TYPE).unwrap(), + "image/png" + ); + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + assert_eq!(body.as_ref(), payload.as_slice()); + + // Unknown id -> 404. + let missing = fetch_payload( + State(state.clone()), + Path((context_name.to_string(), "does-not-exist".to_string())), + ) + .await + .unwrap_err(); + assert!(matches!(missing, AppError::NotFound(_))); + + // Record without an external reference -> 400. + let (_, Json(inline)) = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![text_record("inline only")], + }), + ) + .await + .unwrap(); + let inline_id = inline.ids[0].clone(); + let no_ref = fetch_payload(State(state), Path((context_name.to_string(), inline_id))) + .await + .unwrap_err(); + assert!(matches!(no_ref, AppError::InvalidRequest(_))); + } + #[tokio::test] async fn get_and_delete_by_external_id() { let context_name = "ctx"; diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index a24882d..c0b12a1 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -57,6 +57,9 @@ "retired_reason", "supersedes_id", "superseded_by_id", + "payload_uri", + "payload_size", + "payload_checksum", } @@ -185,6 +188,9 @@ def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]: "content_type": raw.get("content_type"), "text": raw.get("text_payload"), "binary": raw.get("binary_payload"), + "payload_uri": raw.get("payload_uri"), + "payload_size": raw.get("payload_size"), + "payload_checksum": raw.get("payload_checksum"), "embedding": raw.get("embedding"), "created_at": _normalize_timestamp(raw.get("created_at")), "state_metadata": raw.get("state_metadata"), @@ -598,6 +604,9 @@ def add( retired_reason: str | None = None, supersedes_id: str | None = None, superseded_by_id: str | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ) -> None: if content_type is not None and data_type is not None: raise ValueError("Specify only one of content_type or data_type") @@ -641,6 +650,9 @@ def add( supersedes_id, superseded_by_id, _json_dumps(relationships, "relationships"), + payload_uri, + payload_size, + payload_checksum, ) def upsert( @@ -665,6 +677,9 @@ def upsert( lifecycle_status: str | None = None, retired_at: datetime | str | None = None, retired_reason: str | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, *, key: str = "external_id", ) -> dict[str, Any]: @@ -714,6 +729,9 @@ def upsert( _coerce_timestamp(retired_at, field_name="retired_at"), retired_reason, _json_dumps(relationships, "relationships"), + payload_uri, + payload_size, + payload_checksum, key, ) return { @@ -740,12 +758,18 @@ def update( retired_at: datetime | str | None = None, retired_reason: str | None = None, embedding: list[float] | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ) -> dict[str, Any]: """Patch mutable fields on a visible record by id or external_id. Pass ``embedding`` to attach or replace a record's vector after it was appended without one (deferred / enrich-later ingestion). The updated record participates in vector search once the embedding is set. + + Pass ``payload_uri`` (with optional ``payload_size`` / ``payload_checksum``) + to attach an external media reference after the record was created. """ if (id is None) == (external_id is None): raise ValueError("Specify exactly one of id or external_id") @@ -762,6 +786,9 @@ def update( and retired_at is None and retired_reason is None and embedding is None + and payload_uri is None + and payload_size is None + and payload_checksum is None ): raise ValueError("update requires at least one patch field") @@ -780,6 +807,9 @@ def update( _coerce_timestamp(retired_at, field_name="retired_at"), retired_reason, embedding, + payload_uri, + payload_size, + payload_checksum, ) record = result.get("record") return { @@ -863,6 +893,9 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: "retired_reason": record.get("retired_reason"), "supersedes_id": record.get("supersedes_id"), "superseded_by_id": record.get("superseded_by_id"), + "payload_uri": record.get("payload_uri"), + "payload_size": record.get("payload_size"), + "payload_checksum": record.get("payload_checksum"), } ) @@ -1490,6 +1523,26 @@ def get( return None return _normalize_record(result) + def fetch_payload(self, id: str) -> bytes | None: + """Resolve a record's external ``payload_uri`` to its bytes on demand. + + Reads the referenced object (``gs://``/``s3://``/local) using the + context's configured ``storage_options``, so large media can live in + object storage while ``get``/``list``/``search`` return only the + reference. Returns ``None`` if no record with ``id`` exists; raises if + the record carries no external payload reference. + """ + return self._inner.fetch_payload(id) + + def put_payload(self, uri: str, data: bytes) -> int: + """Offload ``data`` to an object at ``uri`` using the context's + ``storage_options``; returns the number of bytes written. + + Pair with ``add(..., payload_uri=uri)`` to store large media outside the + dataset and reference it from a record. + """ + return int(self._inner.put_payload(uri, data)) + def related( self, target_id: str, @@ -1760,6 +1813,9 @@ async def add( retired_reason: str | None = None, supersedes_id: str | None = None, superseded_by_id: str | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ) -> None: loop = asyncio.get_running_loop() await loop.run_in_executor( @@ -1785,6 +1841,9 @@ async def add( retired_reason=retired_reason, supersedes_id=supersedes_id, superseded_by_id=superseded_by_id, + payload_uri=payload_uri, + payload_size=payload_size, + payload_checksum=payload_checksum, ), ) @@ -1967,6 +2026,18 @@ async def get( None, lambda: self._sync.get(id=id, external_id=external_id) ) + async def fetch_payload(self, id: str) -> bytes | None: + """Asynchronously resolve a record's external ``payload_uri`` to bytes.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, lambda: self._sync.fetch_payload(id)) + + async def put_payload(self, uri: str, data: bytes) -> int: + """Asynchronously offload ``data`` to an object at ``uri``.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, lambda: self._sync.put_payload(uri, data) + ) + async def list( self, limit: int | None = None, @@ -2050,6 +2121,9 @@ async def add( retention_policy: str | None = None, supersedes_id: str | None = None, relationships: Iterable[Mapping[str, Any]] | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ) -> dict[str, Any]: """Add a record to the remote context.""" payload, data_type = _normalize_content(content, content_type) @@ -2077,6 +2151,9 @@ async def add( retention_policy=retention_policy, supersedes_id=supersedes_id, relationships_json=rel_json, + payload_uri=payload_uri, + payload_size=payload_size, + payload_checksum=payload_checksum, ), ) @@ -2095,6 +2172,9 @@ async def upsert( retention_policy: str | None = None, supersedes_id: str | None = None, relationships: Iterable[Mapping[str, Any]] | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, key: str = "external_id", ) -> dict[str, Any]: """Upsert a record by external_id.""" @@ -2121,6 +2201,9 @@ async def upsert( retention_policy=retention_policy, supersedes_id=supersedes_id, relationships_json=rel_json, + payload_uri=payload_uri, + payload_size=payload_size, + payload_checksum=payload_checksum, key=key, ), ) @@ -2142,6 +2225,9 @@ async def update( retired_at: datetime | str | None = None, retired_reason: str | None = None, embedding: Iterable[float] | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ) -> dict[str, Any]: """Update a record by id or external_id.""" meta_json = _json_dumps(metadata, "metadata") @@ -2167,6 +2253,9 @@ async def update( retired_at=ret, retired_reason=retired_reason, embedding=emb, + payload_uri=payload_uri, + payload_size=payload_size, + payload_checksum=payload_checksum, ), ) if result.get("record") is not None: @@ -2187,6 +2276,16 @@ async def get( ) return _normalize_record(result) if result is not None else None + async def fetch_payload(self, id: str) -> bytes: + """Resolve a record's external ``payload_uri`` to its bytes via the + remote server, which reads from object storage using the context's + ``storage_options``.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + lambda: self._sync.fetch_payload(id), + ) + async def delete( self, *, diff --git a/python/src/lib.rs b/python/src/lib.rs index c905e88..73cb468 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -52,6 +52,9 @@ struct RecordInput { metadata_json: Option, relationships: Vec, lifecycle: LifecycleFields, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, } #[derive(Default)] @@ -409,7 +412,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, tenant = None, source = None, external_id = None, run_id = None, created_at = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, supersedes_id = None, superseded_by_id = None, relationships_json = None))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, tenant = None, source = None, external_id = None, run_id = None, created_at = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, supersedes_id = None, superseded_by_id = None, relationships_json = None, payload_uri = None, payload_size = None, payload_checksum = None))] fn add( &mut self, py: Python<'_>, @@ -434,6 +437,9 @@ impl Context { supersedes_id: Option, superseded_by_id: Option, relationships_json: Option, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, ) -> PyResult<()> { let lifecycle = LifecycleFields { expires_at: parse_optional_datetime(expires_at, "expires_at")?, @@ -461,6 +467,9 @@ impl Context { metadata_json, relationships: relationships_from_json(relationships_json)?, lifecycle, + payload_uri, + payload_size, + payload_checksum, }, 1, )?; @@ -479,7 +488,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, tenant = None, source = None, external_id = None, run_id = None, created_at = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, relationships_json = None, key = "external_id"))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, tenant = None, source = None, external_id = None, run_id = None, created_at = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, relationships_json = None, payload_uri = None, payload_size = None, payload_checksum = None, key = "external_id"))] fn upsert( &mut self, py: Python<'_>, @@ -502,6 +511,9 @@ impl Context { retired_at: Option, retired_reason: Option, relationships_json: Option, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, key: &str, ) -> PyResult { if key != "external_id" { @@ -541,6 +553,9 @@ impl Context { metadata_json, relationships: relationships_from_json(relationships_json)?, lifecycle, + payload_uri, + payload_size, + payload_checksum, }, 1, )?; @@ -565,7 +580,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, tenant = None, source = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))] + #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, tenant = None, source = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None, payload_uri = None, payload_size = None, payload_checksum = None))] fn update( &mut self, py: Python<'_>, @@ -583,6 +598,9 @@ impl Context { retired_at: Option, retired_reason: Option, embedding: Option>, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, ) -> PyResult { let patch = RecordPatch { bot_id, @@ -598,6 +616,9 @@ impl Context { retired_at: parse_optional_datetime(retired_at, "retired_at")?, retired_reason, embedding, + payload_uri, + payload_size, + payload_checksum, }; if patch.is_empty() { return Err(PyRuntimeError::new_err( @@ -1033,6 +1054,32 @@ impl Context { record.map(|record| record_to_py(py, record)).transpose() } + /// Resolve a record's external payload reference to its bytes on demand, + /// using the context's configured ``storage_options``. Returns ``None`` if + /// no record with ``id`` exists; raises if the record has no external + /// payload reference. + #[pyo3(signature = (id))] + fn fetch_payload(&self, py: Python<'_>, id: &str) -> PyResult>> { + let bytes = py.allow_threads(|| { + self.runtime + .block_on(self.store.fetch_payload(id)) + .map_err(to_py_err) + })?; + Ok(bytes.map(|bytes| PyBytes::new(py, &bytes).unbind())) + } + + /// Offload caller-provided bytes to an object at ``uri`` using the context's + /// configured ``storage_options``; returns the number of bytes written. + /// Pair with a subsequent ``add(..., payload_uri=uri)``. + #[pyo3(signature = (uri, data))] + fn put_payload(&self, py: Python<'_>, uri: &str, data: &[u8]) -> PyResult { + py.allow_threads(|| { + self.runtime + .block_on(self.store.put_payload(uri, data)) + .map_err(to_py_err) + }) + } + #[pyo3(signature = (id = None, external_id = None))] fn delete( &mut self, @@ -1231,6 +1278,11 @@ impl Context { optional_item(dict, "supersedes_id")?.map(|value| value.extract::()); let superseded_by_id = optional_item(dict, "superseded_by_id")?.map(|value| value.extract::()); + let payload_uri = + optional_item(dict, "payload_uri")?.map(|value| value.extract::()); + let payload_size = optional_item(dict, "payload_size")?.map(|value| value.extract::()); + let payload_checksum = + optional_item(dict, "payload_checksum")?.map(|value| value.extract::()); let lifecycle = LifecycleFields { expires_at: parse_optional_datetime(expires_at.transpose()?, "expires_at")?, @@ -1259,6 +1311,9 @@ impl Context { metadata_json: metadata_json.transpose()?, relationships: relationships_from_json(relationships_json.transpose()?)?, lifecycle, + payload_uri: payload_uri.transpose()?, + payload_size: payload_size.transpose()?, + payload_checksum: payload_checksum.transpose()?, }, index as u64 + 1, ) @@ -1286,6 +1341,9 @@ impl Context { metadata_json, relationships, lifecycle, + payload_uri, + payload_size, + payload_checksum, } = input; let (content_type, text_payload, binary_payload, inner_content) = @@ -1340,6 +1398,9 @@ impl Context { content_type, text_payload, binary_payload, + payload_uri, + payload_size, + payload_checksum, embedding, }, role, @@ -1370,7 +1431,7 @@ impl RemoteContext { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, supersedes_id = None, relationships_json = None))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, state_metadata = None, metadata_json = None, expires_at = None, retention_policy = None, supersedes_id = None, relationships_json = None, payload_uri = None, payload_size = None, payload_checksum = None))] fn add( &mut self, py: Python<'_>, @@ -1387,6 +1448,9 @@ impl RemoteContext { retention_policy: Option, supersedes_id: Option, relationships_json: Option, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, ) -> PyResult { let (content_type, text_payload, binary_payload) = content_to_payloads(content, data_type)?; let req = AddRecordRequest { @@ -1394,6 +1458,9 @@ impl RemoteContext { content_type, text_payload, binary_payload, + payload_uri, + payload_size, + payload_checksum, embedding, bot_id, session_id, @@ -1421,7 +1488,7 @@ impl RemoteContext { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, metadata_json = None, expires_at = None, retention_policy = None, supersedes_id = None, relationships_json = None, key = "external_id"))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, metadata_json = None, expires_at = None, retention_policy = None, supersedes_id = None, relationships_json = None, payload_uri = None, payload_size = None, payload_checksum = None, key = "external_id"))] fn upsert( &mut self, py: Python<'_>, @@ -1437,6 +1504,9 @@ impl RemoteContext { retention_policy: Option, supersedes_id: Option, relationships_json: Option, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, key: &str, ) -> PyResult { if key != "external_id" { @@ -1456,6 +1526,9 @@ impl RemoteContext { content_type, text_payload, binary_payload, + payload_uri, + payload_size, + payload_checksum, embedding, bot_id, session_id, @@ -1485,7 +1558,7 @@ impl RemoteContext { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))] + #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None, payload_uri = None, payload_size = None, payload_checksum = None))] fn update( &mut self, py: Python<'_>, @@ -1501,6 +1574,9 @@ impl RemoteContext { retired_at: Option, retired_reason: Option, embedding: Option>, + payload_uri: Option, + payload_size: Option, + payload_checksum: Option, ) -> PyResult { let patch = RecordPatchDto { bot_id, @@ -1516,6 +1592,9 @@ impl RemoteContext { embedding, tenant: None, source: None, + payload_uri, + payload_size, + payload_checksum, }; if patch.is_empty() { return Err(PyRuntimeError::new_err( @@ -1589,6 +1668,18 @@ impl RemoteContext { .transpose() } + /// Resolve a record's external payload reference to its bytes via the server, + /// which fetches from object storage using the context's ``storage_options``. + #[pyo3(signature = (id))] + fn fetch_payload(&self, py: Python<'_>, id: &str) -> PyResult> { + let bytes = py.allow_threads(|| { + self.runtime + .block_on(self.store.fetch_payload(id)) + .map_err(to_py_err) + })?; + Ok(PyBytes::new(py, &bytes).unbind()) + } + #[pyo3(signature = (id = None, external_id = None))] fn delete( &mut self, @@ -1930,6 +2021,9 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { content_type, text_payload, binary_payload, + payload_uri, + payload_size, + payload_checksum, embedding, } = record; @@ -1984,6 +2078,9 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { Some(payload) => dict.set_item("binary_payload", PyBytes::new(py, &payload))?, None => dict.set_item("binary_payload", py.None())?, } + dict.set_item("payload_uri", payload_uri)?; + dict.set_item("payload_size", payload_size)?; + dict.set_item("payload_checksum", payload_checksum)?; dict.set_item("embedding", embedding)?; Ok(dict.into_pyobject(py)?.unbind().into()) } @@ -2103,6 +2200,9 @@ fn dto_record_to_py(py: Python<'_>, record: RecordDto) -> PyResult { content_type, text_payload, binary_payload, + payload_uri, + payload_size, + payload_checksum, embedding, state_metadata, metadata, @@ -2169,6 +2269,9 @@ fn dto_record_to_py(py: Python<'_>, record: RecordDto) -> PyResult { Some(payload) => dict.set_item("binary_payload", PyBytes::new(py, &payload))?, None => dict.set_item("binary_payload", py.None())?, } + dict.set_item("payload_uri", payload_uri)?; + dict.set_item("payload_size", payload_size)?; + dict.set_item("payload_checksum", payload_checksum)?; dict.set_item("embedding", embedding)?; Ok(dict.into_pyobject(py)?.unbind().into()) } diff --git a/python/tests/test_external_payload.py b/python/tests/test_external_payload.py new file mode 100644 index 0000000..6d4faab --- /dev/null +++ b/python/tests/test_external_payload.py @@ -0,0 +1,91 @@ +"""End-to-end tests for external media references (issue #115). + +Large media lives as an object in the configured object store and is referenced +from a record by a typed ``payload_uri`` (plus optional ``payload_size`` / +``payload_checksum``). ``get``/``list`` return the reference without fetching +bytes; ``fetch_payload`` resolves them on demand via the context's storage path. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + +from lance_context.api import Context + + +def test_external_payload_reference_roundtrips(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + # Offload the media bytes to an object, then reference it from a record. + object_uri = str(tmp_path / "media-001.png") + payload = b"\x89PNG\r\n\x1a\n external media bytes" + written = ctx.put_payload(object_uri, payload) + assert written == len(payload) + + ctx.add( + "user", + "diagram of the incident timeline", # inline caption, not the media + content_type="image/png", + external_id="img-1", + payload_uri=object_uri, + payload_size=len(payload), + payload_checksum="sha256:abc", + ) + + # The reference round-trips and no bytes are inlined. + record = ctx.get(external_id="img-1") + assert record is not None + assert record["payload_uri"] == object_uri + assert record["payload_size"] == len(payload) + assert record["payload_checksum"] == "sha256:abc" + assert record["binary"] is None + + # list returns the reference without materializing the bytes. + listed = ctx.list() + assert len(listed) == 1 + assert listed[0]["payload_uri"] == object_uri + assert listed[0]["binary"] is None + + # Opt-in fetch resolves the bytes through the context's storage path. + assert ctx.fetch_payload(record["id"]) == payload + + +def test_fetch_payload_missing_record_and_missing_reference(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + # Unknown id resolves to None rather than raising. + assert ctx.fetch_payload("does-not-exist") is None + + # A record without an external reference is an error to fetch. + ctx.add("user", "inline text only", external_id="inline-1") + record = ctx.get(external_id="inline-1") + assert record is not None + with pytest.raises(Exception, match="no external payload reference"): + ctx.fetch_payload(record["id"]) + + +def test_update_attaches_payload_reference_later(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + object_uri = str(tmp_path / "audio.wav") + payload = b"RIFF....WAVEfmt external audio" + ctx.put_payload(object_uri, payload) + + # Capture the record first, attach the media reference afterwards. + ctx.add("user", "voice note", external_id="note-1") + result = ctx.update( + external_id="note-1", + payload_uri=object_uri, + payload_size=len(payload), + ) + assert result["updated"] is True + assert result["record"]["payload_uri"] == object_uri + assert ctx.fetch_payload(result["record"]["id"]) == payload diff --git a/python/tests/test_search.py b/python/tests/test_search.py index dd2e9b8..fa78013 100644 --- a/python/tests/test_search.py +++ b/python/tests/test_search.py @@ -61,7 +61,11 @@ def add( embedding: list[float] | None, bot_id: str | None, session_id: str | None, + tenant: str | None, + source: str | None, external_id: str | None, + run_id: str | None, + created_at: str | None, state_metadata: dict[str, Any] | None, metadata_json: str | None, expires_at: str | None = None, @@ -72,6 +76,9 @@ def add( supersedes_id: str | None = None, superseded_by_id: str | None = None, relationships_json: str | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ): self.add_calls.append( ( @@ -107,7 +114,12 @@ def upsert( embedding: list[float] | None, bot_id: str | None, session_id: str | None, + tenant: str | None, + source: str | None, external_id: str | None, + run_id: str | None, + created_at: str | None, + state_metadata: dict[str, Any] | None, metadata_json: str | None, expires_at: str | None = None, retention_policy: str | None = None, @@ -115,6 +127,9 @@ def upsert( retired_at: str | None = None, retired_reason: str | None = None, relationships_json: str | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, key: str = "external_id", ): self.upsert_calls.append( @@ -173,6 +188,8 @@ def update( external_id: str | None, bot_id: str | None, session_id: str | None, + tenant: str | None, + source: str | None, metadata_json: str | None, relationships_json: str | None, expires_at: str | None, @@ -181,6 +198,9 @@ def update( retired_at: str | None, retired_reason: str | None, embedding: list[float] | None = None, + payload_uri: str | None = None, + payload_size: int | None = None, + payload_checksum: str | None = None, ): self.update_calls.append( { @@ -1271,7 +1291,11 @@ def test_context_add_many_normalizes_records(): "embedding": None, "bot_id": None, "session_id": None, + "tenant": None, + "source": None, "external_id": None, + "run_id": None, + "created_at": None, "state_metadata": None, "metadata_json": None, "relationships_json": None, @@ -1282,6 +1306,9 @@ def test_context_add_many_normalizes_records(): "retired_reason": None, "supersedes_id": None, "superseded_by_id": None, + "payload_uri": None, + "payload_size": None, + "payload_checksum": None, }, { "role": "assistant", @@ -1290,7 +1317,11 @@ def test_context_add_many_normalizes_records(): "embedding": [0.1, 0.2], "bot_id": "bot", "session_id": "sess", + "tenant": None, + "source": None, "external_id": "doc-1#chunk-2", + "run_id": None, + "created_at": None, "state_metadata": None, "metadata_json": None, "relationships_json": None, @@ -1301,6 +1332,9 @@ def test_context_add_many_normalizes_records(): "retired_reason": None, "supersedes_id": None, "superseded_by_id": None, + "payload_uri": None, + "payload_size": None, + "payload_checksum": None, }, ] ] From cbc7cd0cc30ea07db0e72fbfe7dde6055406aa2c Mon Sep 17 00:00:00 2001 From: dcfocus Date: Sun, 28 Jun 2026 01:47:18 +0000 Subject: [PATCH 2/2] fix: thread external payload reference through Python upsert_many `add_many` and single `upsert` forwarded `payload_uri`/`payload_size`/ `payload_checksum`, but the bulk `upsert_many` path dropped them from the normalized record dict, so external media references were silently lost on batch insert-or-replace. Include the three fields (mirroring `add_many`) and cover it with a test. Co-Authored-By: Claude Opus 4.8 (1M context) --- python/python/lance_context/api.py | 3 +++ python/tests/test_external_payload.py | 33 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index c0b12a1..31a9f33 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -989,6 +989,9 @@ def upsert_many( field_name=f"records[{index}].retired_at", ), "retired_reason": record.get("retired_reason"), + "payload_uri": record.get("payload_uri"), + "payload_size": record.get("payload_size"), + "payload_checksum": record.get("payload_checksum"), } ) diff --git a/python/tests/test_external_payload.py b/python/tests/test_external_payload.py index 6d4faab..afab818 100644 --- a/python/tests/test_external_payload.py +++ b/python/tests/test_external_payload.py @@ -89,3 +89,36 @@ def test_update_attaches_payload_reference_later(tmp_path: Path) -> None: assert result["updated"] is True assert result["record"]["payload_uri"] == object_uri assert ctx.fetch_payload(result["record"]["id"]) == payload + + +def test_upsert_many_forwards_payload_reference(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + object_uri = str(tmp_path / "frame-001.jpg") + payload = b"\xff\xd8\xff external jpeg bytes" + ctx.put_payload(object_uri, payload) + + # Bulk insert-or-replace must carry the external reference through, just like + # add_many / single upsert. + ctx.upsert_many( + [ + { + "role": "assistant", + "content": "captured frame", + "content_type": "image/jpeg", + "external_id": "frame-001", + "payload_uri": object_uri, + "payload_size": len(payload), + "payload_checksum": "sha256:frame", + } + ] + ) + + record = ctx.get(external_id="frame-001") + assert record is not None + assert record["payload_uri"] == object_uri + assert record["payload_size"] == len(payload) + assert record["payload_checksum"] == "sha256:frame" + assert record["binary"] is None + assert ctx.fetch_payload(record["id"]) == payload