Skip to content

Commit 05970e2

Browse files
authored
Dont block executor when syncing files or mutexes (#32)
* Make Mutex.set async * Make folder sync async
1 parent 71b2dec commit 05970e2

File tree

4 files changed

+37
-25
lines changed

4 files changed

+37
-25
lines changed

livesync/folder.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,18 +79,18 @@ async def watch(self) -> None:
7979
watch_filter=lambda _, filepath: not self._ignore_spec.match_file(filepath)):
8080
for change, filepath in changes:
8181
print('?+U-'[change], filepath)
82-
self.sync()
82+
await self.sync()
8383
except RuntimeError as e:
8484
if 'Already borrowed' not in str(e):
8585
raise
8686

87-
def sync(self) -> None:
87+
async def sync(self) -> None:
8888
args = ' '.join(self._rsync_args)
8989
args += ''.join(f' --exclude="{e}"' for e in self._get_ignores())
9090
args += f' -e "ssh -p {self.ssh_port}"' # NOTE: use SSH with custom port
9191
args += f' --rsync-path="mkdir -p {self.target_path} && rsync"' # NOTE: create target folder if not exists
92-
run_subprocess(f'rsync {args} "{self.source_path}/" "{self.target}/"', quiet=True)
92+
await run_subprocess(f'rsync {args} "{self.source_path}/" "{self.target}/"', quiet=True)
9393
if isinstance(self.on_change, str):
94-
run_subprocess(f'ssh {self.host} -p {self.ssh_port} "cd {self.target_path}; {self.on_change}"')
94+
await run_subprocess(f'ssh {self.host} -p {self.ssh_port} "cd {self.target_path}; {self.on_change}"')
9595
if callable(self.on_change):
9696
self.on_change()

livesync/mutex.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import asyncio
12
import logging
23
import socket
3-
import subprocess
44
from datetime import datetime, timedelta
55
from typing import Optional
66

@@ -14,10 +14,10 @@ def __init__(self, host: str, port: int) -> None:
1414
self.occupant: Optional[str] = None
1515
self.user_id = socket.gethostname()
1616

17-
def is_free(self) -> bool:
17+
async def is_free(self) -> bool:
1818
try:
1919
command = f'[ -f {self.DEFAULT_FILEPATH} ] && cat {self.DEFAULT_FILEPATH} || echo'
20-
output = self._run_ssh_command(command).strip()
20+
output = (await self._run_ssh_command(command)).strip()
2121
if not output:
2222
return True
2323
words = output.splitlines()[0].strip().split()
@@ -30,20 +30,27 @@ def is_free(self) -> bool:
3030
logging.exception('Could not access target system')
3131
return False
3232

33-
def set(self, info: str) -> bool:
34-
if not self.is_free():
33+
async def set(self, info: str) -> bool:
34+
if not await self.is_free():
3535
return False
3636
try:
37-
self._run_ssh_command(f'echo "{self.tag}\n{info}" > {self.DEFAULT_FILEPATH}')
37+
await self._run_ssh_command(f'echo "{self.tag}\n{info}" > {self.DEFAULT_FILEPATH}')
3838
return True
39-
except subprocess.CalledProcessError:
39+
except RuntimeError:
4040
print('Could not write mutex file')
4141
return False
4242

4343
@property
4444
def tag(self) -> str:
4545
return f'{self.user_id} {datetime.now().isoformat()}'
4646

47-
def _run_ssh_command(self, command: str) -> str:
48-
ssh_command = ['ssh', self.host, '-p', str(self.port), command]
49-
return subprocess.check_output(ssh_command, stderr=subprocess.DEVNULL).decode()
47+
async def _run_ssh_command(self, command: str) -> str:
48+
process = await asyncio.create_subprocess_exec(
49+
'ssh', self.host, '-p', str(self.port), command,
50+
stdout=asyncio.subprocess.PIPE,
51+
stderr=asyncio.subprocess.DEVNULL,
52+
)
53+
stdout, _ = await process.communicate()
54+
if process.returncode != 0:
55+
raise RuntimeError(f'SSH command failed with return code {process.returncode}')
56+
return stdout.decode()

livesync/run_subprocess.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import asyncio
12
import subprocess
23

34

4-
def run_subprocess(command: str, *, quiet: bool = False) -> None:
5-
try:
6-
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
7-
if not quiet:
8-
print(result.stdout.decode())
9-
except subprocess.CalledProcessError as e:
10-
print(e.stdout.decode())
11-
raise
5+
async def run_subprocess(command: str, *, quiet: bool = False) -> None:
6+
process = await asyncio.create_subprocess_shell(
7+
command,
8+
stdout=asyncio.subprocess.PIPE,
9+
stderr=asyncio.subprocess.STDOUT,
10+
)
11+
stdout, _ = await process.communicate()
12+
if process.returncode != 0:
13+
print(stdout.decode())
14+
raise subprocess.CalledProcessError(process.returncode, command, stdout)
15+
if not quiet:
16+
print(stdout.decode())

livesync/sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ async def run_folder_tasks(
1919
mutexes = {folder.host: Mutex(folder.host, folder.ssh_port) for folder in folders}
2020
for mutex in mutexes.values():
2121
print(f'Checking mutex on {mutex.host}', flush=True)
22-
if not mutex.set(summary):
22+
if not await mutex.set(summary):
2323
print(f'Target is in use by {mutex.occupant}')
2424
sys.exit(1)
2525

2626
for folder in folders:
2727
print(f' {folder.source_path} --> {folder.target}', flush=True)
28-
folder.sync()
28+
await folder.sync()
2929

3030
if watch:
3131
for folder in folders:
@@ -36,7 +36,7 @@ async def run_folder_tasks(
3636
if not ignore_mutex:
3737
summary = get_summary(folders)
3838
for mutex in mutexes.values():
39-
if not mutex.set(summary):
39+
if not await mutex.set(summary):
4040
break
4141
await asyncio.sleep(mutex_interval)
4242
except Exception as e:

0 commit comments

Comments
 (0)