Skip to content

Commit 794c0a4

Browse files
authored
feat(keyvalue): Filesystem backed KeyValueStore (#4138)
Signed-off-by: Graham King <[email protected]>
1 parent 3fd0ab3 commit 794c0a4

File tree

46 files changed

+751
-216
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+751
-216
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/src/dynamo/frontend/main.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,12 @@ def parse_args():
225225
),
226226
help=f"Interval in seconds for polling custom backend metrics. Set to > 0 to enable polling (default: 0=disabled, suggested: 9.2s which is less than typical Prometheus scrape interval). Can be set via {CUSTOM_BACKEND_METRICS_POLLING_INTERVAL_ENV_VAR} env var.",
227227
)
228+
parser.add_argument(
229+
"--store-kv",
230+
type=str,
231+
default=os.environ.get("DYN_STORE_KV", "etcd"),
232+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
233+
)
228234

229235
flags = parser.parse_args()
230236

@@ -252,8 +258,7 @@ async def async_main():
252258
os.environ["DYN_METRICS_PREFIX"] = flags.metrics_prefix
253259

254260
loop = asyncio.get_running_loop()
255-
256-
runtime = DistributedRuntime(loop, is_static)
261+
runtime = DistributedRuntime(loop, flags.store_kv, is_static)
257262

258263
def signal_handler():
259264
asyncio.create_task(graceful_shutdown(runtime))

components/src/dynamo/mocker/args.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ def parse_args():
204204
default=False,
205205
help="Mark this as a decode worker which does not publish KV events and skips prefill cost estimation (default: False)",
206206
)
207+
parser.add_argument(
208+
"--store-kv",
209+
type=str,
210+
default=os.environ.get("DYN_STORE_KV", "etcd"),
211+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
212+
)
207213

208214
args = parser.parse_args()
209215
validate_worker_type_args(args)

components/src/dynamo/mocker/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async def launch_workers(args, extra_engine_args_path):
7272
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")
7373

7474
# Create a separate DistributedRuntime for this worker (on same event loop)
75-
runtime = DistributedRuntime(loop, False)
75+
runtime = DistributedRuntime(loop, args.store_kv, False)
7676
runtimes.append(runtime)
7777

7878
# Create EntrypointArgs for this worker

components/src/dynamo/sglang/args.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@
9393
"default": None,
9494
"help": "Dump debug config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
9595
},
96+
"store-kv": {
97+
"flags": ["--store-kv"],
98+
"type": str,
99+
"default": os.environ.get("DYN_STORE_KV", "etcd"),
100+
"help": "Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
101+
},
96102
}
97103

98104

@@ -102,6 +108,7 @@ class DynamoArgs:
102108
component: str
103109
endpoint: str
104110
migration_limit: int
111+
store_kv: str
105112

106113
# tool and reasoning parser options
107114
tool_call_parser: Optional[str] = None
@@ -329,6 +336,7 @@ async def parse_args(args: list[str]) -> Config:
329336
component=parsed_component_name,
330337
endpoint=parsed_endpoint_name,
331338
migration_limit=parsed_args.migration_limit,
339+
store_kv=parsed_args.store_kv,
332340
tool_call_parser=tool_call_parser,
333341
reasoning_parser=reasoning_parser,
334342
custom_jinja_template=expanded_template_path,

