@@ -41,11 +41,9 @@ class Question(BaseModel):
4141# Configure your language model and 'asyncify' your DSPy program.
4242lm = dspy.LM(" openai/gpt-4o-mini" )
4343dspy.settings.configure(lm = lm, async_max_workers = 4 ) # default is 8
44+ dspy_program = dspy.ChainOfThought(" question -> answer" )
45+ dspy_program = dspy.asyncify(dspy_program)
4446
45- dspy_program = dspy.asyncify(dspy.ChainOfThought(" question -> answer" ))
46- streaming_dspy_program = dspy.streamify(dspy_program)
47-
48- # Define an endpoint (no streaming)
4947@app.post (" /predict" )
5048async def predict (question : Question):
5149 try :
@@ -56,32 +54,6 @@ async def predict(question: Question):
5654 }
5755 except Exception as e:
5856 raise HTTPException(status_code = 500 , detail = str (e))
59-
60- # Define an endpoint (streaming)
61- from fastapi.responses import StreamingResponse
62-
63- @app.post (" /predict/stream" )
64- async def stream (question : Question):
65- async def generate ():
66- async for value in streaming_dspy_program(question = question.text):
67- if isinstance (value, dspy.Prediction):
68- data = {" prediction" : value.labels().toDict()}
69- elif isinstance (value, litellm.ModelResponse):
70- data = {" chunk" : value.json()}
71- yield f " data: { ujson.dumps(data)} \n\n "
72- yield " data: [DONE]\n\n "
73-
74- return StreamingResponse(generate(), media_type = " text/event-stream" )
75-
76- # Since you're often going to want to stream the result of a DSPy program as server-sent events,
77- # we've included a helper function for that, which is equivalent to the code above.
78-
79- from dspy.utils.streaming import streaming_response
80-
81- @app.post (" /predict/stream" )
82- async def stream (question : Question):
83- stream = streaming_dspy_program(question = question.text)
84- return StreamingResponse(streaming_response(stream), media_type = " text/event-stream" )
8557```
8658
8759In the code above, we call `dspy.asyncify` to convert the dspy program to run in async mode for high-throughput FastAPI
@@ -91,9 +63,41 @@ By default, the limit of spawned threads is 8. Think of this like a worker pool.
9163If you have 8 in-flight programs and call it once more, the 9th call will wait until one of the 8 returns.
9264You can configure the async capacity using the new `async_max_workers` setting.
9365
94- We also use ` dspy.streamify ` to convert the dspy program to a streaming mode. This is useful when you want to stream
95- the intermediate outputs (i.e. O1-style reasoning) to the client before the final prediction is ready. This uses
96- asyncify under the hood and inherits the execution semantics.
66+ ??? "Streaming, in DSPy 2.6.0+"
67+
68+ Streaming is also supported in DSPy 2.6.0+, available as a release candidate via `pip install -U --pre dspy`.
69+
70+ We can use `dspy.streamify` to convert the dspy program to a streaming mode. This is useful when you want to stream
71+ the intermediate outputs (i.e. O1-style reasoning) to the client before the final prediction is ready. This uses
72+ asyncify under the hood and inherits the execution semantics.
73+
74+ ```python
75+ dspy_program = dspy.asyncify(dspy.ChainOfThought("question -> answer"))
76+ streaming_dspy_program = dspy.streamify(dspy_program)
77+
78+ @app.post("/predict/stream")
79+ async def stream(question: Question):
80+ async def generate():
81+ async for value in streaming_dspy_program(question=question.text):
82+ if isinstance(value, dspy.Prediction):
83+ data = {"prediction": value.labels().toDict()}
84+ elif isinstance(value, litellm.ModelResponse):
85+ data = {"chunk": value.json()}
86+ yield f"data: {ujson.dumps(data)}\n\n"
87+ yield "data: [DONE]\n\n"
88+
89+ return StreamingResponse(generate(), media_type="text/event-stream")
90+
91+ # Since you're often going to want to stream the result of a DSPy program as server-sent events,
92+ # we've included a helper function for that, which is equivalent to the code above.
93+
94+ from dspy.utils.streaming import streaming_response
95+
96+ @app.post("/predict/stream")
97+ async def stream(question: Question):
98+ stream = streaming_dspy_program(question=question.text)
99+ return StreamingResponse(streaming_response(stream), media_type="text/event-stream")
100+ ```
97101
98102Write your code to a file, e.g., `fastapi_dspy.py`. Then you can serve the app with:
99103
0 commit comments