|
1 | | -import os |
2 | 1 | import logging |
| 2 | +import os |
3 | 3 | import re |
4 | 4 | from datetime import datetime |
5 | | -from dataflow_transfer.utils.statusdb import StatusdbSession |
| 5 | + |
6 | 6 | import dataflow_transfer.utils.filesystem as fs |
| 7 | +from dataflow_transfer.utils.statusdb import StatusdbSession |
7 | 8 |
|
8 | 9 | logger = logging.getLogger(__name__) |
9 | 10 |
|
@@ -65,53 +66,33 @@ def generate_rsync_command(self, is_final_sync=False): |
65 | 66 | command_str += f"; echo $? > {self.final_rsync_exitcode_file}" |
66 | 67 | return command_str |
67 | 68 |
|
68 | | - def initiate_background_transfer(self): |
| 69 | + def start_transfer(self, final=False): |
69 | 70 | """Start background rsync transfer to storage.""" |
70 | | - background_transfer_command = self.generate_rsync_command(is_final_sync=False) |
| 71 | + transfer_command = self.generate_rsync_command(is_final_sync=final) |
71 | 72 | if fs.rsync_is_running(src=self.run_dir): |
72 | 73 | logger.info( |
73 | 74 | f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation." |
74 | 75 | ) |
75 | 76 | return |
76 | 77 | try: |
77 | | - fs.submit_background_process(background_transfer_command) |
| 78 | + fs.submit_background_process(transfer_command) |
78 | 79 | logger.info( |
79 | | - f"{self.run_id}: Started background rsync to {self.miarka_destination}" |
80 | | - + f" with the following command: '{background_transfer_command}'" |
| 80 | + f"{self.run_id}: Started rsync to {self.miarka_destination}" |
| 81 | + + f" with the following command: '{transfer_command}'" |
81 | 82 | ) |
82 | 83 | except Exception as e: |
83 | | - logger.error(f"Failed to start background transfer for {self.run_id}: {e}") |
| 84 | + logger.error(f"Failed to start rsync for {self.run_id}: {e}") |
84 | 85 | raise e |
85 | 86 | rsync_info = { |
86 | | - "command": background_transfer_command, |
| 87 | + "command": transfer_command, |
87 | 88 | "destination_path": self.miarka_destination, |
88 | 89 | } |
89 | | - self.update_statusdb(status="transfer_started", additional_info=rsync_info) |
90 | | - |
91 | | - def do_final_transfer(self): |
92 | | - """Start final rsync transfer to storage.""" |
93 | | - final_transfer_command = self.generate_rsync_command(is_final_sync=True) |
94 | | - if fs.rsync_is_running(src=self.run_dir): |
95 | | - logger.info( |
96 | | - f"Rsync is already running for {self.run_dir}. Skipping final transfer initiation." |
97 | | - ) |
98 | | - return |
99 | | - try: |
100 | | - fs.submit_background_process(final_transfer_command) |
101 | | - logger.info( |
102 | | - f"{self.run_id}: Started FINAL rsync to {self.miarka_destination}" |
103 | | - + f" with the following command: '{final_transfer_command}'" |
| 90 | + if final: |
| 91 | + self.update_statusdb( |
| 92 | + status="final_transfer_started", additional_info=rsync_info |
104 | 93 | ) |
105 | | - except Exception as e: |
106 | | - logger.error(f"Failed to start final transfer for {self.run_id}: {e}") |
107 | | - raise e |
108 | | - rsync_info = { |
109 | | - "command": final_transfer_command, |
110 | | - "destination_path": self.miarka_destination, |
111 | | - } |
112 | | - self.update_statusdb( |
113 | | - status="final_transfer_started", additional_info=rsync_info |
114 | | - ) |
| 94 | + else: |
| 95 | + self.update_statusdb(status="transfer_started", additional_info=rsync_info) |
115 | 96 |
|
116 | 97 | @property |
117 | 98 | def final_sync_successful(self): |
|
0 commit comments