Skip to content

Commit b0225bd

Browse files
authored
feat: celery example improvements (#14)
* feat: celery transaction minor x02 * feat: celery transaction minor x03
1 parent 488815c commit b0225bd

File tree

5 files changed

+54
-28
lines changed

5 files changed

+54
-28
lines changed

celery-transaction-processing/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pip install -r requirements.txt
33
```
44

55
```bash
6-
uvicorn main:app --reload
6+
uvicorn main:api_app --reload
77
```
88

99
```bash

celery-transaction-processing/main.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,24 @@
1111
from deps import get_deps, get_constants
1212
from tasks import reconcile_transaction
1313

14-
app = FastAPI()
14+
api_app = FastAPI()
1515

1616

17-
@app.post("/transactions")
18-
async def transaction(
17+
@api_app.post("/transactions")
18+
async def create_transaction(
1919
req: utils.TransactionRequest,
2020
db: Session = Depends(get_deps().get_db_session),
2121
df: Dragonfly = Depends(get_deps().get_dragonfly),
2222
w3: Web3 = Depends(get_deps().get_web3),
2323
) -> utils.TransactionResponse:
2424
# Try to acquire a lock on the user account.
2525
lock_key = utils.user_account_lock_key(req.user_account_id)
26-
lock = df.set(name=lock_key, value=utils.LOCK_VALUE, nx=True, ex=utils.LOCK_EXPIRATION_SECONDS)
27-
if not lock:
26+
locked = df.set(
27+
name=lock_key, value=utils.LOCK_VALUE,
28+
nx=True, ex=utils.LOCK_EXPIRATION_SECONDS,
29+
)
30+
31+
if not locked:
2832
raise HTTPException(
2933
status_code=409,
3034
detail="User account is locked since a transaction is submitted very recently. Please try again later.",
@@ -81,8 +85,8 @@ async def transaction(
8185

8286
# Cache the transaction in Dragonfly.
8387
cache_key = utils.txn_cache_key(txn.id)
84-
_ = df.hset(cache_key, mapping=utils.txn_to_dict(txn))
85-
_ = df.expire(cache_key, utils.CACHE_NORMAL_EXPIRATION_SECONDS)
88+
mapping = utils.txn_to_dict(txn)
89+
utils.hset_and_expire(df, cache_key, mapping, utils.CACHE_NORMAL_EXPIRATION_SECONDS)
8690

8791
# Start the transaction reconciliation task.
8892
reconcile_transaction.delay(txn.id)
@@ -91,7 +95,7 @@ async def transaction(
9195
return utils.txn_to_response(txn)
9296

9397

94-
@app.get("/transactions/{txn_id}")
98+
@api_app.get("/transactions/{txn_id}")
9599
async def get_transaction(
96100
txn_id: int,
97101
db: Session = Depends(get_deps().get_db_session),
@@ -115,11 +119,10 @@ async def get_transaction(
115119
# If the transaction is not found, cache an empty value with only the ID and return a 404.
116120
# Caching an empty value is important to prevent cache penetrations.
117121
if txn is None:
118-
_ = df.hset(cache_key, mapping={"id": txn_id})
119-
_ = df.expire(cache_key, utils.CACHE_EMPTY_EXPIRATION_SECONDS)
122+
utils.hset_and_expire(df, cache_key, {"id": txn_id}, utils.CACHE_EMPTY_EXPIRATION_SECONDS)
120123
raise HTTPException(status_code=404, detail="Transaction not found")
121124

122125
# Cache the transaction in Dragonfly and return the response.
123-
_ = df.hset(cache_key, mapping=utils.txn_to_dict(txn))
124-
_ = df.expire(cache_key, utils.CACHE_NORMAL_EXPIRATION_SECONDS)
126+
mapping = utils.txn_to_dict(txn)
127+
utils.hset_and_expire(df, cache_key, mapping, utils.CACHE_NORMAL_EXPIRATION_SECONDS)
125128
return utils.txn_to_response(txn)

celery-transaction-processing/models.sql

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ CREATE TABLE user_accounts
88
CREATE TABLE user_account_transactions
99
(
1010
id INTEGER PRIMARY KEY,
11-
user_account_id INTEGER REFERENCES user_accounts (id) NOT NULL,
12-
transaction_hash TEXT NOT NULL,
13-
from_public_address TEXT NOT NULL,
14-
to_public_address TEXT NOT NULL,
15-
transaction_amount_in_wei INTEGER NOT NULL,
16-
transaction_fee_total_in_wei INTEGER NOT NULL,
17-
transaction_fee_blockchain_in_wei INTEGER NOT NULL,
18-
status TEXT NOT NULL DEFAULT 'PENDING' CHECK (status IN ('PENDING', 'SUCCESSFUL', 'FAILED'))
11+
user_account_id INTEGER NOT NULL,
12+
transaction_hash TEXT NOT NULL,
13+
from_public_address TEXT NOT NULL,
14+
to_public_address TEXT NOT NULL,
15+
transaction_amount_in_wei INTEGER NOT NULL,
16+
transaction_fee_total_in_wei INTEGER NOT NULL,
17+
transaction_fee_blockchain_in_wei INTEGER NOT NULL,
18+
status TEXT NOT NULL DEFAULT 'PENDING' CHECK (status IN ('PENDING', 'SUCCESSFUL', 'FAILED')),
19+
20+
FOREIGN KEY (user_account_id) REFERENCES user_accounts (id)
1921
);
2022

2123
CREATE INDEX idx_user_account_id ON user_account_transactions (user_account_id);

celery-transaction-processing/tasks.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ def __init__(self, message):
2323
super().__init__(message)
2424

2525

26-
# Use Dragonfly as the broker and the result backend for Celery.
26+
# Celery app configuration.
27+
# Use Dragonfly as the message broker and the result storage backend for Celery.
2728
# Dragonfly is wire-protocol compatible with Redis.
2829
# We can use the same Redis URL(s) as long as Dragonfly is running on the port specified.
2930
app = Celery(
@@ -32,22 +33,28 @@ def __init__(self, message):
3233
backend=get_constants().get_celery_backend_url(),
3334
)
3435

35-
TASK_MAX_RETRIES: Final[int] = 20
36-
TASK_RETRY_DELAY: Final[int] = 100
36+
TASK_MAX_RETRIES: Final[int] = 32
37+
TASK_RETRY_BACKOFF: Final[int] = 100
38+
TASK_RETRY_BACKOFF_MAX: Final[int] = 1600
3739

3840

3941
# Define a Celery task to reconcile a transaction.
4042
# We check the transaction status from the blockchain and update the database accordingly.
4143
# The blockchain is just an example. Many finical systems have similar reconciliation or settlement processes.
4244
#
43-
# Maximum retry count is TASK_MAX_RETRIES, and the delay between retries is exponential.
44-
# The retry delays are [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ...] * TASK_RETRY_DELAY seconds.
45-
# Thus, the first retry is after 100 seconds, the second retry is after 200 seconds, and so on.
45+
# Maximum retry count is 32, and the delay between retries is exponential.
46+
#
47+
# The retry delays are [1, 2, 4, 8, 16, ...] x 100 seconds.
48+
# So, the first retry is after 100 seconds, the second retry is after 200 seconds, and so on.
49+
#
50+
# The maximum backoff delay is set to 1600 seconds.
51+
# Thus, the retry delays in this configuration are [100, 200, 400, 800, 1600, 1600, ...] seconds up to 32 retries.
4652
@app.task(
4753
bind=True,
4854
autoretry_for=(TaskRetryException,),
4955
max_retries=TASK_MAX_RETRIES,
50-
retry_backoff=TASK_RETRY_DELAY,
56+
retry_backoff=TASK_RETRY_BACKOFF,
57+
retry_backoff_max=TASK_RETRY_BACKOFF_MAX,
5158
retry_jitter=False,
5259
)
5360
def reconcile_transaction(_self, txn_id: str):

celery-transaction-processing/utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from typing import Final
33
from urllib.parse import urlparse
44

5+
from redis import Redis as Dragonfly
6+
57
import models
68

79
CACHE_NORMAL_EXPIRATION_SECONDS: Final[int] = 60
@@ -112,3 +114,15 @@ def parse_dragonfly_url(dragonfly_url) -> DragonflyURLParsed:
112114
port = 6379
113115

114116
return DragonflyURLParsed(host=host, port=port, db=db)
117+
118+
119+
def hset_and_expire(
120+
df: Dragonfly,
121+
key: str,
122+
mapping: dict,
123+
expiration: int,
124+
):
125+
pipe = df.pipeline()
126+
pipe.hset(key, mapping=mapping)
127+
pipe.expire(key, expiration)
128+
pipe.execute()

0 commit comments

Comments
 (0)