Skip to content

Commit 8b19a47

Browse files
Refactor sync command
1 parent cfbd683 commit 8b19a47

File tree

3 files changed

+102
-63
lines changed

3 files changed

+102
-63
lines changed

cloudinary_cli/modules/sync.py

Lines changed: 76 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
2-
from functools import reduce
3-
from itertools import product
2+
from collections import Counter
3+
from itertools import groupby
44
from os import path, remove
55

66
from click import command, argument, option, style
@@ -10,7 +10,7 @@
1010
from cloudinary_cli.utils.file_utils import walk_dir, delete_empty_dirs, get_destination_folder, \
1111
normalize_file_extension, posix_rel_path
1212
from cloudinary_cli.utils.json_utils import print_json, read_json_from_file, write_json_to_file
13-
from cloudinary_cli.utils.utils import logger, run_tasks_concurrently, get_user_action, invert_dict
13+
from cloudinary_cli.utils.utils import logger, run_tasks_concurrently, get_user_action, invert_dict, chunker
1414

1515
_DEFAULT_DELETION_BATCH_SIZE = 30
1616
_DEFAULT_CONCURRENT_WORKERS = 30
@@ -108,24 +108,10 @@ def __init__(self, local_dir, remote_dir, include_hidden, concurrent_workers, fo
108108
if self.synced_files_count:
109109
logger.info(f"Skipping {self.synced_files_count} items")
110110

111-
def _get_out_of_sync_file_names(self, common_file_names):
112-
logger.debug("\nCalculating differences...\n")
113-
out_of_sync_file_names = set()
114-
for f in common_file_names:
115-
local_etag = self.local_files[f]['etag']
116-
remote_etag = self.recovered_remote_files[f]['etag']
117-
if local_etag != remote_etag:
118-
logger.warning(f"'{f}' is out of sync" +
119-
(f" with '{self.diverse_file_names[f]}'" if f in self.diverse_file_names else ""))
120-
logger.debug(f"Local etag: {local_etag}. Remote etag: {remote_etag}")
121-
out_of_sync_file_names.add(f)
122-
continue
123-
logger.debug(f"'{f}' is in sync" +
124-
(f" with '{self.diverse_file_names[f]}'" if f in self.diverse_file_names else ""))
125-
126-
return out_of_sync_file_names
127-
128111
def push(self):
112+
"""
113+
Pushes changes from the local folder to the Cloudinary folder.
114+
"""
129115
if not self._handle_unique_remote_files():
130116
logger.info("Aborting...")
131117
return False
@@ -160,6 +146,36 @@ def push(self):
160146
if upload_errors:
161147
raise Exception("Sync did not finish successfully")
162148

149+
def pull(self):
150+
"""
151+
Pulls changes from the Cloudinary folder to the local folder.
152+
"""
153+
download_results = {}
154+
download_errors = {}
155+
if not self._handle_unique_local_files():
156+
return False
157+
158+
files_to_pull = self.unique_remote_file_names | self.out_of_sync_remote_file_names
159+
160+
if not files_to_pull:
161+
return True
162+
163+
logger.info(f"Downloading {len(files_to_pull)} files from Cloudinary")
164+
downloads = []
165+
for file in files_to_pull:
166+
remote_file = self.remote_files[file]
167+
local_path = path.abspath(path.join(self.local_dir, file))
168+
169+
downloads.append((remote_file, local_path, download_results, download_errors))
170+
171+
try:
172+
run_tasks_concurrently(download_file, downloads, self.concurrent_workers)
173+
finally:
174+
self._print_sync_status(download_results, download_errors)
175+
176+
if download_errors:
177+
raise Exception("Sync did not finish successfully")
178+
163179
def _print_sync_status(self, success, errors):
164180
logger.info("==Sync Status==")
165181
logger.info("===============")
@@ -190,6 +206,12 @@ def _save_sync_meta_file(self, upload_results):
190206
logger.warning(f"Failed updating '{self.sync_meta_file}' file: {e}")
191207

192208
def _handle_unique_remote_files(self):
209+
"""
210+
Handles remote files (on Cloudinary servers) that do not exist in the local folder.
211+
User can decide to keep them or to delete. Optionally user can abort the operation.
212+
213+
:return: True if successful, otherwise False
214+
"""
193215
handled = self._handle_files_deletion(len(self.unique_remote_file_names), "remote")
194216
if handled is not None:
195217
return handled
@@ -198,59 +220,51 @@ def _handle_unique_remote_files(self):
198220
f"from Cloudinary folder '{self.user_friendly_remote_dir}'")
199221
files_to_delete_from_cloudinary = list(map(lambda x: self.remote_files[x], self.unique_remote_file_names))
200222

201-
for i in product({"upload", "private", "authenticated"}, {"image", "video", "raw"}):
202-
batch = list(map(lambda x: x['public_id'],
203-
filter(lambda x: x["type"] == i[0] and x["resource_type"] == i[1],
204-
files_to_delete_from_cloudinary)))
205-
if not len(batch):
206-
continue
223+
# We group files into batches by resource_type and type to reduce the number of API calls.
224+
batches = groupby(files_to_delete_from_cloudinary, lambda file: (file["resource_type"], file["type"]))
225+
for attrs, batch_iter in batches:
226+
batch = [file["public_id"] for file in batch_iter]
227+
logger.info("Deleting {} resources with resource_type '{}' and type '{}'".format(len(batch), *attrs))
207228

