Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 130 additions & 7 deletions scripts/setup_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,6 @@ def validate_all_config_files(
for skill_item in skills_list:
if isinstance(skill_item, dict):
skill_dict = cast(dict[str, Any], skill_item)
skill_name = skill_dict.get('name', 'unknown')
skill_base = skill_dict.get('base', '')
skill_files = skill_dict.get('files', [])

Expand All @@ -1025,11 +1024,11 @@ def validate_all_config_files(
# Convert tree/blob URLs to raw URLs for validation
raw_base = convert_to_raw_url(skill_base)
full_url = f"{raw_base.rstrip('/')}/{skill_file_item}"
files_to_check.append(('skill', f'{skill_name}/{skill_file_item}', full_url, True))
files_to_check.append(('skill', full_url, full_url, True))
else:
resolved_base, _ = resolve_resource_path(skill_base, config_source, None)
full_path = str(Path(resolved_base) / skill_file_item)
files_to_check.append(('skill', f'{skill_name}/{skill_file_item}', full_path, False))
files_to_check.append(('skill', full_path, full_path, False))

# Validate each file
info(f'Validating {len(files_to_check)} files...')
Expand Down Expand Up @@ -1093,6 +1092,35 @@ def download_file(url: str, destination: Path, force: bool = True) -> bool:
return False


# Frozen set of binary file extensions (immutable for safety)
BINARY_EXTENSIONS: frozenset[str] = frozenset([
# Archives
'.tar.gz', '.tgz', '.gz', '.zip', '.7z', '.rar',
'.tar', '.bz2', '.xz', '.lz4', '.zst',
# Images
'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.webp', '.svg',
# Documents
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
# Executables
'.exe', '.dll', '.so', '.dylib',
# Python
'.whl', '.pyc', '.pyo',
])


def is_binary_file(file_path: str | Path) -> bool:
"""Check if a file is binary based on its extension.

Args:
file_path: Path to the file (can be URL, local path, or filename)

Returns:
bool: True if the file extension indicates a binary file
"""
path_str = str(file_path).lower()
return any(path_str.endswith(ext) for ext in BINARY_EXTENSIONS)


def detect_repo_type(url: str) -> str | None:
"""Detect the repository type from URL.

Expand Down Expand Up @@ -2562,6 +2590,89 @@ def fetch_url_with_auth(url: str, auth_headers: dict[str, str] | None = None, au
raise


def fetch_url_bytes_with_auth(
url: str,
auth_headers: dict[str, str] | None = None,
auth_param: str | None = None,
) -> bytes:
"""Fetch URL content as bytes, trying without auth first, then with auth if needed.

Similar to fetch_url_with_auth but returns raw bytes without decoding.
Use this for binary files like .tar.gz, .zip, images, etc.

Args:
url: URL to fetch
auth_headers: Optional pre-computed auth headers
auth_param: Optional auth parameter for getting headers

Returns:
bytes: Raw content of the URL

Raises:
HTTPError: If the HTTP request fails after authentication attempts
URLError: If there's a URL/network error (including SSL issues)
"""
# Convert GitLab web URLs to API URLs for authentication
original_url = url
if detect_repo_type(url) == 'gitlab' and '/-/raw/' in url:
url = convert_gitlab_url_to_api(url)
if url != original_url:
info(f'Using API URL: {url}')

# First try without auth (for public repos)
try:
request = Request(url)
response = urlopen(request)
content: bytes = response.read()
return content
except urllib.error.HTTPError as e:
if e.code in (401, 403, 404):
# Authentication might be needed
if not auth_headers:
auth_headers = get_auth_headers(url, auth_param)

if auth_headers:
info('Retrying with authentication...')
request = Request(url)
for header, value in auth_headers.items():
request.add_header(header, value)
try:
response = urlopen(request)
result: bytes = response.read()
return result
except urllib.error.HTTPError as auth_e:
if auth_e.code == 401:
error('Authentication failed. Check your token.')
elif auth_e.code == 403:
error('Access forbidden. Token may lack permissions.')
elif auth_e.code == 404:
error('Resource not found. Check URL and permissions.')
raise
elif e.code == 404:
raise
else:
warning('Authentication may be required for this URL')
raise
else:
raise
except urllib.error.URLError as e:
if 'SSL' in str(e) or 'certificate' in str(e).lower():
warning('SSL certificate verification failed, trying with unverified context')
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

request = Request(url)
if auth_headers:
for header, value in auth_headers.items():
request.add_header(header, value)

response = urlopen(request, context=ctx)
ctx_result: bytes = response.read()
return ctx_result
raise


def extract_front_matter(file_path: Path) -> dict[str, Any] | None:
"""Extract YAML front matter from a Markdown file.

Expand Down Expand Up @@ -2632,8 +2743,14 @@ def handle_resource(

if is_remote:
# Download from URL
content = fetch_url_with_auth(resolved_path, auth_param=auth_param)
destination.write_text(content, encoding='utf-8')
if is_binary_file(resolved_path):
# Binary file - fetch as bytes and write bytes
content_bytes = fetch_url_bytes_with_auth(resolved_path, auth_param=auth_param)
destination.write_bytes(content_bytes)
else:
# Text file - fetch as text and write text
content = fetch_url_with_auth(resolved_path, auth_param=auth_param)
destination.write_text(content, encoding='utf-8')
success(f'Downloaded: {filename}')
else:
# Copy from local path
Expand Down Expand Up @@ -2888,8 +3005,14 @@ def process_skill(
raw_base = convert_to_raw_url(base)
source_url = f"{raw_base.rstrip('/')}/{file_path}"
try:
content = fetch_url_with_auth(source_url, auth_param=auth_param)
destination.write_text(content, encoding='utf-8')
if is_binary_file(file_path):
# Binary file - fetch as bytes and write bytes
content_bytes = fetch_url_bytes_with_auth(source_url, auth_param=auth_param)
destination.write_bytes(content_bytes)
else:
# Text file - fetch as text and write text
content = fetch_url_with_auth(source_url, auth_param=auth_param)
destination.write_text(content, encoding='utf-8')
success(f' Downloaded: {file_path}')
success_count += 1
except Exception as e:
Expand Down
Loading
Loading