Skip to content

Commit 74fb1a8

Browse files
authored
Merge pull request #115 from restackio/re-act-example
Re_act example
2 parents 012ad29 + 08074c9 commit 74fb1a8

File tree

11 files changed

+375
-0
lines changed

11 files changed

+375
-0
lines changed

re_act/README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Restack AI - ReAct Example
2+
3+
## Prerequisites
4+
5+
- Python 3.11
6+
- Poetry (for dependency management)
7+
- Docker (for running the Restack services)
8+
9+
## Usage
10+
11+
2. Open the web UI to see the workflows:
12+
13+
```bash
14+
http://localhost:5233
15+
```
16+
17+
3. Clone this repository:
18+
19+
```bash
20+
git clone https://github.com/restackio/examples-python
21+
cd examples-python/re_act
22+
```
23+
24+
4. Install dependencies using Poetry:
25+
26+
```bash
27+
poetry env use 3.11
28+
```
29+
30+
```bash
31+
poetry shell
32+
```
33+
34+
```bash
35+
poetry install
36+
```
37+
38+
```bash
39+
poetry env info # Optional: copy the interpreter path to use in your IDE (e.g. Cursor, VSCode, etc.)
40+
```
41+
42+
5. Run the services:
43+
44+
```bash
45+
poetry run services
46+
```
47+
48+
6. Schedule workflow
49+
50+
```bash
51+
poetry run schedule_workflow
52+
```
53+

re_act/pyproject.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[tool.poetry]
2+
name = "re_act_example"
3+
version = "0.0.1"
4+
description = "ReAct Example"
5+
authors = [
6+
"Restack Team <[email protected]>",
7+
]
8+
packages = [{include = "src"}]
9+
10+
[tool.poetry.dependencies]
11+
python = ">=3.10,<4.0"
12+
openai = "^1.57.2"
13+
python-dotenv = "1.0.1"
14+
restack-ai = "0.0.51"
15+
sendgrid = "^6.11.0"
16+
17+
[build-system]
18+
requires = ["poetry-core"]
19+
build-backend = "poetry.core.masonry.api"
20+
21+
[tool.poetry.scripts]
22+
services = "src.services:run_services"
23+
schedule_workflow = "schedule_workflow:run_schedule_workflow"

re_act/schedule_workflow.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
import time
3+
from restack_ai import Restack
4+
5+
async def main():
6+
7+
client = Restack()
8+
9+
workflow_id = f"{int(time.time() * 1000)}-ParentWorkflow"
10+
runId = await client.schedule_workflow(
11+
workflow_name="ParentWorkflow",
12+
workflow_id=workflow_id,
13+
input={
14+
"email": "[email protected]",
15+
"current_accepted_applicants_count": 9
16+
}
17+
)
18+
19+
await client.get_workflow_result(
20+
workflow_id=workflow_id,
21+
run_id=runId
22+
)
23+
24+
exit(0)
25+
26+
def run_schedule_workflow():
27+
asyncio.run(main())
28+
29+
if __name__ == "__main__":
30+
run_schedule_workflow()

re_act/src/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from restack_ai import Restack
2+
3+
client = Restack()

re_act/src/functions/decide.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from restack_ai.function import function, log, FunctionFailure
2+
from dataclasses import dataclass
3+
from openai import OpenAI
4+
import os
5+
from dotenv import load_dotenv
6+
7+
load_dotenv()
8+
9+
@dataclass
10+
class DecideInput:
11+
email: str
12+
current_accepted_applicants_count: int
13+
14+
@function.defn()
15+
async def decide(input: DecideInput):
16+
try:
17+
if (os.environ.get("OPENAI_API_KEY") is None):
18+
raise FunctionFailure("OPENAI_API_KEY is not set", non_retryable=True)
19+
20+
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
21+
22+
response = client.chat.completions.create(
23+
model="gpt-4o-mini",
24+
messages=[
25+
{
26+
"role": "system",
27+
"content": f"""
28+
You are a helpful assistant for event registration that decides if the applicant should be accepted or rejected.
29+
"""
30+
},
31+
{
32+
"role": "user",
33+
"content": f"""
34+
The event is called "Restack AI Summit 2025"
35+
Restack is the main sponsor of the event.
36+
The applicant has the following email: {input.email}
37+
The current number of accepted applicants is: {input.current_accepted_applicants_count}
38+
The maximum number of accepted applicants is: 10
39+
40+
Decide if the applicant should be accepted or rejected.
41+
Return only a JSON object with these exact fields:
42+
{{
43+
"accepted": boolean
44+
}}
45+
""",
46+
}
47+
],
48+
response_format={"type": "json_object"},
49+
)
50+
51+
return response.choices[0].message.content
52+
except Exception as e:
53+
log.error("Failed to decide", error=e)
54+
raise FunctionFailure("Failed to decide", non_retryable=True)
55+
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from restack_ai.function import function, log, FunctionFailure
2+
from dataclasses import dataclass
3+
from openai import OpenAI
4+
import os
5+
from dotenv import load_dotenv
6+
7+
load_dotenv()
8+
9+
@dataclass
10+
class GenerateEmailInput:
11+
email_context: str
12+
13+
@function.defn()
14+
async def generate_email_content(input: GenerateEmailInput):
15+
16+
try:
17+
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
18+
19+
response = client.chat.completions.create(
20+
model="gpt-4o-mini",
21+
messages=[
22+
{
23+
"role": "system",
24+
"content": f"""
25+
You are a helpful assistant that generates short emails based on the provided context.
26+
"""
27+
},
28+
{
29+
"role": "user",
30+
"content": f"""Generate a short email based on the following context: {input.email_context}
31+
"""
32+
}
33+
],
34+
max_tokens=150
35+
)
36+
37+
return response.choices[0].message.content
38+
except Exception as e:
39+
log.error("Failed to generate email content", error=e)
40+
raise FunctionFailure("Failed to generate email content", non_retryable=True)
41+

