Skip to content

Commit de044d1

Browse files
authored
Merge pull request #33 from corkrean/temporal-py
adding temporal example
2 parents 821591c + d61cae5 commit de044d1

File tree

10 files changed

+285
-0
lines changed

10 files changed

+285
-0
lines changed

data/temporal_example/README.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Distributed Writes to SpiceDB with Temporal
2+
3+
This is a "quick start" example that demonstrates how you can use Temporal to maintain a consistent state across SpiceDB and an application DB (Postgres in this example) in the event of failures.
4+
5+
Before beginning this example walkthrough, it is recommended to have basic knowledge of Temporal. [Here](https://docs.temporal.io/evaluate/understanding-temporal) is a good place to start with Temporal.
6+
7+
## Prerequisites
8+
- [Docker Compose](https://docs.docker.com/compose/install/)
9+
- [Temporal CLI](https://docs.temporal.io/cli#install)
10+
- [Python 3](https://www.python.org/downloads/)
11+
- [pip](https://packaging.python.org/en/latest/tutorials/installing-packages/) (included with Python if you installed via Homebrew or python.org)
12+
13+
## Running this Example
14+
15+
Follow these steps to run this example:
16+
17+
### Setup
18+
1. Clone the SpiceDB examples repo: ` git clone [email protected]:authzed/examples.git`
19+
2. Change into the Docker Compose directory `cd examples/data/temporal_example/compose`
20+
3. Start SpiceDB and Postgres with `docker-compose up -d`
21+
4. Change to the parent directory: `cd ..`
22+
5. Start a development Temporal Server: `temporal server start-dev`
23+
6. Setup a virtual env: `python3 -m venv env`, then `source env/bin/activate`.
24+
7. Install dependencies: `python3 -m pip install authzed temporalio psycopg`
25+
8. Setup the Postgres DB and SpiceDB: `python3 run_migrations.py`
26+
9. Start the Temporal Worker: `python3 run_worker.py`
27+
28+
### Experiencing Temporal
29+
10. Run the Temporal workflow (worker should still be running while you are doing this): `python3 run_workflow.py --author bob --post some_post`
30+
11. Simulate a Postgres failure by stopping the Docker container running Postgres: `docker stop dev-postgres`
31+
12. Run the Temporal workflow (worker should still be running while you are doing this): `python3 run_workflow.py --author bob --post another_post`
32+
13. Notice that the initial attempt failed.
33+
14. Restart Postgres: `docker start dev-postgres`
34+
15. Wait a few seconds and notice that the Postgres write activity succeeds and that the workflow succeeds. 😎
35+
36+
## Important Takeaways from this Example
37+
38+
- This example uses Temporal's default [retry policy](https://docs.temporal.io/encyclopedia/retry-policies) for Activities and Workflows. This means that an Activity will indefinitely try to complete a successful execution.
39+
40+
- Both Postgres and SpiceDB writes in this example are idempotent. In the case of the record already existing, there will be no errors and the workflow will complete successfully. [Temporal recommends that all activities be idempotent.](https://docs.temporal.io/activity-definition#idempotency)
41+
42+
- This example uses a single Temporal Task Queue partition. [Task Queues with a single partition](https://docs.temporal.io/task-queue#task-ordering) are almost always first-in, first-out, with rare edge case exceptions.
43+
44+
- This example writes directly to SpiceDB and Postgres. For many use cases, Temporal will make API requests to distributed microservices that in turn make requests to SpiceDB and an app DB.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from temporalio import activity
2+
from shared import BlogAuthor
3+
from data.spicedb import writeRelationship
4+
from data.postgres import writePostgres
5+
import psycopg
6+
7+
8+
@activity.defn
9+
async def write_to_spicedb(blog_author: BlogAuthor):
10+
try:
11+
await writeRelationship(blog_author)
12+
except Exception:
13+
activity.logger.exception("SpiceDB write failed")
14+
raise
15+
16+
@activity.defn
17+
async def write_to_postgres(blog_author: BlogAuthor):
18+
try:
19+
await writePostgres(blog_author)
20+
except Exception:
21+
activity.logger.exception("Postgres write failed")
22+
raise
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
---
2+
version: "3.8"
3+
4+
services:
5+
postgres:
6+
image: "postgres:15"
7+
container_name: "dev-postgres"
8+
environment:
9+
POSTGRES_USER: "dev"
10+
POSTGRES_PASSWORD: "abc123"
11+
POSTGRES_DB: "blog_authors"
12+
ports:
13+
- "5432:5432"
14+
15+
spicedb:
16+
image: "authzed/spicedb:v1.34.0"
17+
container_name: "dev-spicedb"
18+
command: [
19+
"serve",
20+
"--grpc-preshared-key", "localkey",
21+
]
22+
ports:
23+
- "50051:50051" # gRPC
24+
depends_on:
25+
- "postgres"
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import psycopg
2+
3+
from shared import BlogAuthor
4+
5+
DB_CONFIG = {
6+
"dbname": "blog_authors",
7+
"user": "dev",
8+
"password": "abc123",
9+
"host": "localhost",
10+
"port": 5432
11+
}
12+
13+
async def writePostgres(blog_author: BlogAuthor):
14+
with psycopg.connect(**DB_CONFIG) as conn:
15+
with conn.cursor() as cur:
16+
cur.execute("""
17+
INSERT INTO blog_authors (author_user_id, post_id)
18+
VALUES (%s, %s)
19+
ON CONFLICT DO NOTHING;
20+
""", (blog_author.author_user_id, blog_author.post_id))
21+
22+
async def create_blog_authors_table():
23+
with psycopg.connect(**DB_CONFIG) as conn:
24+
with conn.cursor() as cur:
25+
cur.execute("""
26+
CREATE TABLE IF NOT EXISTS blog_authors (
27+
author_user_id TEXT NOT NULL,
28+
post_id TEXT NOT NULL,
29+
PRIMARY KEY (author_user_id, post_id)
30+
);
31+
""")
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from authzed.api.v1 import (
2+
AsyncClient,
3+
ObjectReference,
4+
Relationship,
5+
RelationshipUpdate,
6+
SubjectReference,
7+
WriteRelationshipsRequest,
8+
WriteSchemaRequest,
9+
)
10+
from grpcutil import insecure_bearer_token_credentials
11+
12+
from shared import BlogAuthor
13+
14+
async def writeRelationship(blog_author: BlogAuthor):
15+
16+
#Using an insecure bearer token for the example. In a real application, you would use a secure bearer token.
17+
client = AsyncClient(
18+
"localhost:50051",
19+
insecure_bearer_token_credentials("localkey"),
20+
)
21+
22+
try:
23+
await client.WriteRelationships(
24+
WriteRelationshipsRequest(
25+
updates=[
26+
RelationshipUpdate(
27+
#It's a temporal best practice to keep activities idempotent
28+
operation=RelationshipUpdate.Operation.OPERATION_TOUCH,
29+
relationship=Relationship(
30+
resource=ObjectReference(object_type="post", object_id=blog_author.post_id),
31+
relation="author",
32+
subject=SubjectReference(
33+
object=ObjectReference(
34+
object_type="user",
35+
object_id=blog_author.author_user_id,
36+
)
37+
),
38+
),
39+
),
40+
]
41+
)
42+
)
43+
except Exception as e:
44+
raise
45+
46+
async def writeSchema():
47+
SCHEMA = """definition user {}
48+
definition post {
49+
relation author: user
50+
permission edit = author
51+
}"""
52+
53+
client = AsyncClient(
54+
"localhost:50051",
55+
insecure_bearer_token_credentials("localkey"),
56+
)
57+
await client.WriteSchema(WriteSchemaRequest(schema=SCHEMA))
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from data.postgres import create_blog_authors_table
2+
from data.spicedb import writeSchema
3+
import asyncio
4+
5+
async def main():
6+
await create_blog_authors_table()
7+
await writeSchema()
8+
9+
if __name__ == "__main__":
10+
asyncio.run(main())
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import asyncio
2+
import logging
3+
from temporalio.client import Client
4+
from temporalio.worker import Worker
5+
6+
from activities import write_to_spicedb, write_to_postgres
7+
from workflows import writeOwner
8+
9+
async def main():
10+
# Configure logging
11+
logging.basicConfig(level=logging.INFO)
12+
13+
client = await Client.connect("localhost:7233", namespace="default")
14+
# Run the worker
15+
worker = Worker(
16+
client, task_queue="write-task-queue",
17+
workflows=[writeOwner],
18+
activities=[write_to_spicedb, write_to_postgres]
19+
)
20+
await worker.run()
21+
22+
23+
if __name__ == "__main__":
24+
asyncio.run(main())
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
import argparse
3+
4+
from run_worker import writeOwner
5+
from temporalio.client import Client
6+
from shared import BlogAuthor
7+
8+
#In a real application, you may invoke this code when someone submits a form, presses a button, or visits a certain URL
9+
10+
async def main(author_user_id: str, post_id: str):
11+
client = await Client.connect("localhost:7233")
12+
13+
await client.execute_workflow(
14+
writeOwner.run, BlogAuthor(author_user_id=author_user_id, post_id=post_id), id=f"write-workflow-{author_user_id}-{post_id}", task_queue="write-task-queue"
15+
)
16+
17+
print("Workflow completed successfully")
18+
19+
20+
if __name__ == "__main__":
21+
parser = argparse.ArgumentParser(description='Run the workflow with specified author and post IDs')
22+
parser.add_argument('--author', required=True, help='The author user ID')
23+
parser.add_argument('--post', required=True, help='The post ID')
24+
25+
args = parser.parse_args()
26+
asyncio.run(main(args.author, args.post))

data/temporal_example/shared.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from dataclasses import dataclass
2+
3+
@dataclass
4+
class BlogAuthor:
5+
author_user_id: str
6+
post_id: str

data/temporal_example/workflows.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from temporalio import workflow
2+
from datetime import timedelta
3+
from temporalio.exceptions import ActivityError
4+
# Import activity, passing it through the sandbox without reloading the module
5+
with workflow.unsafe.imports_passed_through():
6+
from activities import write_to_postgres
7+
from activities import write_to_spicedb
8+
from shared import BlogAuthor
9+
10+
11+
#this is the workflow definition
12+
@workflow.defn
13+
class writeOwner:
14+
@workflow.run
15+
async def run (self, blog_author: BlogAuthor):
16+
try:
17+
workflow.logger.info("Starting workflow execution")
18+
19+
workflow.logger.info("Attempting Postgres write")
20+
await workflow.execute_activity(
21+
write_to_postgres,
22+
blog_author,
23+
start_to_close_timeout=timedelta(seconds=5),
24+
)
25+
workflow.logger.info(f"Postgres write completed")
26+
except ActivityError as postgresWriteError:
27+
workflow.logger.error(f"Postgres write failed (from workflow): {postgresWriteError}")
28+
raise postgresWriteError
29+
30+
try:
31+
workflow.logger.info("Attempting SpiceDB write")
32+
await workflow.execute_activity(
33+
write_to_spicedb,
34+
blog_author,
35+
start_to_close_timeout=timedelta(seconds=5),
36+
)
37+
workflow.logger.info("SpiceDB write completed")
38+
except ActivityError as spicedbWriteError:
39+
workflow.logger.error(f"SpiceDB write failed (from workflow): {spicedbWriteError}")
40+
raise spicedbWriteError

0 commit comments

Comments
 (0)