|
1 | | -import time |
2 | 1 | from multiprocessing import shared_memory |
3 | 2 | import logging |
4 | | -import random |
| 3 | +import os |
| 4 | +import fcntl |
5 | 5 |
|
6 | 6 | logger = logging.getLogger(__name__) |
7 | 7 |
|
8 | | -MAX_RETRIES = 25 |
9 | | -RETRY_DELAY = 0.2 # seconds |
| 8 | +LIGHTLLM_SHM_LOCK_FILE = "/tmp/lightllm_shm_creation.lock" |
10 | 9 |
|
11 | 10 |
|
12 | | -def create_or_link_shm(name: str, expected_size: int) -> shared_memory.SharedMemory: |
13 | | - for _ in range(MAX_RETRIES): |
14 | | - shm = None |
def acquire_lock(lock_file=None) -> int:
    """Block until an exclusive advisory lock on the lock file is held.

    The lock file is created if it does not exist yet. The lock is taken with
    ``fcntl.flock(..., LOCK_EX)``, so the call blocks while another open file
    description holds the lock.

    Args:
        lock_file: Path of the file to lock. When ``None`` (the default), the
            module-level ``LIGHTLLM_SHM_LOCK_FILE`` path is used.

    Returns:
        The open file descriptor that holds the lock. Pass it to
        ``release_lock`` to unlock and close it.

    Raises:
        OSError: If the lock file cannot be opened or locked.
    """
    if lock_file is None:
        lock_file = LIGHTLLM_SHM_LOCK_FILE

    lock_fd = os.open(lock_file, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
    except BaseException:
        # The caller never receives the descriptor on failure (including an
        # interrupt while blocked), so close it here to avoid leaking it.
        os.close(lock_fd)
        raise
    return lock_fd
| 15 | + |
| 16 | + |
def release_lock(lock_fd: int) -> None:
    """Release the advisory lock held on ``lock_fd`` and close the descriptor.

    Args:
        lock_fd: File descriptor previously returned by ``acquire_lock``.

    Raises:
        OSError: If unlocking or closing the descriptor fails.
    """
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
    finally:
        # Close even when the explicit unlock fails so the descriptor is
        # never leaked.
        os.close(lock_fd)
| 20 | + |
| 21 | + |
def create_or_link_shm(name: str, expected_size: int) -> shared_memory.SharedMemory:
    """Create the named shared-memory segment, or attach to an existing one.

    The whole operation runs while holding the lock taken by
    ``acquire_lock()``, which is released in all cases before returning.

    Steps:
      1. Try to create the segment with ``expected_size`` bytes.
      2. If it already exists, attach to it instead.
      3. If the attached segment reports a different size, close and unlink
         it, then create a fresh one of the expected size.

    Args:
        name: Name of the shared-memory segment.
        expected_size: Required size of the segment in bytes.

    Returns:
        A ``SharedMemory`` handle whose segment was created with, or already
        had, ``expected_size`` bytes.

    Raises:
        Exception: Any unexpected error from creating or attaching to the
            segment is logged and re-raised unchanged.
    """
    lock_fd = acquire_lock()
    try:
        # Fast path: nobody has created the segment yet.
        try:
            shm = shared_memory.SharedMemory(name=name, create=True, size=expected_size)
        except FileExistsError:
            pass  # Fall through and attach to the existing segment.
        except Exception as e:
            logger.error(f"Unexpected error creating shared memory {name}: {e}")
            raise
        else:
            logger.info(f"Created new shared memory: {name} ({expected_size=})")
            return shm

        try:
            shm = shared_memory.SharedMemory(name=name)
        except FileNotFoundError:
            # The segment was removed between the failed create and the
            # attach; create it again.
            logger.warning(f"Shared memory {name} disappeared, retrying...")
            shm = shared_memory.SharedMemory(name=name, create=True, size=expected_size)
            logger.info(f"Created new shared memory: {name} ({expected_size=})")
            return shm
        except Exception as e:
            logger.error(f"Unexpected error attaching to shared memory {name}: {e}")
            raise

        if shm.size == expected_size:
            logger.info(f"Attached to existing shared memory: {name} ({expected_size=})")
            return shm

        # NOTE(review): some platforms may report a page-rounded size for an
        # attached segment, which would always land here — confirm on the
        # target OS.
        logger.warning(f"Size mismatch: expected {expected_size}, got {shm.size}. Recreating {name}...")
        shm.close()
        try:
            shm.unlink()
        except FileNotFoundError:
            pass  # Already removed by someone else; nothing left to unlink.

        shm = shared_memory.SharedMemory(name=name, create=True, size=expected_size)
        logger.info(f"Recreated shared memory: {name} ({expected_size=})")
        return shm
    finally:
        release_lock(lock_fd)
0 commit comments