Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,23 @@ ctx.add("assistant", image)

print("Current version:", ctx.version())

# External media references: keep large media in object storage (GCS/S3/local)
# and reference it from a record by a typed URI, instead of inlining the bytes.
object_uri = "gs://my-bucket/media/diagram-001.png"
ctx.put_payload(object_uri, image_bytes) # offload bytes via the context's storage_options
ctx.add(
"assistant",
"incident timeline diagram", # inline caption; the media stays in the bucket
content_type="image/png",
external_id="diagram-001",
payload_uri=object_uri,
payload_size=len(image_bytes),
)
# list/search/get return the reference without fetching the bytes; resolve on demand:
record = ctx.get(external_id="diagram-001")
media_bytes = ctx.fetch_payload(record["id"])


# Batch append source chunks in one storage operation
ctx.add_many([
{
Expand Down Expand Up @@ -424,6 +441,9 @@ let record = ContextRecord {
content_type: "text/plain".into(),
text_payload: Some("hello world".into()),
binary_payload: None,
payload_uri: None,
payload_size: None,
payload_checksum: None,
embedding: None,
};
store.add(&[record]).await?;
Expand All @@ -446,6 +466,7 @@ We are tracking future enhancements as GitHub issues:
- ~~[Support standard storage_options / GCS](https://github.com/lance-format/lance-context/issues/45)~~ ✅ **Implemented**
- [Add relationship column for GraphRAG workflows](https://github.com/lance-format/lance-context/issues/15)
- ~~[Background compaction for Lance fragments](https://github.com/lance-format/lance-context/issues/16)~~ ✅ **Implemented**
- ~~[External media references — store large media in object storage by typed URI](https://github.com/lance-format/lance-context/issues/115)~~ ✅ **Implemented**

Contributions are welcome—feel free to comment on the issues above or open your own proposals.

Expand Down
78 changes: 78 additions & 0 deletions crates/lance-context-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ pub struct AddRecordRequest {
deserialize_with = "deserialize_base64_opt"
)]
pub binary_payload: Option<Vec<u8>>,
/// Typed reference to a payload object stored outside the dataset
/// (e.g. `gs://bucket/prefix/<id>`). Distinct from inline `binary_payload`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_uri: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_size: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_checksum: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub embedding: Option<Vec<f32>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -280,6 +288,12 @@ pub struct RecordPatchDto {
pub retired_reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub embedding: Option<Vec<f32>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_uri: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_size: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_checksum: Option<String>,
}

impl RecordPatchDto {
Expand All @@ -298,6 +312,9 @@ impl RecordPatchDto {
&& self.retired_at.is_none()
&& self.retired_reason.is_none()
&& self.embedding.is_none()
&& self.payload_uri.is_none()
&& self.payload_size.is_none()
&& self.payload_checksum.is_none()
}
}

Expand Down Expand Up @@ -348,6 +365,12 @@ pub struct RecordDto {
)]
pub binary_payload: Option<Vec<u8>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_uri: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_size: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload_checksum: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub embedding: Option<Vec<f32>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_metadata: Option<StateMetadataDto>,
Expand Down Expand Up @@ -602,4 +625,59 @@ mod tests {
assert!(req.include_expired);
assert!(req.include_retired);
}

#[test]
fn add_request_omits_payload_reference_when_absent() {
    // A request that carries no external payload reference must serialize
    // without any of the reference keys, so peers that predate these fields
    // see exactly the JSON they always did.
    let request = AddRecordRequest {
        text_payload: Some(String::from("hi")),
        content_type: String::from("text/plain"),
        role: String::from("user"),
        ..Default::default()
    };

    let encoded = serde_json::to_string(&request).unwrap();

    // None of the three reference keys may appear anywhere in the output.
    for key in ["payload_uri", "payload_size", "payload_checksum"] {
        assert!(!encoded.contains(key));
    }
}

#[test]
fn add_request_roundtrips_payload_reference() {
    // Fixture values for the external reference; reused for both the request
    // and the post-roundtrip expectations.
    const URI: &str = "gs://bucket/prefix/obj.png";
    const CHECKSUM: &str = "sha256:abc";
    const SIZE: i64 = 2048;

    let original = AddRecordRequest {
        role: String::from("user"),
        content_type: String::from("image/png"),
        payload_uri: Some(String::from(URI)),
        payload_size: Some(SIZE),
        payload_checksum: Some(String::from(CHECKSUM)),
        ..Default::default()
    };

    // Serialize to JSON and decode again; all three reference fields must
    // come back unchanged.
    let wire = serde_json::to_string(&original).unwrap();
    let decoded: AddRecordRequest = serde_json::from_str(&wire).unwrap();

    assert_eq!(decoded.payload_uri, Some(String::from(URI)));
    assert_eq!(decoded.payload_size, Some(SIZE));
    assert_eq!(decoded.payload_checksum, Some(String::from(CHECKSUM)));
}

#[test]
fn record_dto_decodes_payload_reference_and_legacy_shape() {
    // New shape: the JSON carries `payload_uri` and `payload_size` but no
    // `payload_checksum`, so the first two decode to `Some` and the last
    // falls back to `None`.
    let dto: RecordDto = serde_json::from_str(
        r#"{"id":"r1","run_id":"run","created_at":"2026-06-27T00:00:00Z","role":"user","content_type":"image/png","lifecycle_status":"active","payload_uri":"s3://b/obj","payload_size":10}"#,
    )
    .unwrap();
    assert_eq!(dto.payload_uri.as_deref(), Some("s3://b/obj"));
    assert_eq!(dto.payload_size, Some(10));
    assert_eq!(dto.payload_checksum, None);

    // Legacy shape: JSON produced before the reference fields existed must
    // still decode, with every one of the reference fields absent (not just
    // the URI).
    let legacy: RecordDto = serde_json::from_str(
        r#"{"id":"r1","run_id":"run","created_at":"2026-06-27T00:00:00Z","role":"user","content_type":"text/plain","lifecycle_status":"active"}"#,
    )
    .unwrap();
    assert_eq!(legacy.payload_uri, None);
    assert_eq!(legacy.payload_size, None);
    assert_eq!(legacy.payload_checksum, None);
}
}
25 changes: 25 additions & 0 deletions crates/lance-context-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ impl RemoteContextStore {
cached_version: info.version,
})
}

/// Fetch the raw bytes behind a record's external payload reference.
///
/// Delegates to the underlying client's `fetch_record_payload`, passing this
/// store's context name together with the record `id`.
///
/// # Errors
///
/// Any error returned by the client is converted with `to_ctx_err`. The
/// server is expected to reject an unknown record id or a record that has no
/// external reference (that check is not performed in this method).
pub async fn fetch_payload(&self, id: &str) -> ContextResult<Vec<u8>> {
    let outcome = self
        .client
        .fetch_record_payload(&self.context_name, id)
        .await;
    outcome.map_err(to_ctx_err)
}
}

