Skip to content

Commit d3484dd

Browse files
committed
Add retry logic for port conflicts in cluster startup
1 parent f7164e8 commit d3484dd

File tree

1 file changed

+70
-60
lines changed

1 file changed

+70
-60
lines changed

nats-server/src/nats/server/__init__.py

Lines changed: 70 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -377,66 +377,76 @@ async def run_cluster(
377377
if size < 1:
378378
raise ValueError("Cluster size must be at least 1")
379379

380-
# Use OS to allocate available ports for each node (client + cluster port)
381-
available_ports = []
382-
cluster_ports = []
383-
sockets = []
384-
385-
try:
386-
# Create socket pairs for each node to reserve both client and cluster ports
387-
for _ in range(size):
388-
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
389-
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
390-
client_sock.bind(("127.0.0.1", 0))
391-
client_port = client_sock.getsockname()[1]
392-
393-
cluster_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
394-
cluster_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
395-
cluster_sock.bind(("127.0.0.1", 0))
396-
cluster_port = cluster_sock.getsockname()[1]
397-
398-
available_ports.append(client_port)
399-
cluster_ports.append(cluster_port)
400-
sockets.extend([client_sock, cluster_sock])
401-
finally:
402-
for sock in sockets:
403-
sock.close()
404-
405-
servers = []
406-
try:
407-
for i in range(size):
408-
routes = [cluster_ports[j] for j in range(size) if j != i]
409-
410-
# Always create unique store directory for each cluster node
411-
# This prevents conflicts when JetStream is enabled (via flag or config)
412-
# If JetStream is disabled, the server will ignore this parameter
413-
if store_dir:
414-
# Use provided base directory and create subdirectory for each node
415-
node_store_dir = os.path.join(store_dir, f"node{i + 1}")
416-
os.makedirs(node_store_dir, exist_ok=True)
417-
else:
418-
# Create unique temp directory for each node to avoid conflicts
419-
node_store_dir = tempfile.mkdtemp(prefix=f"nats-node{i + 1}-")
420-
421-
server = await _run_cluster_node(
422-
config_path=config_path,
423-
port=available_ports[i],
424-
routes=routes,
425-
name=f"node{i + 1}",
426-
cluster_name="cluster",
427-
cluster_port=cluster_ports[i],
428-
jetstream=jetstream,
429-
store_dir=node_store_dir,
430-
)
431-
servers.append(server)
432-
433-
except Exception as e:
434-
for server in servers:
435-
with contextlib.suppress(Exception):
436-
await server.shutdown()
437-
raise ServerError(f"Failed to start cluster: {e}") from e
438-
439-
return ServerCluster(servers)
380+
max_retries = 3
381+
last_error: Exception | None = None
382+
383+
for attempt in range(max_retries):
384+
# Use OS to allocate available ports for each node (client + cluster port)
385+
available_ports = []
386+
cluster_ports = []
387+
sockets = []
388+
389+
try:
390+
# Create socket pairs for each node to reserve both client and cluster ports
391+
for _ in range(size):
392+
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
393+
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
394+
client_sock.bind(("127.0.0.1", 0))
395+
client_port = client_sock.getsockname()[1]
396+
397+
cluster_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
398+
cluster_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
399+
cluster_sock.bind(("127.0.0.1", 0))
400+
cluster_port = cluster_sock.getsockname()[1]
401+
402+
available_ports.append(client_port)
403+
cluster_ports.append(cluster_port)
404+
sockets.extend([client_sock, cluster_sock])
405+
finally:
406+
for sock in sockets:
407+
sock.close()
408+
409+
servers = []
410+
try:
411+
for i in range(size):
412+
routes = [cluster_ports[j] for j in range(size) if j != i]
413+
414+
# Always create unique store directory for each cluster node
415+
# This prevents conflicts when JetStream is enabled (via flag or config)
416+
# If JetStream is disabled, the server will ignore this parameter
417+
if store_dir:
418+
# Use provided base directory and create subdirectory for each node
419+
node_store_dir = os.path.join(store_dir, f"node{i + 1}")
420+
os.makedirs(node_store_dir, exist_ok=True)
421+
else:
422+
# Create unique temp directory for each node to avoid conflicts
423+
node_store_dir = tempfile.mkdtemp(prefix=f"nats-node{i + 1}-")
424+
425+
server = await _run_cluster_node(
426+
config_path=config_path,
427+
port=available_ports[i],
428+
routes=routes,
429+
name=f"node{i + 1}",
430+
cluster_name="cluster",
431+
cluster_port=cluster_ports[i],
432+
jetstream=jetstream,
433+
store_dir=node_store_dir,
434+
)
435+
servers.append(server)
436+
437+
return ServerCluster(servers)
438+
439+
except ServerError as e:
440+
last_error = e
441+
for server in servers:
442+
with contextlib.suppress(Exception):
443+
await server.shutdown()
444+
# Retry on port conflicts
445+
if "address already in use" in str(e) and attempt < max_retries - 1:
446+
continue
447+
raise ServerError(f"Failed to start cluster: {e}") from e
448+
449+
raise ServerError(f"Failed to start cluster after {max_retries} attempts: {last_error}") from last_error
440450

441451

442452
async def _run_cluster_node(

0 commit comments

Comments
 (0)