re_act/src/functions/send_email.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import sendgrid
2+
from sendgrid.helpers.mail import Mail
3+
from restack_ai.function import function, log, FunctionFailure
4+
from dotenv import load_dotenv
5+
import os
6+
from dataclasses import dataclass
7+
8+
load_dotenv()
9+
10+
@dataclass
11+
class SendEmailInput:
12+
subject: str
13+
body: str
14+
15+
@function.defn()
16+
async def send_email(input: SendEmailInput):
17+
from_email = os.environ.get("FROM_EMAIL")
18+
19+
if not from_email:
20+
raise FunctionFailure('FROM_EMAIL is not set', non_retryable=True)
21+
22+
sendgrid_api_key = os.getenv('SENDGRID_API_KEY')
23+
24+
if not sendgrid_api_key:
25+
raise FunctionFailure('SENDGRID_API_KEY is not set', non_retryable=True)
26+
27+
message = Mail(
28+
from_email=from_email,
29+
to_emails=from_email,
30+
subject=input.subject,
31+
plain_text_content=input.body
32+
)
33+
34+
try:
35+
sg = sendgrid.SendGridAPIClient(api_key=sendgrid_api_key)
36+
sg.send(message)
37+
except Exception as e:
38+
log.error("Failed to send email", error=e)
39+
raise FunctionFailure("Failed to send email", non_retryable=True)

re_act/src/services.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import asyncio
2+
from src.client import client
3+
from src.functions.decide import decide
4+
from src.functions.generate_email_content import generate_email_content
5+
from src.functions.send_email import send_email
6+
from src.workflows.parent_workflow import ParentWorkflow
7+
from src.workflows.child_workflow_a import ChildWorkflowA
8+
from src.workflows.child_workflow_b import ChildWorkflowB
9+
10+
async def main():
11+
await asyncio.gather(
12+
client.start_service(
13+
workflows=[ParentWorkflow, ChildWorkflowA, ChildWorkflowB],
14+
functions=[decide, generate_email_content, send_email]
15+
)
16+
)
17+
18+
def run_services():
19+
asyncio.run(main())
20+
21+
if __name__ == "__main__":
22+
run_services()
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from restack_ai.workflow import workflow, log, import_functions
2+
from datetime import timedelta
3+
from dataclasses import dataclass
4+
5+
with import_functions():
6+
from src.functions.generate_email_content import generate_email_content, GenerateEmailInput
7+
from src.functions.send_email import send_email, SendEmailInput
8+
9+
@workflow.defn()
10+
class ChildWorkflowA:
11+
@workflow.run
12+
async def run(self, email: str):
13+
log.info(f"Sending email to {email}")
14+
15+
text = await workflow.step(
16+
generate_email_content,
17+
input=GenerateEmailInput(
18+
email_context="Application was accepted"
19+
),
20+
start_to_close_timeout=timedelta(seconds=120)
21+
)
22+
23+
await workflow.step(
24+
send_email,
25+
input=SendEmailInput(
26+
subject="Restack AI Summit 2025",
27+
body=text
28+
),
29+
start_to_close_timeout=timedelta(seconds=120)
30+
)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from restack_ai.workflow import workflow, log, import_functions
2+
from datetime import timedelta
3+
from dataclasses import dataclass
4+
5+
with import_functions():
6+
from src.functions.generate_email_content import generate_email_content, GenerateEmailInput
7+
from src.functions.send_email import send_email, SendEmailInput
8+
9+
@workflow.defn()
10+
class ChildWorkflowB:
11+
@workflow.run
12+
async def run(self, email: str):
13+
log.info(f"Sending email to {email}")
14+
15+
text = await workflow.step(
16+
generate_email_content,
17+
input=GenerateEmailInput(
18+
email_context="Application was rejected"
19+
),
20+
start_to_close_timeout=timedelta(seconds=120)
21+
)
22+
23+
await workflow.step(
24+
send_email,
25+
input=SendEmailInput(
26+
subject="Restack AI Summit 2025",
27+
body=text
28+
),
29+
start_to_close_timeout=timedelta(seconds=120)
30+
)

0 commit comments

Comments
 (0)