components/src/dynamo/sglang/main.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from dynamo.common.config_dump import dump_config
1313
from dynamo.llm import ModelInput, ModelType
14-
from dynamo.runtime import DistributedRuntime, dynamo_worker
14+
from dynamo.runtime import DistributedRuntime
1515
from dynamo.runtime.logging import configure_dynamo_logging
1616
from dynamo.sglang.args import Config, DisaggregationMode, parse_args
1717
from dynamo.sglang.health_check import (
@@ -33,9 +33,12 @@
3333
configure_dynamo_logging()
3434

3535

36-
@dynamo_worker(static=False)
37-
async def worker(runtime: DistributedRuntime):
36+
async def worker():
37+
config = await parse_args(sys.argv[1:])
38+
dump_config(config.dynamo_args.dump_config_to, config)
39+
3840
loop = asyncio.get_running_loop()
41+
runtime = DistributedRuntime(loop, config.dynamo_args.store_kv, False)
3942

4043
def signal_handler():
4144
asyncio.create_task(graceful_shutdown(runtime))
@@ -45,9 +48,6 @@ def signal_handler():
4548

4649
logging.info("Signal handlers will trigger a graceful shutdown of the runtime")
4750

48-
config = await parse_args(sys.argv[1:])
49-
dump_config(config.dynamo_args.dump_config_to, config)
50-
5151
if config.dynamo_args.embedding_worker:
5252
await init_embedding(runtime, config)
5353
elif config.dynamo_args.multimodal_processor:

components/src/dynamo/trtllm/main.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from dynamo.common.config_dump import dump_config
4040
from dynamo.common.utils.prometheus import register_engine_metrics_callback
4141
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
42-
from dynamo.runtime import DistributedRuntime, dynamo_worker
42+
from dynamo.runtime import DistributedRuntime
4343
from dynamo.runtime.logging import configure_dynamo_logging
4444
from dynamo.trtllm.engine import TensorRTLLMEngine, get_llm_engine
4545
from dynamo.trtllm.health_check import TrtllmHealthCheckPayload
@@ -102,11 +102,13 @@ async def get_engine_runtime_config(
102102
return runtime_config
103103

104104

105-
@dynamo_worker(static=False)
106-
async def worker(runtime: DistributedRuntime):
107-
# Set up signal handler for graceful shutdown
105+
async def worker():
106+
config = cmd_line_args()
107+
108108
loop = asyncio.get_running_loop()
109+
runtime = DistributedRuntime(loop, config.store_kv, False)
109110

111+
# Set up signal handler for graceful shutdown
110112
def signal_handler():
111113
# Schedule the shutdown coroutine instead of calling it directly
112114
asyncio.create_task(graceful_shutdown(runtime))
@@ -116,7 +118,6 @@ def signal_handler():
116118

117119
logging.info("Signal handlers set up for graceful shutdown")
118120

119-
config = cmd_line_args()
120121
await init(runtime, config)
121122

122123

components/src/dynamo/trtllm/utils/trtllm_utils.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def __init__(self) -> None:
5858
self.tool_call_parser: Optional[str] = None
5959
self.dump_config_to: Optional[str] = None
6060
self.custom_jinja_template: Optional[str] = None
61+
self.store_kv: str = ""
6162

6263
def __str__(self) -> str:
6364
return (
@@ -87,8 +88,9 @@ def __str__(self) -> str:
8788
f"max_file_size_mb={self.max_file_size_mb}, "
8889
f"reasoning_parser={self.reasoning_parser}, "
8990
f"tool_call_parser={self.tool_call_parser}, "
90-
f"dump_config_to={self.dump_config_to},"
91-
f"custom_jinja_template={self.custom_jinja_template}"
91+
f"dump_config_to={self.dump_config_to}, "
92+
f"custom_jinja_template={self.custom_jinja_template}, "
93+
f"store_kv={self.store_kv}"
9294
)
9395

9496

@@ -278,6 +280,12 @@ def cmd_line_args():
278280
default=None,
279281
help="Path to a custom Jinja template file to override the model's default chat template. This template will take precedence over any template found in the model repository.",
280282
)
283+
parser.add_argument(
284+
"--store-kv",
285+
type=str,
286+
default=os.environ.get("DYN_STORE_KV", "etcd"),
287+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
288+
)
281289

282290
args = parser.parse_args()
283291

@@ -337,6 +345,7 @@ def cmd_line_args():
337345
config.reasoning_parser = args.dyn_reasoning_parser
338346
config.tool_call_parser = args.dyn_tool_call_parser
339347
config.dump_config_to = args.dump_config_to
348+
config.store_kv = args.store_kv
340349

341350
# Handle custom jinja template path expansion (environment variables and home directory)
342351
if args.custom_jinja_template:

components/src/dynamo/vllm/args.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class Config:
3838
migration_limit: int = 0
3939
kv_port: Optional[int] = None
4040
custom_jinja_template: Optional[str] = None
41+
store_kv: str
4142

4243
# mirror vLLM
4344
model: str
@@ -164,6 +165,12 @@ def parse_args() -> Config:
164165
"'USER: <image> please describe the image ASSISTANT:'."
165166
),
166167
)
168+
parser.add_argument(
169+
"--store-kv",
170+
type=str,
171+
default=os.environ.get("DYN_STORE_KV", "etcd"),
172+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
173+
)
167174
add_config_dump_args(parser)
168175

169176
parser = AsyncEngineArgs.add_cli_args(parser)
@@ -233,6 +240,7 @@ def parse_args() -> Config:
233240
config.multimodal_worker = args.multimodal_worker
234241
config.multimodal_encode_prefill_worker = args.multimodal_encode_prefill_worker
235242
config.mm_prompt_template = args.mm_prompt_template
243+
config.store_kv = args.store_kv
236244

237245
# Validate custom Jinja template file exists if provided
238246
if config.custom_jinja_template is not None:

components/src/dynamo/vllm/main.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
fetch_llm,
2626
register_llm,
2727
)
28-
from dynamo.runtime import DistributedRuntime, dynamo_worker
28+
from dynamo.runtime import DistributedRuntime
2929
from dynamo.runtime.logging import configure_dynamo_logging
3030
from dynamo.vllm.multimodal_handlers import (
3131
EncodeWorkerHandler,
@@ -70,16 +70,16 @@ async def graceful_shutdown(runtime):
7070
logging.info("DistributedRuntime shutdown complete")
7171

7272

73-
@dynamo_worker(static=False)
74-
async def worker(runtime: DistributedRuntime):
73+
async def worker():
7574
config = parse_args()
7675

76+
loop = asyncio.get_running_loop()
77+
runtime = DistributedRuntime(loop, config.store_kv, False)
78+
7779
await configure_ports(config)
7880
overwrite_args(config)
7981

8082
# Set up signal handler for graceful shutdown
81-
loop = asyncio.get_running_loop()
82-
8383
def signal_handler():
8484
asyncio.create_task(graceful_shutdown(runtime))
8585

0 commit comments

Comments
 (0)