Skip to content

Commit 5c48526

Browse files
committed
Merge branch 'main' into expr-union-type-impl
2 parents 4832227 + 4968277 commit 5c48526

File tree

9 files changed

+134
-64
lines changed

9 files changed

+134
-64
lines changed

Cargo.lock

Lines changed: 63 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
2020

2121
anyhow = { version = "1.0.97", features = ["std"] }
2222
async-trait = "0.1.88"
23-
axum = "0.7.9"
24-
axum-extra = { version = "0.9.6", features = ["query"] }
23+
axum = "0.8.4"
24+
axum-extra = { version = "0.10.1", features = ["query"] }
2525
base64 = "0.22.1"
2626
chrono = "0.4.40"
2727
config = "0.14.1"

docs/docs/getting_started/overview.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ CocoIndex is an ultra-performant real-time data transformation framework for AI,
99

1010
As a data framework, CocoIndex takes it to the next level on data freshness. **Incremental processing** is one of the core values provided by CocoIndex.
1111

12+
![Incremental Processing](/img/incremental-etl.gif)
13+
1214
## Programming Model
1315
CocoIndex follows the idea of [Dataflow programming](https://en.wikipedia.org/wiki/Dataflow_programming) model. Each transformation creates a new field solely based on input fields, without hidden states and value mutation. All data before/after each transformation is observable, with lineage out of the box.
1416

367 KB
Loading

examples/fastapi_server_docker/main.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import uvicorn
33
from dotenv import load_dotenv
44
from fastapi import FastAPI, Query
5+
from fastapi import Request
56
from psycopg_pool import ConnectionPool
7+
from contextlib import asynccontextmanager
68
import os
79

810

@@ -86,27 +88,31 @@ def search(pool: ConnectionPool, query: str, top_k: int = 5):
8688
]
8789

8890

89-
fastapi_app = FastAPI()
90-
91-
92-
@fastapi_app.on_event("startup")
93-
def startup_event():
91+
@asynccontextmanager
92+
def lifespan(app: FastAPI):
9493
load_dotenv()
9594
cocoindex.init()
96-
# Initialize database connection pool
97-
fastapi_app.state.pool = ConnectionPool(os.getenv("COCOINDEX_DATABASE_URL"))
95+
pool = ConnectionPool(os.getenv("COCOINDEX_DATABASE_URL"))
96+
app.state.pool = pool
97+
try:
98+
yield
99+
finally:
100+
pool.close()
101+
102+
103+
fastapi_app = FastAPI(lifespan=lifespan)
98104

99105

100106
@fastapi_app.get("/search")
101107
def search_endpoint(
108+
request: Request,
102109
q: str = Query(..., description="Search query"),
103110
limit: int = Query(5, description="Number of results"),
104111
):
105-
results = search(fastapi_app.state.pool, q, limit)
112+
pool = request.app.state.pool
113+
results = search(pool, q, limit)
106114
return {"results": results}
107115

108116

109117
if __name__ == "__main__":
110-
load_dotenv()
111-
cocoindex.init()
112118
uvicorn.run(fastapi_app, host="0.0.0.0", port=8080)

src/ops/py_factory.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::sync::Arc;
22

3-
use axum::async_trait;
4-
use futures::{future::BoxFuture, FutureExt};
3+
use async_trait::async_trait;
4+
use futures::{FutureExt, future::BoxFuture};
55
use pyo3::{
6-
pyclass, pymethods,
6+
IntoPyObjectExt, Py, PyAny, Python, pyclass, pymethods,
77
types::{IntoPyDict, PyString, PyTuple},
8-
IntoPyObjectExt, Py, PyAny, Python,
98
};
109
use pythonize::pythonize;
1110

@@ -14,7 +13,7 @@ use crate::{
1413
builder::plan,
1514
py::{self, FromPyResult},
1615
};
17-
use anyhow::{anyhow, Result};
16+
use anyhow::{Result, anyhow};
1817

1918
use super::interface::{FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory};
2019

src/server.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::prelude::*;
22

