Commit 709a6ad

feat(dirty): add stash - global shared state between workers (#3503)
* feat(dirty): add stash - global shared state between workers

  Add a simple key-value store (stash) that allows dirty workers to share
  state through the arbiter. Tables are stored directly in arbiter memory
  for fast access and simplicity.

  Features:
  - Auto-create tables on first access
  - Dict-like interface via stash.table()
  - Pattern matching for keys (glob patterns)
  - Module-level API: stash.put(), stash.get(), stash.delete(), etc.

  Usage:
      from gunicorn.dirty import stash

      stash.put("sessions", "user:1", {"name": "Alice"})
      user = stash.get("sessions", "user:1")

      # Or dict-like
      sessions = stash.table("sessions")
      sessions["user:1"] = {"name": "Alice"}

  New files:
  - gunicorn/dirty/stash.py - Client API and StashTable class
  - Protocol additions for MSG_TYPE_STASH and STASH_OP_* codes

  Note: Tables are ephemeral - lost if arbiter restarts.

* test(dirty): add tests for stash protocol and encoding

  Test coverage for:
  - Stash message creation and encoding
  - Protocol constants (MSG_TYPE_STASH, STASH_OP_*)
  - Error classes (StashError, StashTableNotFoundError, StashKeyNotFoundError)
  - StashTable dict-like interface
  - Edge cases: unicode, complex values, special patterns

* example(dirty): add stash usage example and integration tests

  - Add SessionApp to dirty_app.py demonstrating stash usage
  - Add /session/* endpoints to wsgi_app.py
  - Add test_stash_integration.py with Docker tests
  - Update docker-compose.yml with stash-test service
  - Fix: Set GUNICORN_DIRTY_SOCKET in dirty arbiter for worker access

* docs(dirty): add stash documentation
1 parent 236c937 commit 709a6ad

13 files changed: +1626 -3 lines changed


docs/content/dirty.md

Lines changed: 253 additions & 0 deletions
@@ -558,6 +558,259 @@ def generate_view(request):
4. **Keep chunks small** - Smaller chunks provide better perceived latency
5. **Handle client disconnection** - Streams continue even if client disconnects; design accordingly

## Stash (Shared State via Message Passing)

Stash provides shared state between dirty workers, similar to Erlang's ETS (Erlang Term Storage). Workers remain fully isolated - all state access goes through message passing to the arbiter.

### Architecture

```
                    +------------------+
                    |  Dirty Arbiter   |
                    |                  |
                    |  stash_tables:   |
                    |    sessions: {}  |
                    |    cache: {}     |
                    +--------+---------+
                             |
             Unix Socket IPC (message passing)
                             |
         +-------------------+-------------------+
         |                   |                   |
   +-----v-----+       +-----v-----+       +-----v-----+
   | Worker 1  |       | Worker 2  |       | Worker 3  |
   |           |       |           |       |           |
   | (isolated)|       | (isolated)|       | (isolated)|
   +-----------+       +-----------+       +-----------+

   Workers have NO shared memory.
   All stash operations are IPC messages to arbiter.
```

### How It Works

1. A worker calls `stash.put("sessions", "user:1", data)`
2. The worker sends a message to the arbiter over the Unix socket
3. The arbiter stores the data in its own memory (`self.stash_tables`)
4. The arbiter sends a response back to the worker
5. The worker receives the confirmation

This is **not** shared memory - workers remain fully isolated. The arbiter acts as a centralized store that workers communicate with via message passing. This matches Erlang's model, where each ETS table is owned by a process.

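
The round trip described above can be sketched in a single process. This is an illustration only: `ArbiterStore`, `WorkerClient`, and the JSON encoding are hypothetical stand-ins, not gunicorn's actual classes or wire format, and a direct function call takes the place of the Unix socket.

```python
import json


class ArbiterStore:
    """Hypothetical stand-in for the arbiter: it owns every table."""

    def __init__(self):
        self.stash_tables = {}

    def handle(self, raw: bytes) -> bytes:
        """Decode one request, apply it, and encode the response."""
        msg = json.loads(raw)
        # Tables are created on first access, as the docs describe.
        table = self.stash_tables.setdefault(msg["table"], {})
        if msg["op"] == "put":
            table[msg["key"]] = msg["value"]
            reply = {"ok": True}
        elif msg["op"] == "get":
            reply = {"ok": True, "value": table.get(msg["key"])}
        else:
            reply = {"ok": False, "error": f"unknown op {msg['op']}"}
        return json.dumps(reply).encode()


class WorkerClient:
    """Hypothetical stand-in for a worker: it never touches the tables."""

    def __init__(self, send):
        self._send = send  # callable that carries bytes to the arbiter

    def put(self, table, key, value):
        req = {"op": "put", "table": table, "key": key, "value": value}
        return json.loads(self._send(json.dumps(req).encode()))["ok"]

    def get(self, table, key):
        req = {"op": "get", "table": table, "key": key}
        return json.loads(self._send(json.dumps(req).encode()))["value"]


arbiter = ArbiterStore()
worker1 = WorkerClient(arbiter.handle)
worker2 = WorkerClient(arbiter.handle)

# Worker 1 writes; worker 2 reads the same value back through the arbiter.
worker1.put("sessions", "user:1", {"name": "Alice"})
session_seen_by_worker2 = worker2.get("sessions", "user:1")
```

Both clients hold only a way to send bytes, so the data lives in exactly one place. That single owner is what makes the state visible to every worker without shared memory.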
### Basic Usage

```python
from gunicorn.dirty import stash

# Store a value (table auto-created)
# This sends a message to arbiter, which stores it
stash.put("sessions", "user:123", {"name": "Alice", "role": "admin"})

# Retrieve a value
# This sends a request to arbiter, which returns the value
user = stash.get("sessions", "user:123")

# Delete a key
stash.delete("sessions", "user:123")

# Check existence
if stash.exists("sessions", "user:123"):
    print("Session exists")

# List keys with pattern matching
keys = stash.keys("sessions", pattern="user:*")
```

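
Glob patterns such as `user:*` can be pictured with Python's standard `fnmatch` module. This sketch assumes stash patterns follow `fnmatch`-style rules, which the text here does not confirm; `matching_keys` is a hypothetical helper that operates on a plain dict rather than a real stash table.

```python
from fnmatch import fnmatchcase


def matching_keys(table, pattern=None):
    """Return the table's keys, optionally filtered by a glob pattern."""
    if pattern is None:
        return sorted(table)
    # fnmatchcase is case-sensitive on every platform.
    return sorted(key for key in table if fnmatchcase(key, pattern))


# Plain dict standing in for the "sessions" table.
sessions = {
    "user:1": {"name": "Alice"},
    "user:42": {"name": "Bob"},
    "admin:1": {"name": "Root"},
}

user_keys = matching_keys(sessions, "user:*")
first_ids = matching_keys(sessions, "*:1")
```

A consistent key prefix keeps such queries simple: one pattern selects one kind of entry.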
### Dict-like Interface

For more Pythonic access, use the table interface:

```python
from gunicorn.dirty import stash

# Get a table reference
sessions = stash.table("sessions")

# Dict-like operations (each is an IPC message)
sessions["user:123"] = {"name": "Alice"}
user = sessions["user:123"]
del sessions["user:123"]

# Iteration
for key in sessions:
    print(key, sessions[key])

# Length
count = len(sessions)
```

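
Because each dict-like operation is an IPC message, iterating over a table and reading every value costs one round-trip for the key list plus one per key. The sketch below counts those round-trips. `FakeStash` and `TableView` are hypothetical in-memory stand-ins written for this illustration, not the real `StashTable` implementation.

```python
from collections.abc import MutableMapping


class FakeStash:
    """Hypothetical in-memory backend that counts simulated round-trips."""

    def __init__(self):
        self._tables = {}
        self.calls = 0

    def put(self, table, key, value):
        self.calls += 1
        self._tables.setdefault(table, {})[key] = value

    def get(self, table, key):
        self.calls += 1
        return self._tables.get(table, {})[key]  # KeyError if missing

    def delete(self, table, key):
        self.calls += 1
        rows = self._tables.get(table, {})
        if key not in rows:
            return False
        del rows[key]
        return True

    def keys(self, table):
        self.calls += 1
        return list(self._tables.get(table, {}))


class TableView(MutableMapping):
    """Dict-like wrapper that forwards every operation to the backend."""

    def __init__(self, backend, name):
        self._backend = backend
        self._name = name

    def __getitem__(self, key):
        return self._backend.get(self._name, key)

    def __setitem__(self, key, value):
        self._backend.put(self._name, key, value)

    def __delitem__(self, key):
        if not self._backend.delete(self._name, key):
            raise KeyError(key)

    def __iter__(self):
        return iter(self._backend.keys(self._name))

    def __len__(self):
        return len(self._backend.keys(self._name))


backend = FakeStash()
sessions = TableView(backend, "sessions")
sessions["user:1"] = {"name": "Alice"}
sessions["user:2"] = {"name": "Bob"}

# One keys() call plus one get() per key.
pairs = [(key, sessions[key]) for key in sessions]
round_trips = backend.calls  # 2 puts + 1 keys + 2 gets
```

For large tables it may be worth fetching only the keys that are needed instead of looping over everything.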
### Table Management

```python
from gunicorn.dirty import stash

# Explicit table creation (idempotent)
stash.ensure("cache")

# Get table info
info = stash.info("sessions")
print(f"Table has {info['size']} entries")

# Clear all entries in a table
stash.clear("sessions")

# Delete entire table
stash.delete_table("sessions")

# List all tables
tables = stash.tables()
```

### Using Stash in DirtyApp

Declare tables your app uses with the `stashes` class attribute:

```python
import time

from gunicorn.dirty import DirtyApp, stash


class SessionApp(DirtyApp):
    # Tables declared here are auto-created on startup
    stashes = ["sessions", "counters"]

    def init(self):
        # Initialize counter if needed
        if not stash.exists("counters", "requests"):
            stash.put("counters", "requests", 0)

    def login(self, user_id, user_data):
        """Store session - any worker can read it via arbiter."""
        stash.put("sessions", f"user:{user_id}", {
            "data": user_data,
            "logged_in_at": time.time(),
        })
        self._increment_counter()
        return {"status": "ok"}

    def get_session(self, user_id):
        """Get session - request goes to arbiter."""
        return stash.get("sessions", f"user:{user_id}")

    def _increment_counter(self):
        """Increment global counter via arbiter (not atomic)."""
        current = stash.get("counters", "requests", 0)
        stash.put("counters", "requests", current + 1)

    def close(self):
        pass
```

### API Reference

| Function | Description |
|----------|-------------|
| `stash.put(table, key, value)` | Store a value (table auto-created) |
| `stash.get(table, key, default=None)` | Retrieve a value |
| `stash.delete(table, key)` | Delete a key, returns True if deleted |
| `stash.exists(table, key=None)` | Check if table/key exists |
| `stash.keys(table, pattern=None)` | List keys, optional glob pattern |
| `stash.clear(table)` | Delete all entries in table |
| `stash.info(table)` | Get table info (size, etc.) |
| `stash.ensure(table)` | Create table if not exists |
| `stash.delete_table(table)` | Delete entire table |
| `stash.tables()` | List all table names |
| `stash.table(name)` | Get dict-like interface |

### Patterns and Use Cases

**Session Storage:**
```python
# Store session on login (worker 1)
stash.put("sessions", f"user:{user_id}", session_data)

# Check session on request (may be worker 2)
session = stash.get("sessions", f"user:{user_id}")
if session is None:
    raise AuthError("Not logged in")
```

**Shared Cache:**
```python
def get_expensive_result(key):
    # Check cache first (via arbiter)
    cached = stash.get("cache", key)
    if cached is not None:
        return cached

    # Compute and cache
    result = expensive_computation()
    stash.put("cache", key, result)
    return result
```

**Global Counters:**
```python
def increment_counter(name):
    # Note: not atomic - two workers could read same value
    current = stash.get("counters", name, 0)
    stash.put("counters", name, current + 1)
    return current + 1
```

759+
**Feature Flags:**
760+
```python
761+
# Set flag (from admin endpoint)
762+
stash.put("flags", "new_feature", True)
763+
764+
# Check flag (from any worker)
765+
if stash.get("flags", "new_feature", False):
766+
enable_new_feature()
767+
```
768+
### Error Handling

```python
from gunicorn.dirty import stash
from gunicorn.dirty.stash import (
    StashError,
    StashTableNotFoundError,
    StashKeyNotFoundError,
)

try:
    info = stash.info("nonexistent")
except StashTableNotFoundError as e:
    print(f"Table not found: {e.table_name}")

# Using get() with a default avoids StashKeyNotFoundError
value = stash.get("table", "key", default="fallback")
```

### Best Practices

1. **Use descriptive table names** - `user_sessions`, `ml_cache`, not `data`
2. **Use key prefixes** - `user:123`, `cache:model:v1` for organization
3. **Handle missing data** - Always provide defaults or check existence
4. **Don't store large data** - Each access is an IPC round-trip that carries the whole value
5. **Remember it's ephemeral** - Data is lost on arbiter restart

### Advantages

- **Worker isolation** - Workers remain fully isolated; no shared memory bugs
- **Simple API** - Dict-like interface, no locking required
- **Binary support** - Efficiently stores bytes (images, model weights)
- **Pattern matching** - `keys(pattern="user:*")` for querying
- **Zero setup** - Works automatically with dirty workers
- **Table-based** - Organize data into logical namespaces

### Limitations

- **No persistence** - Data lives only in arbiter memory
- **No transactions** - No atomic read-modify-write operations
- **No TTL** - Entries don't expire automatically
- **IPC overhead** - Each operation is an IPC round-trip over the Unix socket
- **Single arbiter** - Not distributed across multiple machines

For persistent or distributed state, use Redis, PostgreSQL, or similar external systems.

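
Since entries never expire on their own, an application that needs expiry has to track it itself. One possible approach, sketched here under assumptions, stores an expiry timestamp next to each value and checks it on read. `put_with_ttl` and `get_fresh` are hypothetical helpers, and a plain dict stands in for a stash table so the example runs without gunicorn.

```python
import time


def put_with_ttl(table, key, value, ttl_seconds, now=None):
    """Store the value together with the time at which it expires."""
    now = time.time() if now is None else now
    table[key] = {"value": value, "expires_at": now + ttl_seconds}


def get_fresh(table, key, default=None, now=None):
    """Return the value if it has not expired, otherwise the default."""
    now = time.time() if now is None else now
    entry = table.get(key)
    if entry is None:
        return default
    if entry["expires_at"] <= now:
        table.pop(key, None)  # lazily evict the expired entry
        return default
    return entry["value"]


cache = {}  # plain dict standing in for a stash table

# Explicit "now" values keep the example deterministic.
put_with_ttl(cache, "model:v1", "weights", ttl_seconds=60, now=1000.0)
fresh = get_fresh(cache, "model:v1", now=1030.0)
stale = get_fresh(cache, "model:v1", default="miss", now=1061.0)
```

The read-then-evict step is itself not atomic, so two workers may both see an expired entry and both recompute it. For a cache that is usually acceptable.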
### Flask Example

```python

examples/dirty_example/dirty_app.py

Lines changed: 95 additions & 0 deletions
@@ -11,9 +11,11 @@
3. Cleans up on shutdown (close)
"""

import os
import time
import hashlib
from gunicorn.dirty.app import DirtyApp
from gunicorn.dirty import stash


class MLApp(DirtyApp):
@@ -171,3 +173,96 @@ def stats(self):
    def close(self):
        print(f"[ComputeApp] Shutting down. Total computations: {self.computation_count}")


class SessionApp(DirtyApp):
    """
    Example dirty application demonstrating stash (shared state).

    This shows how multiple dirty workers can share state through
    the arbiter's stash tables. All workers see the same data.
    """

    # Declare stash tables used by this app (auto-created on startup)
    stashes = ["sessions", "counters"]

    def __init__(self):
        self.worker_pid = None

    def init(self):
        self.worker_pid = os.getpid()
        print(f"[SessionApp] Initialized on worker {self.worker_pid}")
        # Initialize a global counter if it doesn't exist
        if not stash.exists("counters", "requests"):
            stash.put("counters", "requests", 0)

    def __call__(self, action, *args, **kwargs):
        method = getattr(self, action, None)
        if method is None or action.startswith('_'):
            raise ValueError(f"Unknown action: {action}")
        return method(*args, **kwargs)

    def login(self, user_id, user_data):
        """Store user session in shared stash."""
        session = {
            "user_id": user_id,
            "data": user_data,
            "logged_in_at": time.time(),
            "worker_pid": self.worker_pid,
        }
        stash.put("sessions", f"user:{user_id}", session)
        self._increment_counter()
        return {"status": "ok", "session": session}

    def logout(self, user_id):
        """Remove user session."""
        key = f"user:{user_id}"
        if stash.exists("sessions", key):
            stash.delete("sessions", key)
            self._increment_counter()
            return {"status": "logged_out", "user_id": user_id}
        return {"status": "not_found", "user_id": user_id}

    def get_session(self, user_id):
        """Get user session - visible from any worker."""
        session = stash.get("sessions", f"user:{user_id}")
        self._increment_counter()
        return {
            "session": session,
            "served_by_worker": self.worker_pid,
        }

    def list_sessions(self):
        """List all active sessions."""
        keys = stash.keys("sessions", pattern="user:*")
        sessions = []
        for key in keys:
            sessions.append(stash.get("sessions", key))
        self._increment_counter()
        return {
            "sessions": sessions,
            "count": len(sessions),
            "served_by_worker": self.worker_pid,
        }

    def get_stats(self):
        """Get global request counter (shared across all workers)."""
        count = stash.get("counters", "requests", 0)
        return {
            "total_requests": count,
            "served_by_worker": self.worker_pid,
        }

    def _increment_counter(self):
        """Increment global request counter."""
        current = stash.get("counters", "requests", 0)
        stash.put("counters", "requests", current + 1)

    def clear_all(self):
        """Clear all sessions (for testing)."""
        stash.clear("sessions")
        stash.put("counters", "requests", 0)
        return {"status": "cleared"}

    def close(self):
        print(f"[SessionApp] Shutting down worker {self.worker_pid}")

examples/dirty_example/docker-compose.yml

Lines changed: 12 additions & 0 deletions
@@ -52,3 +52,15 @@ services:
    environment:
      - TEST_BASE_URL=http://server:8000
    command: python examples/dirty_example/test_integration.py

  # Run stash integration test against the server
  stash-test:
    build:
      context: ../..
      dockerfile: examples/dirty_example/Dockerfile
    depends_on:
      server:
        condition: service_healthy
    environment:
      - TEST_BASE_URL=http://server:8000
    command: python examples/dirty_example/test_stash_integration.py

examples/dirty_example/gunicorn_conf.py

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@
dirty_apps = [
    "examples.dirty_example.dirty_app:MLApp",
    "examples.dirty_example.dirty_app:ComputeApp",
    "examples.dirty_example.dirty_app:SessionApp",
]
dirty_workers = 2
dirty_timeout = 300
