Skip to content

Commit 4b64e8c

Browse files
authored
Add support for restricted assets in clone command
1 parent 9a989d0 commit 4b64e8c

File tree

2 files changed

+495
-45
lines changed

2 files changed

+495
-45
lines changed

cloudinary_cli/modules/clone.py

Lines changed: 178 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
from click import command, option, style, argument
22
from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit
33
import cloudinary
4+
from cloudinary.auth_token import _digest
45
from cloudinary_cli.utils.utils import run_tasks_concurrently
56
from cloudinary_cli.utils.api_utils import upload_file
6-
from cloudinary_cli.utils.config_utils import load_config, get_cloudinary_config, config_to_dict
7+
from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict
78
from cloudinary_cli.defaults import logger
89
from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination
10+
import time
11+
import re
912

1013
DEFAULT_MAX_RESULTS = 500
14+
ALLOWED_TYPE_VALUES = ("upload", "private", "authenticated")
1115

1216

1317
@command("clone",
@@ -16,7 +20,6 @@
1620
\b
1721
Clone assets from one product environment to another with/without tags and/or context (structured metadata is not currently supported).
1822
Source will be your `CLOUDINARY_URL` environment variable but you also can specify a different source using the `-c/-C` option.
19-
Cloning restricted assets is also not supported currently.
2023
Format: cld clone <target_environment> <command options>
2124
`<target_environment>` can be a CLOUDINARY_URL or a saved config (see `config` command)
2225
Example 1 (Copy all assets including tags and context using CLOUDINARY URL):
@@ -32,50 +35,99 @@
3235
@option("-w", "--concurrent_workers", type=int, default=30,
3336
help="Specify the number of concurrent network threads.")
3437
@option("-fi", "--fields", multiple=True,
35-
help="Specify whether to copy tags and/or context. Valid options: `tags,context`.")
38+
help=("Specify whether to copy tags and/or context. "
39+
"Valid options: `tags,context`."))
3640
@option("-se", "--search_exp", default="",
3741
help="Define a search expression to filter the assets to clone.")
3842
@option("--async", "async_", is_flag=True, default=False,
3943
help="Clone the assets asynchronously.")
4044
@option("-nu", "--notification_url",
4145
help="Webhook notification URL.")
42-
def clone(target, force, overwrite, concurrent_workers, fields, search_exp, async_, notification_url):
46+
@option("-ue", "--url_expiry", type=int, default=3600,
47+
help=("URL expiration duration in seconds. Only relevant if cloning "
48+
"restricted assets with an auth_key configured. "
49+
"If you do not provide an auth_key, "
50+
"a private download URL is generated which may incur additional "
51+
"bandwidth costs."))
52+
def clone(target, force, overwrite, concurrent_workers, fields,
53+
search_exp, async_, notification_url, url_expiry):
54+
target_config, auth_token = _validate_clone_inputs(target)
55+
if not target_config:
56+
return False
57+
58+
source_assets = search_assets(search_exp, force)
59+
if not source_assets:
60+
return False
61+
if not isinstance(source_assets, dict) or not source_assets.get('resources'):
62+
logger.error(style(f"No asset(s) found in {cloudinary.config().cloud_name}", fg="red"))
63+
return False
64+
65+
upload_list = _prepare_upload_list(
66+
source_assets, target_config, overwrite, async_,
67+
notification_url, auth_token, url_expiry, fields
68+
)
69+
70+
logger.info(style(f"Copying {len(upload_list)} asset(s) from "
71+
f"{cloudinary.config().cloud_name} to "
72+
f"{target_config.cloud_name}", fg="blue"))
73+
74+
run_tasks_concurrently(upload_file, upload_list, concurrent_workers)
75+
76+
return True
77+
78+
79+
def _validate_clone_inputs(target):
4380
if not target:
4481
print_help_and_exit()
4582

4683
target_config = get_cloudinary_config(target)
4784
if not target_config:
48-
logger.error("The specified config does not exist or the CLOUDINARY_URL scheme provided is invalid"
49-
" (expecting to start with 'cloudinary://').")
50-
return False
85+
logger.error("The specified config does not exist or the "
86+
"CLOUDINARY_URL scheme provided is invalid "
87+
"(expecting to start with 'cloudinary://').")
88+
return None, None
5189

