|
1 | | -from pyforgejo import PyforgejoApi |
2 | 1 | from interface_wrapper import ( |
3 | 2 | logging, |
4 | | - datetime, |
5 | 3 | IRepositoryAPI, |
6 | 4 | Repository, |
7 | 5 | Commit, |
|
14 | 12 | Comment, |
15 | 13 | Invite |
16 | 14 | ) |
| 15 | +import base64 |
17 | 16 |
|
18 | 17 | class ForgejoRepoAPI(IRepositoryAPI): |
19 | 18 | def __init__(self, client): |
@@ -132,13 +131,82 @@ def get_pull_requests(self, repo: Repository) -> list[PullRequest]: |
132 | 131 | return [] |
133 | 132 |
|
134 | 133 | def get_branches(self, repo: Repository) -> list[Branch]: |
135 | | - return [] |
| 134 | + try: |
| 135 | + branches = self.client.repository.repo_list_branches(repo.owner.login, repo.name) |
| 136 | + result = [] |
| 137 | + |
| 138 | + for branch in branches: |
| 139 | + commit = branch.commit |
| 140 | + |
| 141 | + author = commit.author |
| 142 | + contributor = Contributor( |
| 143 | + username=author.username if author else "unknown", |
| 144 | + email=author.email if author and author.email else "", |
| 145 | + ) |
| 146 | + |
| 147 | + commit_details = self.client.repository.repo_get_single_commit(repo.owner.login, repo.name, commit.id) |
| 148 | + files = [file.filename for file in getattr(commit_details, "files", [])] |
| 149 | + |
| 150 | + commit_obj = Commit( |
| 151 | + _id=commit.id, |
| 152 | + message=commit.message, |
| 153 | + author=contributor, |
| 154 | + date=commit.timestamp, |
| 155 | + files=files, |
| 156 | + ) |
| 157 | + |
| 158 | + result.append(Branch(name=branch.name, last_commit=commit_obj)) |
| 159 | + |
| 160 | + return result |
| 161 | + |
| 162 | + except Exception as e: |
| 163 | + logging.error(f"Failed to get branches from Forgejo for repo {repo.name}: {e}") |
| 164 | + return [] |
136 | 165 |
|
137 | 166 | def get_wiki_pages(self, repo: Repository) -> list[WikiPage]: |
138 | | - return [] |
| 167 | + try: |
| 168 | + pages = self.client.repository.repo_get_wiki_pages(repo.owner.login, repo.name) |
| 169 | + result = [] |
| 170 | + |
| 171 | + for page in pages: |
| 172 | + page_details = self.client.repository.repo_get_wiki_page(repo.owner.login, repo.name, page.title) |
| 173 | + |
| 174 | + wiki_page = WikiPage( |
| 175 | + title=page_details.title, |
| 176 | + content=base64.b64decode(page_details.content_base_64).decode('utf-8') |
| 177 | + ) |
| 178 | + result.append(wiki_page) |
| 179 | + |
| 180 | + return result |
| 181 | + |
| 182 | + except Exception as e: |
| 183 | + logging.error(f"Failed to get wiki pages from Forgejo for repo {repo.name}: {e}") |
| 184 | + return [] |
139 | 185 |
|
140 | 186 | def get_forks(self, repo: Repository) -> list[Repository]: |
141 | | - return [] |
| 187 | + try: |
| 188 | + forks = self.client.repository.list_forks(repo.owner.login, repo.name) |
| 189 | + result = [] |
| 190 | + |
| 191 | + for fork in forks: |
| 192 | + default_branch = Branch(name=fork.default_branch,last_commit=None) |
| 193 | + owner = fork.owner |
| 194 | + |
| 195 | + result.append( |
| 196 | + Repository( |
| 197 | + _id=fork.full_name, |
| 198 | + name=fork.name, |
| 199 | + url=fork.html_url, |
| 200 | + default_branch=default_branch, |
| 201 | + owner=owner |
| 202 | + |
| 203 | + ) |
| 204 | + ) |
| 205 | + return result |
| 206 | + |
| 207 | + except Exception as e: |
| 208 | + logging.error(f"Failed to get forks from Forgejo for repo {repo.name}: {e}") |
| 209 | + return [] |
142 | 210 |
|
143 | 211 | def get_comments(self, obj) -> list[Comment]: |
144 | 212 | return [] |
|
0 commit comments