|
6 | 6 | from typing import Tuple |
7 | 7 | from urllib.parse import urlparse, ParseResult |
8 | 8 |
|
| 9 | +import requests |
| 10 | + |
9 | 11 | from .structured_data import load_dict, dump_dict |
10 | 12 | from .util import die, mkdir_p, rsync, run, rm_rf |
11 | 13 | from .example import Example |
@@ -39,25 +41,54 @@ def __init__(self, filepath: str = None, root: dict = None, args: dict = None): |
39 | 41 | logging.debug("EXITING: ") |
40 | 42 |
|
41 | 43 | def _git_clone(self, repo) -> str: |
42 | | - logging.debug("ENTERING: ") |
| 44 | + logging.debug("ENTERING: ") |
| 45 | + |
43 | 46 | git_uri = repo.get('git_uri') |
44 | 47 | private = repo.get('private', False) |
45 | 48 | uri, _, name, ext = parseUri(git_uri) |
46 | 49 | to = f'{self._root._tempdir}/{name}' |
| 50 | + |
47 | 51 | if uri.scheme == 'https' and ext in ['', '.git'] and self._repo_uri() != git_uri: |
48 | 52 | if not self._root._skip_clone and git_uri not in self._root._clones: |
49 | 53 | rm_rf(to) |
50 | 54 | mkdir_p(to) |
51 | | - logging.debug( |
52 | | - f'Cloning {private and "private" or "public"} {git_uri} to {to}') |
| 55 | + |
| 56 | + # Extract owner and repo name from git_uri |
| 57 | + path_parts = uri.path.strip('/').split('/') |
| 58 | + if len(path_parts) >= 2: |
| 59 | + owner, repo_name = path_parts[0], path_parts[1].replace('.git', '') |
| 60 | + |
| 61 | + # Get latest release tag from GitHub API |
| 62 | + api_url = f"https://api.github.com/repos/{owner}/{repo_name}/releases/latest" |
| 63 | + github_token = os.environ.get('PRIVATE_ACCESS_TOKEN') |
| 64 | + headers = {"Authorization": f"Bearer {github_token}"} if github_token else {} |
| 65 | + |
| 66 | + try: |
| 67 | + response = requests.get(api_url, headers=headers) |
| 68 | + response.raise_for_status() |
| 69 | + latest_tag = response.json()["tag_name"] |
| 70 | + logging.debug(f'Found latest release tag: {latest_tag}') |
| 71 | + use_latest_tag = True |
| 72 | + except Exception as e: |
| 73 | + logging.warning(f'Failed to get latest release tag: {str(e)}') |
| 74 | + use_latest_tag = False |
| 75 | + else: |
| 76 | + use_latest_tag = False |
| 77 | + |
| 78 | + logging.debug(f'Cloning {private and "private" or "public"} {git_uri} to {to}') |
53 | 79 | self._root._clones[git_uri] = True |
| 80 | + |
54 | 81 | if private: |
55 | 82 | pat = os.environ.get('PRIVATE_ACCESS_TOKEN') |
56 | 83 | if pat is None: |
57 | 84 | die('Private repos without a PRIVATE_ACCESS_TOKEN - aborting.') |
58 | 85 | git_uri = f'{uri.scheme}://{pat}@{uri.netloc}{uri.path}' |
59 | | - run(f'git clone {git_uri} {to}') |
60 | | - run(f'git fetch --all --tags', cwd=to) |
| 86 | + |
| 87 | + if use_latest_tag: |
| 88 | + run(f'git clone --depth 1 --branch {latest_tag} {git_uri} {to}') |
| 89 | + else: |
| 90 | + run(f'git clone {git_uri} {to}') |
| 91 | + run(f'git fetch --all --tags', cwd=to) |
61 | 92 | else: |
62 | 93 | logging.debug(f'Skipping clone {git_uri}') |
63 | 94 | logging.debug("EXITING: ") |
@@ -173,14 +204,44 @@ def _get_example_id_from_file(self, path): |
173 | 204 | logging.debug("EXITING: ") |
174 | 205 | return None |
175 | 206 |
|
| 207 | + def _get_default_branch(self, git_uri): |
| 208 | + """Get the default branch name for a GitHub repository.""" |
| 209 | + logging.debug("ENTERING: ") |
| 210 | + try: |
| 211 | + # Extract owner and repo name from git_uri |
| 212 | + from urllib.parse import urlparse |
| 213 | + uri = urlparse(git_uri) |
| 214 | + path_parts = uri.path.strip('/').split('/') |
| 215 | + if len(path_parts) >= 2: |
| 216 | + owner, repo_name = path_parts[0], path_parts[1].replace('.git', '') |
| 217 | + |
| 218 | + # Get repository info from GitHub API |
| 219 | + api_url = f"https://api.github.com/repos/{owner}/{repo_name}" |
| 220 | + github_token = os.environ.get('PRIVATE_ACCESS_TOKEN') |
| 221 | + headers = {"Authorization": f"Bearer {github_token}"} if github_token else {} |
| 222 | + |
| 223 | + response = requests.get(api_url, headers=headers) |
| 224 | + response.raise_for_status() |
| 225 | + default_branch = response.json()["default_branch"] |
| 226 | + logging.debug(f'Found default branch: {default_branch} for {git_uri}') |
| 227 | + logging.debug("EXITING: ") |
| 228 | + return default_branch |
| 229 | + except Exception as e: |
| 230 | + logging.warning(f'Failed to get default branch for {git_uri}: {str(e)}') |
| 231 | + |
| 232 | + # Fallback to 'main' if API call fails |
| 233 | + logging.debug("EXITING: ") |
| 234 | + return 'main' |
| 235 | + |
176 | 236 | def _copy_examples(self): |
177 | 237 | logging.debug("ENTERING: ") |
178 | 238 | if ex := self.get('examples'): |
179 | 239 | repo = self._git_clone(ex) |
180 | | - dev_branch = ex.get('dev_branch') |
181 | | - self._checkout(dev_branch, repo, ex) |
182 | 240 | path = ex.get('path', '') |
183 | 241 |
|
| 242 | + # Get the default branch for sourceUrl generation |
| 243 | + default_branch = self._get_default_branch(ex.get('git_uri')) |
| 244 | + |
184 | 245 | src = f'{repo}/{path}/' |
185 | 246 | dst = f'{self._root._website.get("path")}/{self._root._website.get("examples_path")}' |
186 | 247 |
|
@@ -210,7 +271,7 @@ def _copy_examples(self): |
210 | 271 | example_metadata['hidden'] = e.hidden |
211 | 272 | example_metadata['named_steps'] = e.named_steps |
212 | 273 | example_metadata['sourceUrl'] = ( |
213 | | - f'{ex["git_uri"]}/tree/{ex["dev_branch"]}/{ex["path"]}/{os.path.basename(f)}' |
| 274 | + f'{ex["git_uri"]}/tree/{default_branch}/{ex["path"]}/{os.path.basename(f)}' |
214 | 275 | ) |
215 | 276 | examples = self._root._examples |
216 | 277 | if example_id not in examples: |
|
0 commit comments