208-
logger.info("Deleting {} resources with type '{}' and resource_type '{}'".format(len(batch), *i))
209-
counter = 0
210-
while counter * self.deletion_batch_size < len(batch) and len(batch) > 0:
211-
counter += 1
212-
res = api.delete_resources(
213-
batch[(counter - 1) * self.deletion_batch_size:counter * self.deletion_batch_size], invalidate=True,
214-
resource_type=i[1], type=i[0])
215-
num_deleted = reduce(lambda x, y: x + 1 if y == "deleted" else x, res['deleted'].values(), 0)
229+
# Each batch is further chunked by a deletion batch size that can be specified by the user.
230+
for deletion_batch in chunker(batch, self.deletion_batch_size):
231+
res = api.delete_resources(deletion_batch, invalidate=True, resource_type=attrs[0], type=attrs[1])
232+
num_deleted = Counter(res['deleted'].values())["deleted"]
216233
if self.verbose:
217234
print_json(res)
218-
if num_deleted != len(batch):
219-
logger.error("Failed deletes:\n{}".format("\n".join(list(
220-
map(lambda x: x[0], filter(lambda x: x[1] != 'deleted', res['deleted'].items()))))))
235+
if num_deleted != len(deletion_batch):
236+
# This should not happen in reality, unless some terrible race condition happens with the folder.
237+
failed = [f"{file}: {reason}" for file, reason in res['deleted'].items() if reason != "deleted"]
238+
logger.error("Failed deletes:\n{}".format("\n".join(failed)))
221239
else:
222240
logger.info(style(f"Deleted {num_deleted} resources", fg="green"))
223241

224242
return True
225243

226-
def pull(self):
227-
download_results = {}
228-
download_errors = {}
229-
if not self._handle_unique_local_files():
230-
return False
231-
232-
files_to_pull = self.unique_remote_file_names | self.out_of_sync_remote_file_names
233-
234-
if not files_to_pull:
235-
return True
236-
237-
logger.info(f"Downloading {len(files_to_pull)} files from Cloudinary")
238-
downloads = []
239-
for file in files_to_pull:
240-
remote_file = self.remote_files[file]
241-
local_path = path.abspath(path.join(self.local_dir, file))
242-
243-
downloads.append((remote_file, local_path, download_results, download_errors))
244-
245-
try:
246-
run_tasks_concurrently(download_file, downloads, self.concurrent_workers)
247-
finally:
248-
self._print_sync_status(download_results, download_errors)
244+
def _get_out_of_sync_file_names(self, common_file_names):
245+
logger.debug("\nCalculating differences...\n")
246+
out_of_sync_file_names = set()
247+
for f in common_file_names:
248+
local_etag = self.local_files[f]['etag']
249+
remote_etag = self.recovered_remote_files[f]['etag']
250+
if local_etag != remote_etag:
251+
logger.warning(f"'{f}' is out of sync" +
252+
(f" with '{self.diverse_file_names[f]}'" if f in self.diverse_file_names else ""))
253+
logger.debug(f"Local etag: {local_etag}. Remote etag: {remote_etag}")
254+
out_of_sync_file_names.add(f)
255+
continue
256+
logger.debug(f"'{f}' is in sync" +
257+
(f" with '{self.diverse_file_names[f]}'" if f in self.diverse_file_names else ""))
249258

250-
if download_errors:
251-
raise Exception("Sync did not finish successfully")
259+
return out_of_sync_file_names
252260

253261
def _handle_unique_local_files(self):
262+
"""
263+
Handles local files that do not exist on the Cloudinary server.
264+
User can decide to keep them or to delete. Optionally user can abort the operation.
265+
266+
:return: True if successful, otherwise False
267+
"""
254268
handled = self._handle_files_deletion(len(self.unique_local_file_names), "local")
255269
if handled is not None:
256270
return handled

cloudinary_cli/utils/utils.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,3 +262,23 @@ def normalize_list_params(params):
262262
normalized_params.append(f)
263263

264264
return normalized_params
265+
266+
267+
def chunker(seq, size):
268+
"""
269+
Iterates a sequence in chunks of a given size.
270+
271+
>>> for group in chunker(['cat', 'dog', 'rabbit', 'duck', 'bird', 'cow', 'gnu', 'fish'], 3):
272+
>>> print(group)
273+
274+
Produces:
275+
276+
['cat', 'dog', 'rabbit']
277+
['duck', 'bird', 'cow']
278+
['gnu', 'fish']
279+
280+
:param seq: The sequence to iterate.
281+
:param size: The size of a single chunk.
282+
:return: a single chunk
283+
"""
284+
return (seq[pos:pos + size] for pos in range(0, len(seq), size))

test/test_utils.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import unittest
22

33
from cloudinary_cli.utils.utils import parse_option_value, parse_args_kwargs, whitelist_keys, merge_responses, \
4-
normalize_list_params
4+
normalize_list_params, chunker
55

66

77
class UtilsTest(unittest.TestCase):
@@ -90,6 +90,11 @@ def test_normalize_list_params(self):
9090
""" should normalize a list of parameters """
9191
self.assertEqual(["f1", "f2", "f3"], normalize_list_params(["f1,f2", "f3"]))
9292

93+
def test_chunker(self):
94+
animals = ['cat', 'dog', 'rabbit', 'duck', 'bird', 'cow', 'gnu', 'fish']
95+
groups = [group for group in chunker(animals, 3)]
96+
self.assertListEqual([['cat', 'dog', 'rabbit'], ['duck', 'bird', 'cow'], ['gnu', 'fish']], groups)
97+
9398

9499
def _no_args_test_func():
95100
pass

0 commit comments

Comments
 (0)