Skip to content

Commit 237a365

Browse files
committed
Merge remote-tracking branch 'origin/main' into rupei/kv-indexer-python-launcher-pr
Signed-off-by: PeaBrane <yanrpei@gmail.com> # Conflicts: # lib/bindings/python/Cargo.toml # lib/bindings/python/rust/lib.rs # tests/router/test_router_e2e_with_mockers.py
2 parents 99ec893 + b6596c5 commit 237a365

File tree

12 files changed

+2013
-54
lines changed

12 files changed

+2013
-54
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ Built in Rust for performance and Python for extensibility, Dynamo is fully open
5050

5151
| | [SGLang](docs/backends/sglang/README.md) | [TensorRT-LLM](docs/backends/trtllm/README.md) | [vLLM](docs/backends/vllm/README.md) |
5252
|---|:----:|:----------:|:--:|
53-
| **Best For** | High-throughput serving | Maximum performance | Broadest feature coverage |
5453
| [**Disaggregated Serving**](docs/design-docs/disagg-serving.md) ||||
5554
| [**KV-Aware Routing**](docs/components/router/README.md) ||||
5655
| [**SLA-Based Planner**](docs/components/planner/planner-guide.md) ||||

container/templates/wheel_builder.Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,9 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
439439
uv build --wheel --out-dir /opt/dynamo/dist && \
440440
cd /opt/dynamo/lib/bindings/python && \
441441
if [ "$ENABLE_MEDIA_FFMPEG" = "true" ]; then \
442-
maturin build --release --features "media-ffmpeg,kv-indexer" --out /opt/dynamo/dist; \
442+
maturin build --release --features "media-ffmpeg" --out /opt/dynamo/dist; \
443443
else \
444-
maturin build --release --features "kv-indexer" --out /opt/dynamo/dist; \
444+
maturin build --release --out /opt/dynamo/dist; \
445445
fi && \
446446
/tmp/use-sccache.sh show-stats "Dynamo Runtime"
447447

lib/bindings/python/Cargo.lock

Lines changed: 77 additions & 43 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/velo-transports/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ mod address;
3131

3232
pub mod tcp;
3333

34+
#[cfg(unix)]
35+
pub mod uds;
36+
3437
// #[cfg(feature = "ucx")]
3538
// pub mod ucx;
3639

lib/velo-transports/src/tcp/transport.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -392,9 +392,9 @@ async fn connection_writer_task(
392392
instance_id: crate::InstanceId,
393393
rx: flume::Receiver<SendTask>,
394394
connections: Arc<DashMap<crate::InstanceId, ConnectionHandle>>,
395-
_cancel_token: CancellationToken,
395+
cancel_token: CancellationToken,
396396
) -> Result<()> {
397-
let result = connection_writer_inner(addr, instance_id, &rx).await;
397+
let result = connection_writer_inner(addr, instance_id, &rx, &cancel_token).await;
398398

399399
// Always drain queued messages and notify their error handlers.
400400
//
@@ -426,10 +426,14 @@ async fn connection_writer_inner(
426426
addr: SocketAddr,
427427
instance_id: crate::InstanceId,
428428
rx: &flume::Receiver<SendTask>,
429+
cancel_token: &CancellationToken,
429430
) -> Result<()> {
430431
debug!("Connecting to {}", addr);
431432

432-
let mut stream = TcpStream::connect(addr).await.context("connect failed")?;
433+
let mut stream = tokio::select! {
434+
_ = cancel_token.cancelled() => return Ok(()),
435+
res = TcpStream::connect(addr) => res.context("connect failed")?,
436+
};
433437

434438
if let Err(e) = stream.set_nodelay(true) {
435439
warn!("Failed to set TCP_NODELAY: {}", e);
@@ -450,7 +454,14 @@ async fn connection_writer_inner(
450454

451455
debug!("Connected to {}", addr);
452456

453-
while let Ok(msg) = rx.recv_async().await {
457+
loop {
458+
let msg = tokio::select! {
459+
_ = cancel_token.cancelled() => break,
460+
res = rx.recv_async() => match res {
461+
Ok(msg) => msg,
462+
Err(_) => break,
463+
},
464+
};
454465
if let Err(e) =
455466
TcpFrameCodec::encode_frame(&mut stream, msg.msg_type, &msg.header, &msg.payload).await
456467
{

0 commit comments

Comments
 (0)