Skip to content

Commit 3be8e09

Browse files
authored
More rsync fixes (#360)
* Replaced 'procrunner' with 'subprocess'
* Fixed rsync subprocesses by prepending them with 'bash -c'

Co-authored-by: stephen-riggs <[email protected]>
1 parent 975e2cb commit 3be8e09

File tree

7 files changed

+156
-79
lines changed

7 files changed

+156
-79
lines changed

src/murfey/client/__init__.py

Lines changed: 25 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,31 @@ def read_config() -> configparser.ConfigParser:
6565
requests.delete = partial(requests.delete, headers={"Authorization": f"Bearer {token}"})
6666

6767

68+
def write_config(config: configparser.ConfigParser):
69+
mcch = os.environ.get("MURFEY_CLIENT_CONFIG_HOME")
70+
murfey_client_config_home = Path(mcch) if mcch else Path.home()
71+
with open(murfey_client_config_home / ".murfey", "w") as configfile:
72+
config.write(configfile)
73+
74+
75+
def main_loop(
76+
source_watchers: List[murfey.client.watchdir.DirWatcher],
77+
appearance_time: float,
78+
transfer_all: bool,
79+
):
80+
log.info(
81+
f"Murfey {murfey.__version__} on Python {'.'.join(map(str, sys.version_info[0:3]))} entering main loop"
82+
)
83+
if appearance_time > 0:
84+
modification_time: float | None = time.time() - appearance_time * 3600
85+
else:
86+
modification_time = None
87+
while True:
88+
for sw in source_watchers:
89+
sw.scan(modification_time=modification_time, transfer_all=transfer_all)
90+
time.sleep(15)
91+
92+
6893
def _enable_webbrowser_in_cygwin():
6994
"""Helper function to make webbrowser.open() work in CygWin"""
7095
if "cygwin" in platform.system().lower() and shutil.which("cygstart"):
@@ -325,28 +350,3 @@ def run():
325350
)
326351
app.run()
327352
rich_handler.redirect = False
328-
329-
330-
def main_loop(
331-
source_watchers: List[murfey.client.watchdir.DirWatcher],
332-
appearance_time: float,
333-
transfer_all: bool,
334-
):
335-
log.info(
336-
f"Murfey {murfey.__version__} on Python {'.'.join(map(str, sys.version_info[0:3]))} entering main loop"
337-
)
338-
if appearance_time > 0:
339-
modification_time: float | None = time.time() - appearance_time * 3600
340-
else:
341-
modification_time = None
342-
while True:
343-
for sw in source_watchers:
344-
sw.scan(modification_time=modification_time, transfer_all=transfer_all)
345-
time.sleep(15)
346-
347-
348-
def write_config(config: configparser.ConfigParser):
349-
mcch = os.environ.get("MURFEY_CLIENT_CONFIG_HOME")
350-
murfey_client_config_home = Path(mcch) if mcch else Path.home()
351-
with open(murfey_client_config_home / ".murfey", "w") as configfile:
352-
config.write(configfile)

src/murfey/client/multigrid_control.py

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import subprocess
34
import threading
45
from dataclasses import dataclass, field
56
from datetime import datetime
@@ -8,7 +9,6 @@
89
from typing import Dict, List, Optional
910
from urllib.parse import urlparse
1011

11-
import procrunner
1212
import requests
1313

1414
import murfey.client.websocket
@@ -19,7 +19,7 @@
1919
from murfey.client.rsync import RSyncer, RSyncerUpdate, TransferResult
2020
from murfey.client.tui.screens import determine_default_destination
2121
from murfey.client.watchdir import DirWatcher
22-
from murfey.util import capture_post
22+
from murfey.util import capture_post, posix_path
2323

2424
log = logging.getLogger("murfey.client.mutligrid_control")
2525