5290
if cloudinary.config().cloud_name == target_config.cloud_name:
53-
logger.error("Target environment cannot be the same as source environment.")
54-
return False
55-
56-
source_assets = search_assets(force, search_exp)
57-
91+
logger.error("Target environment cannot be the same "
92+
"as source environment.")
93+
return None, None
94+
95+
auth_token = cloudinary.config().auth_token
96+
if auth_token:
97+
# It is important to validate auth_token if provided as this prevents
98+
# customer from having to re-run the command as well as
99+
# saving Admin API calls and time.
100+
try:
101+
cloudinary.utils.generate_auth_token(acl="/image/*")
102+
except Exception as e:
103+
logger.error(f"{e} - auth_token validation failed. "
104+
"Please double-check your auth_token parameters.")
105+
return None, None
106+
107+
return target_config, auth_token
108+
109+
110+
def _prepare_upload_list(source_assets, target_config, overwrite, async_,
111+
notification_url, auth_token, url_expiry, fields):
58112
upload_list = []
59113
for r in source_assets.get('resources'):
60-
updated_options, asset_url = process_metadata(r, overwrite, async_, notification_url,
114+
updated_options, asset_url = process_metadata(r, overwrite, async_,
115+
notification_url,
116+
auth_token, url_expiry,
61117
normalize_list_params(fields))
62118
updated_options.update(config_to_dict(target_config))
63119
upload_list.append((asset_url, {**updated_options}))
120+
return upload_list
64121

65-
if not upload_list:
66-
logger.error(style(f'No assets found in {cloudinary.config().cloud_name}', fg="red"))
67-
return False
68-
69-
logger.info(style(f'Copying {len(upload_list)} asset(s) from {cloudinary.config().cloud_name} to {target_config.cloud_name}', fg="blue"))
70-
71-
run_tasks_concurrently(upload_file, upload_list, concurrent_workers)
72-
73-
return True
74122

123+
def search_assets(search_exp, force):
124+
search_exp = _normalize_search_expression(search_exp)
125+
if not search_exp:
126+
return False
75127

76-
def search_assets(force, search_exp):
77128
search = cloudinary.search.Search().expression(search_exp)
78-
search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name'])
129+
search.fields(['tags', 'context', 'access_control',
130+
'secure_url', 'display_name', 'format'])
79131
search.max_results(DEFAULT_MAX_RESULTS)
80132

81133
res = execute_single_request(search, fields_to_keep="")
@@ -84,29 +136,110 @@ def search_assets(force, search_exp):
84136
return res
85137

86138

87-
def process_metadata(res, overwrite, async_, notification_url, copy_fields=""):
88-
cloned_options = {}
89-
asset_url = res.get('secure_url')
90-
cloned_options['public_id'] = res.get('public_id')
91-
cloned_options['type'] = res.get('type')
92-
cloned_options['resource_type'] = res.get('resource_type')
93-
cloned_options['overwrite'] = overwrite
94-
cloned_options['async'] = async_
95-
if "tags" in copy_fields:
96-
cloned_options['tags'] = res.get('tags')
97-
if "context" in copy_fields:
98-
cloned_options['context'] = res.get('context')
139+
def _normalize_search_expression(search_exp):
140+
"""
141+
Ensures the search expression has a valid 'type' filter.
142+
143+
- If no expression is given, a default is created.
144+
- If 'type' filters exist, they are validated.
145+
- If no 'type' filters exist, the default is appended.
146+
"""
147+
default_types_str = " OR ".join(f"type:{t}" for t in ALLOWED_TYPE_VALUES)
148+
149+
if not search_exp:
150+
return default_types_str
151+
152+
# Use a simple regex to find all 'type' filters
153+
found_types = re.findall(r"\btype\s*[:=]\s*(\w+)", search_exp)
154+
155+
if not found_types:
156+
# No 'type' filter found, so append the default
157+
return f"{search_exp} AND ({default_types_str})"
158+
159+
# A 'type' filter was found, so validate it
160+
invalid_types = {t for t in found_types if t not in ALLOWED_TYPE_VALUES}
161+
162+
if invalid_types:
163+
error_msg = ", ".join(f"type:{t}" for t in invalid_types)
164+
logger.error(
165+
f"Unsupported type(s) in search expression: {error_msg}. "
166+
f"Only {', '.join(ALLOWED_TYPE_VALUES)} types allowed."
167+
)
168+
return None
169+
170+
# All found types are valid, so return the original expression
171+
return search_exp
172+
173+
174+
def process_metadata(res, overwrite, async_, notification_url, auth_token, url_expiry, copy_fields=None):
175+
if copy_fields is None:
176+
copy_fields = []
177+
asset_url = _get_asset_url(res, auth_token, url_expiry)
178+
cloned_options = _build_cloned_options(res, overwrite, async_, notification_url, copy_fields)
179+
180+
return cloned_options, asset_url
181+
182+
183+
def _get_asset_url(res, auth_token, url_expiry):
184+
if not (isinstance(res.get('access_control'), list) and
185+
len(res.get('access_control')) > 0 and
186+
isinstance(res['access_control'][0], dict) and
187+
res['access_control'][0].get("access_type") == "token"):
188+
return res.get('secure_url')
189+
190+
reso_type = res.get('resource_type')
191+
del_type = res.get('type')
192+
pub_id = res.get('public_id')
193+
file_format = res.get('format')
194+
195+
if auth_token:
196+
# Raw assets already have the format in the public_id
197+
pub_id_format = pub_id if reso_type == "raw" else f"{pub_id}.{file_format}"
198+
return cloudinary.utils.cloudinary_url(
199+
pub_id_format,
200+
type=del_type,
201+
resource_type=reso_type,
202+
auth_token={"duration": url_expiry},
203+
secure=True,
204+
sign_url=True
205+
)
206+
207+
# Use private url if no auth_token provided
208+
return cloudinary.utils.private_download_url(
209+
pub_id,
210+
file_format,
211+
resource_type=reso_type,
212+
type=del_type,
213+
expires_at=int(time.time()) + url_expiry
214+
)
215+
216+
217+
def _build_cloned_options(res, overwrite, async_, notification_url, copy_fields):
218+
# 1. Start with mandatory options
219+
cloned_options = {
220+
'overwrite': overwrite,
221+
'async': async_,
222+
}
223+
224+
# 2. Copy fields from source asset. Some are standard, others are from user input.
225+
fields_to_copy = {'public_id', 'type', 'resource_type', 'access_control'}.union(copy_fields)
226+
cloned_options.update({field: res.get(field) for field in fields_to_copy})
227+
228+
# 3. Handle fields that are added only if they have a truthy value
229+
if res.get('display_name'):
230+
cloned_options['display_name'] = res['display_name']
231+
232+
# This is required to put the asset in the correct asset_folder
233+
# when copying from a fixed to DF (dynamic folder) cloud as if
234+
# you just pass a `folder` param to a DF cloud, it will append
235+
# this to the `public_id` and we don't want this.
99236
if res.get('folder'):
100-
# This is required to put the asset in the correct asset_folder
101-
# when copying from a fixed to DF (dynamic folder) cloud as if
102-
# you just pass a `folder` param to a DF cloud, it will append
103-
# this to the `public_id` and we don't want this.
104-
cloned_options['asset_folder'] = res.get('folder')
237+
cloned_options['asset_folder'] = res['folder']
105238
elif res.get('asset_folder'):
106-
cloned_options['asset_folder'] = res.get('asset_folder')
107-
if res.get('display_name'):
108-
cloned_options['display_name'] = res.get('display_name')
239+
cloned_options['asset_folder'] = res['asset_folder']
240+
109241
if notification_url:
110242
cloned_options['notification_url'] = notification_url
111243

112-
return cloned_options, asset_url
244+
# 4. Clean up any None values before returning
245+
return {k: v for k, v in cloned_options.items() if v is not None}

0 commit comments

Comments
 (0)