impl ContextStoreApi for RemoteContextStore {
Expand Down Expand Up @@ -403,6 +412,22 @@ impl ContextClient {
Self::handle_response(resp).await
}

/// Download the raw payload bytes for record `id` in context `name`.
///
/// Sends a GET request to `/contexts/{name}/records/{id}/payload` (resolved
/// through `self.url`) and returns the response body on a success status.
/// Per the endpoint's contract (implemented server-side, not visible here),
/// the server resolves the record's external payload reference against
/// object storage using the context's `storage_options`.
///
/// # Errors
///
/// Failures while sending the request or reading the body propagate via `?`.
/// A non-success status yields the error built by `Self::extract_error`.
pub async fn fetch_record_payload(&self, name: &str, id: &str) -> Result<Vec<u8>, ClientError> {
    // NOTE(review): `name` and `id` are interpolated into the path verbatim;
    // confirm whether `self.url` (or the caller) handles percent-encoding.
    let path = format!("/contexts/{}/records/{}/payload", name, id);
    let response = self.http.get(self.url(&path)).send().await?;

    // Guard clause: anything but a success status becomes a client error.
    if !response.status().is_success() {
        return Err(Self::extract_error(response).await);
    }

    let body = response.bytes().await?;
    Ok(body.to_vec())
}

pub async fn delete_record(
&self,
name: &str,
Expand Down
9 changes: 9 additions & 0 deletions crates/lance-context-core/src/api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch {
retired_at: patch.retired_at,
retired_reason: patch.retired_reason.clone(),
embedding: patch.embedding.clone(),
payload_uri: patch.payload_uri.clone(),
payload_size: patch.payload_size,
payload_checksum: patch.payload_checksum.clone(),
}
}

Expand Down Expand Up @@ -445,6 +448,9 @@ fn record_from_add_request(r: &AddRecordRequest, id: String, run_id: String) ->
content_type: r.content_type.clone(),
text_payload: r.text_payload.clone(),
binary_payload: r.binary_payload.clone(),
payload_uri: r.payload_uri.clone(),
payload_size: r.payload_size,
payload_checksum: r.payload_checksum.clone(),
embedding: r.embedding.clone(),
}
}
Expand All @@ -463,6 +469,9 @@ fn record_to_dto(r: ContextRecord) -> RecordDto {
content_type: r.content_type,
text_payload: r.text_payload,
binary_payload: r.binary_payload,
payload_uri: r.payload_uri,
payload_size: r.payload_size,
payload_checksum: r.payload_checksum,
embedding: r.embedding,
state_metadata: r.state_metadata.map(|sm| StateMetadataDto {
step: sm.step,
Expand Down
3 changes: 3 additions & 0 deletions crates/lance-context-core/src/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ mod tests {
content_type: "text/plain".to_string(),
text_payload: Some(text.to_string()),
binary_payload: None,
payload_uri: None,
payload_size: None,
payload_checksum: None,
embedding: Some(embedding),
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/lance-context-core/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,9 @@ mod tests {
content_type: "text/plain".to_string(),
text_payload: Some(text.to_string()),
binary_payload: None,
payload_uri: None,
payload_size: None,
payload_checksum: None,
embedding: None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/lance-context-core/src/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,9 @@ mod tests {
content_type: CONTENT_TYPE_TEXT.to_string(),
text_payload: Some("hello".to_string()),
binary_payload: None,
payload_uri: None,
payload_size: None,
payload_checksum: None,
embedding: None,
}
}
Expand Down
21 changes: 21 additions & 0 deletions crates/lance-context-core/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ pub struct ContextRecord {
pub content_type: String,
pub text_payload: Option<String>,
pub binary_payload: Option<Vec<u8>>,
/// Typed reference to a payload object stored outside the Lance dataset
/// (e.g. `gs://bucket/prefix/<id>`). Large media lives in object storage and
/// is fetched on demand via [`ContextStore::fetch_payload`], leaving the
/// dataset to hold only metadata, embeddings, and the reference. Distinct
/// from inline [`Self::binary_payload`], which stays the small-payload path.
pub payload_uri: Option<String>,
/// Size in bytes of the externally-referenced payload, when known.
pub payload_size: Option<i64>,
/// Caller-supplied checksum of the externally-referenced payload, when known.
pub payload_checksum: Option<String>,
pub embedding: Option<Vec<f32>>,
}

Expand Down Expand Up @@ -172,6 +182,11 @@ pub struct RecordPatch {
/// Vector embedding to attach to the record. Enables deferred embedding
/// workflows: append raw text first, then enrich with an embedding later.
pub embedding: Option<Vec<f32>>,
/// External payload reference to attach to the record. Enables attaching a
/// `gs://…`/`s3://…` media reference after the record was first created.
pub payload_uri: Option<String>,
pub payload_size: Option<i64>,
pub payload_checksum: Option<String>,
}

impl RecordPatch {
Expand All @@ -190,6 +205,9 @@ impl RecordPatch {
&& self.retired_at.is_none()
&& self.retired_reason.is_none()
&& self.embedding.is_none()
&& self.payload_uri.is_none()
&& self.payload_size.is_none()
&& self.payload_checksum.is_none()
}
}

Expand Down Expand Up @@ -439,6 +457,9 @@ mod tests {
content_type: "text/plain".to_string(),
text_payload: Some("hello".to_string()),
binary_payload: None,
payload_uri: None,
payload_size: None,
payload_checksum: None,
embedding: None,
}
}
Expand Down
Loading
Loading