@@ -180,16 +180,23 @@ def _start_rsyncer(
180180
if self._environment:
181181
self._environment.default_destinations[source] = destination
182182
if self._environment.gain_ref and visit_path:
183-
gain_rsync = procrunner.run(
184-
[
185-
"rsync",
186-
str(self._environment.gain_ref),
187-
f"{self._environment.url.hostname}::{visit_path}/processing",
188-
]
189-
)
183+
# Set up rsync command
184+
rsync_cmd = [
185+
"rsync",
186+
f"{posix_path(self._environment.gain_ref)!r}", # '!r' will print strings in ''
187+
f"{self._environment.url.hostname}::{visit_path}/processing",
188+
]
189+
# Wrap in bash shell
190+
cmd = [
191+
"bash",
192+
"-c",
193+
" ".join(rsync_cmd),
194+
]
195+
# Run rsync subprocess
196+
gain_rsync = subprocess.run(cmd)
190197
if gain_rsync.returncode:
191198
log.warning(
192-
f"Gain reference file {self._environment.gain_ref} was not successfully transferred to {visit_path}/processing"
199+
f"Gain reference file {posix_path(self._environment.gain_ref)!r} was not successfully transferred to {visit_path}/processing"
193200
)
194201
if transfer:
195202
self.rsync_processes[source] = RSyncer(

src/murfey/client/rsync.py

Lines changed: 66 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,6 @@
1818
from typing import Callable, List, NamedTuple
1919
from urllib.parse import ParseResult
2020

21-
import procrunner
22-
2321
from murfey.client.tui.status_bar import StatusBar
2422
from murfey.util import Observer
2523

@@ -380,16 +378,19 @@ def parse_stdout(line: str):
380378
return
381379

382380
def parse_stderr(line: str):
383-
logger.warning(f"rsync stderr: {line!r}")
381+
if line.strip():
382+
logger.warning(f"rsync stderr: {line!r}")
384383

385384
# Generate list of relative filenames for this batch of transferred files
385+
# Relative filenames will be safe to use on both Windows and Unix
386386
relative_filenames: List[Path] = []
387387
for f in files:
388388
try:
389389
relative_filenames.append(f.relative_to(self._basepath))
390390
except ValueError:
391391
raise ValueError(f"File '{f}' is outside of {self._basepath}") from None
392392

393+
# Encode files to rsync as bytestring
393394
if self._remove_files:
394395
if self._required_substrings_for_removal:
395396
rsync_stdin_remove = b"\n".join(
@@ -418,52 +419,88 @@ def parse_stderr(line: str):
418419
rsync_stdin = b"\n".join(os.fsencode(f) for f in relative_filenames)
419420

420421
# Create and run rsync subprocesses
421-
# rsync commands to pass to subprocess
422+
# rsync default settings
422423
rsync_cmd = [
423424
"rsync",
424425
"-iiv",
425426
"--times",
426427
"--progress",
427428
"--outbuf=line",
428-
"--files-from=-",
429-
"-p", # preserve permissions
430-
"--chmod=D0750,F0750", # 4: Read, 2: Write, 1: Execute | User, Group, Others
429+
"--files-from=-", # '-' indicates reading from standard input
430+
# Needed as a pair to trigger permission modifications
431+
# Ref: https://serverfault.com/a/796341
432+
"-p",
433+
"--chmod=D0750,F0750", # Use extended chmod format
431434
]
435+
# Add file locations
432436
rsync_cmd.extend([".", self._remote])
433-
result: subprocess.CompletedProcess | None = None
437+
438+
# Transfer files to destination
439+
result: subprocess.CompletedProcess[bytes] | None = None
434440
success = True
435441
if rsync_stdin:
436-
result = procrunner.run(
437-
rsync_cmd,
438-
callback_stdout=parse_stdout,
439-
callback_stderr=parse_stderr,
440-
working_directory=str(self._basepath),
441-
stdin=rsync_stdin,
442-
print_stdout=False,
443-
print_stderr=False,
442+
# Wrap rsync command in a bash command
443+
cmd = [
444+
"bash",
445+
"-c",
446+
# rsync command passed in as a single string
447+
" ".join(rsync_cmd),
448+
]
449+
result = subprocess.run(
450+
cmd,
451+
cwd=self._basepath, # As-is Path is fine
452+
capture_output=True,
453+
input=rsync_stdin,
444454
)
445-
success = result.returncode == 0 if result else False
446-
455+
# Parse outputs
456+
for line in result.stdout.decode("utf-8", "replace").split("\n"):
457+
parse_stdout(line)
458+
for line in result.stderr.decode("utf-8", "replace").split("\n"):
459+
parse_stderr(line)
460+
success = result.returncode == 0
461+
462+
# Remove files from source
447463
if rsync_stdin_remove:
464+
# Insert file removal flag before locations
448465
rsync_cmd.insert(-2, "--remove-source-files")
449-
result = procrunner.run(
450-
rsync_cmd,
451-
callback_stdout=parse_stdout,
452-
callback_stderr=parse_stderr,
453-
working_directory=str(self._basepath),
454-
stdin=rsync_stdin_remove,
455-
print_stdout=False,
456-
print_stderr=False,
466+
# Wrap rsync command in a bash command
467+
cmd = [
468+
"bash",
469+
"-c",
470+
# Pass rsync command as single string
471+
" ".join(rsync_cmd),
472+
]
473+
result = subprocess.run(
474+
cmd,
475+
cwd=self._basepath,
476+
capture_output=True,
477+
input=rsync_stdin_remove,
457478
)
458-
479+
# Parse outputs
480+
for line in result.stdout.decode("utf-8", "replace").split("\n"):
481+
parse_stdout(line)
482+
for line in result.stderr.decode("utf-8", "replace").split("\n"):
483+
parse_stderr(line)
484+
# Leave it as a failure if the previous rsync subprocess failed
459485
if success:
460-
success = result.returncode == 0 if result else False
486+
success = result.returncode == 0
461487

462488
self.notify(successful_updates, secondary=True)
463489

490+
# Print out a summary message for each file transfer batch instead of individual messages
491+
# List out file paths as stored in memory to see if issue is due to file path mismatch
492+
if len(set(relative_filenames) - transfer_success) != 0:
493+
logger.debug(
494+
f"Files identified for transfer ({len(relative_filenames)}): {relative_filenames!r}"
495+
)
496+
logger.debug(
497+
f"Files successfully transferred ({len(transfer_success)}): {list(transfer_success)!r}"
498+
)
499+
464500
# Compare files from rsync stdout to original list to verify transfer
465501
for f in set(relative_filenames) - transfer_success:
466-
logger.warning(f"Transfer of file {f.name!r} considered a failure")
502+
# Mute individual file warnings; replace with summarised one above
503+
# logger.warning(f"Transfer of file {f.name!r} considered a failure")
467504
self._files_transferred += 1
468505
current_outstanding = self.queue.unfinished_tasks - (
469506
self._files_transferred - previously_transferred

src/murfey/client/tui/app.py

Lines changed: 17 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,14 @@
11
from __future__ import annotations
22

33
import logging
4+
import subprocess
45
from datetime import datetime
56
from functools import partial
67
from pathlib import Path
78
from queue import Queue
89
from typing import Awaitable, Callable, Dict, List, OrderedDict, TypeVar
910
from urllib.parse import urlparse
1011

11-
import procrunner
1212
import requests
1313
from textual.app import App
1414
from textual.reactive import reactive
@@ -36,6 +36,7 @@
3636
from murfey.util import (
3737
capture_post,
3838
get_machine_config_client,
39+
posix_path,
3940
read_config,
4041
set_default_acquisition_output,
4142
)
@@ -211,16 +212,23 @@ def _start_rsyncer(
211212
if self._environment:
212213
self._environment.default_destinations[source] = destination
213214
if self._environment.gain_ref and visit_path:
214-
gain_rsync = procrunner.run(
215-
[
216-
"rsync",
217-
str(self._environment.gain_ref),
218-
f"{self._url.hostname}::{visit_path}/processing",
219-
]
220-
)
215+
# Set up rsync command
216+
rsync_cmd = [
217+
"rsync",
218+
f"{posix_path(self._environment.gain_ref)!r}",
219+
f"{self._url.hostname}::{visit_path}/processing",
220+
]
221+
# Encase in bash shell
222+
cmd = [
223+
"bash",
224+
"-c",
225+
" ".join(rsync_cmd),
226+
]
227+
# Run rsync subprocess
228+
gain_rsync = subprocess.run(cmd)
221229
if gain_rsync.returncode:
222230
log.warning(
223-
f"Gain reference file {self._environment.gain_ref} was not successfully transferred to {visit_path}/processing"
231+
f"Gain reference file {posix_path(self._environment.gain_ref)!r} was not successfully transferred to {visit_path}/processing"
224232
)
225233
if transfer:
226234
self.rsync_processes[source] = RSyncer(

src/murfey/client/tui/screens.py

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
# import contextlib
44
import logging
5+
import subprocess
56
from datetime import datetime
67
from functools import partial
78
from pathlib import Path
@@ -17,7 +18,6 @@
1718
TypeVar,
1819
)
1920

20-
import procrunner
2121
import requests
2222
from pydantic import BaseModel, ValidationError
2323
from rich.box import SQUARE
@@ -56,7 +56,7 @@
5656
)
5757
from murfey.client.rsync import RSyncer
5858
from murfey.client.tui.forms import FormDependency
59-
from murfey.util import capture_post, get_machine_config_client, read_config
59+
from murfey.util import capture_post, get_machine_config_client, posix_path, read_config
6060
from murfey.util.models import PreprocessingParametersTomo, ProcessingParametersSPA
6161

6262
log = logging.getLogger("murfey.tui.screens")
@@ -906,18 +906,22 @@ def on_button_pressed(self, event):
906906
if event.button.id == "suggested-gain-ref":
907907
self._dir_tree._gain_reference = self._gain_reference
908908
visit_path = f"data/{datetime.now().year}/{self.app._environment.visit}"
909-
cmd = [
909+
# Set up rsync command
910+
rsync_cmd = [
910911
"rsync",
911-
str(self._dir_tree._gain_reference),
912+
f"{posix_path(self._dir_tree._gain_reference)!r}",
912913
f"{self.app._environment.url.hostname}::{visit_path}/processing/{secure_filename(self._dir_tree._gain_reference.name)}",
913914
]
915+
# Encase in bash shell
916+
cmd = ["bash", "-c", " ".join(rsync_cmd)]
914917
if self.app._environment.demo:
915918
log.info(f"Would perform {' '.join(cmd)}")
916919
else:
917-
gain_rsync = procrunner.run(cmd)
920+
# Run rsync subprocess
921+
gain_rsync = subprocess.run(cmd)
918922
if gain_rsync.returncode:
919923
log.warning(
920-
f"Gain reference file {self._dir_tree._gain_reference} was not successfully transferred to {visit_path}/processing"
924+
f"Gain reference file {posix_path(self._dir_tree._gain_reference)!r} was not successfully transferred to {visit_path}/processing"
921925
)
922926
process_gain_response = requests.post(
923927
url=f"{str(self.app._environment.url.geturl())}/sessions/{self.app._environment.murfey_session}/process_gain",

src/murfey/instrument_server/api.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import secrets
24
import time
35
from datetime import datetime

0 commit comments

Comments
 (0)