33
use crate::{lib_context::LibContext, service};
4-
use axum::{routing, Router};
4+
use axum::{Router, routing};
55
use tower::ServiceBuilder;
66
use tower_http::{
77
cors::{AllowOrigin, CorsLayer},
@@ -46,31 +46,31 @@ pub async fn init_server(
4646
Router::new()
4747
.route("/flows", routing::get(service::flows::list_flows))
4848
.route(
49-
"/flows/:flowInstName",
50-
routing::get(service::flows::get_flow_spec),
49+
"/flows/{flowInstName}",
50+
routing::get(service::flows::get_flow),
5151
)
5252
.route(
53-
"/flows/:flowInstName/schema",
53+
"/flows/{flowInstName}/schema",
5454
routing::get(service::flows::get_flow_schema),
5555
)
5656
.route(
57-
"/flows/:flowInstName/keys",
57+
"/flows/{flowInstName}/keys",
5858
routing::get(service::flows::get_keys),
5959
)
6060
.route(
61-
"/flows/:flowInstName/data",
61+
"/flows/{flowInstName}/data",
6262
routing::get(service::flows::evaluate_data),
6363
)
6464
.route(
65-
"/flows/:flowInstName/rowStatus",
65+
"/flows/{flowInstName}/rowStatus",
6666
routing::get(service::flows::get_row_indexing_status),
6767
)
6868
.route(
69-
"/flows/:flowInstName/update",
69+
"/flows/{flowInstName}/update",
7070
routing::post(service::flows::update),
7171
)
7272
.route(
73-
"/flows/:flowInstName/search",
73+
"/flows/{flowInstName}/search",
7474
routing::get(service::search::search),
7575
)
7676
.layer(

src/service/error.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
use anyhow::anyhow;
1+
use crate::prelude::*;
2+
23
use axum::{
4+
Json,
35
http::StatusCode,
46
response::{IntoResponse, Response},
57
};
6-
use log::debug;
78
use pyo3::{exceptions::PyException, prelude::*};
89
use std::{
910
error::Error,
1011
fmt::{Debug, Display},
11-
sync::Arc,
1212
};
1313

1414
#[derive(Debug)]
@@ -38,10 +38,18 @@ impl Error for ApiError {
3838
}
3939
}
4040

41+
#[derive(Serialize)]
42+
struct ErrorResponse {
43+
error: String,
44+
}
45+
4146
impl IntoResponse for ApiError {
4247
fn into_response(self) -> Response {
4348
debug!("Internal server error:\n{:?}", self.err);
44-
(self.status_code, format!("{:#}", self.err)).into_response()
49+
let error_response = ErrorResponse {
50+
error: self.err.to_string(),
51+
};
52+
(self.status_code, Json(error_response)).into_response()
4553
}
4654
}
4755

src/service/flows.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use crate::execution::{evaluator, indexing_status, memoization, row_indexer, sta
44
use crate::lib_context::LibContext;
55
use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorListOptions};
66
use axum::{
7+
Json,
78
extract::{Path, State},
89
http::StatusCode,
9-
Json,
1010
};
1111
use axum_extra::extract::Query;
1212

@@ -18,20 +18,39 @@ pub async fn list_flows(
1818
))
1919
}
2020

21-
pub async fn get_flow_spec(
21+
pub async fn get_flow_schema(
2222
Path(flow_name): Path<String>,
2323
State(lib_context): State<Arc<LibContext>>,
24-
) -> Result<Json<spec::FlowInstanceSpec>, ApiError> {
24+
) -> Result<Json<FlowSchema>, ApiError> {
2525
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
26-
Ok(Json(flow_ctx.flow.flow_instance.clone()))
26+
Ok(Json(flow_ctx.flow.data_schema.clone()))
2727
}
2828

29-
pub async fn get_flow_schema(
29+
#[derive(Serialize)]
30+
pub struct GetFlowResponse {
31+
flow_spec: spec::FlowInstanceSpec,
32+
data_schema: FlowSchema,
33+
fingerprint: utils::fingerprint::Fingerprint,
34+
}
35+
36+
pub async fn get_flow(
3037
Path(flow_name): Path<String>,
3138
State(lib_context): State<Arc<LibContext>>,
32-
) -> Result<Json<FlowSchema>, ApiError> {
39+
) -> Result<Json<GetFlowResponse>, ApiError> {
3340
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
34-
Ok(Json(flow_ctx.flow.data_schema.clone()))
41+
let flow_spec = flow_ctx.flow.flow_instance.clone();
42+
let data_schema = flow_ctx.flow.data_schema.clone();
43+
let fingerprint = utils::fingerprint::Fingerprinter::default()
44+
.with(&flow_spec)
45+
.map_err(|e| api_error!("failed to fingerprint flow spec: {e}"))?
46+
.with(&data_schema)
47+
.map_err(|e| api_error!("failed to fingerprint data schema: {e}"))?
48+
.into_fingerprint();
49+
Ok(Json(GetFlowResponse {
50+
flow_spec,
51+
data_schema,
52+
fingerprint,
53+
}))
3554
}
3655

3756
#[derive(Deserialize)]

0 commit comments

Comments
 (0)