@@ -42,24 +42,32 @@ def update_headers(self, d: Dict[str, str]):
42
42
opts = options ()
43
43
sources = [p .resolve () for p in opts .sources ]
44
44
source_dir = pathlib .Path (os .path .commonpath (src .parent for src in sources ))
45
- source_dir = subprocess .check_output (["git" , "rev-parse" , "--show-toplevel" ], cwd = source_dir , text = True ).strip ()
45
+ source_dir = subprocess .check_output (
46
+ ["git" , "rev-parse" , "--show-toplevel" ], cwd = source_dir , text = True
47
+ ).strip ()
46
48
47
49
48
50
def get_env (s , sep = "=" ):
49
51
ret = {}
50
- for m in re .finditer (fr' (.*?){ sep } (.*)' , s , re .M ):
52
+ for m in re .finditer (rf" (.*?){ sep } (.*)" , s , re .M ):
51
53
ret .setdefault (* m .groups ())
52
54
return ret
53
55
54
56
55
57
def git (* args , ** kwargs ):
56
- return subprocess .run (("git" ,) + args , stdout = subprocess .PIPE , text = True , cwd = source_dir , ** kwargs ).stdout .strip ()
58
+ return subprocess .run (
59
+ ("git" ,) + args , stdout = subprocess .PIPE , text = True , cwd = source_dir , ** kwargs
60
+ ).stdout .strip ()
57
61
58
62
59
63
def get_endpoint ():
60
- lfs_env_items = iter (get_env (subprocess .check_output (["git" , "lfs" , "env" ], text = True , cwd = source_dir )).items ())
61
- endpoint = next (v for k , v in lfs_env_items if k .startswith ('Endpoint' ))
62
- endpoint , _ , _ = endpoint .partition (' ' )
64
+ lfs_env_items = iter (
65
+ get_env (
66
+ subprocess .check_output (["git" , "lfs" , "env" ], text = True , cwd = source_dir )
67
+ ).items ()
68
+ )
69
+ endpoint = next (v for k , v in lfs_env_items if k .startswith ("Endpoint" ))
70
+ endpoint , _ , _ = endpoint .partition (" " )
63
71
# only take the ssh endpoint if it follows directly after the first endpoint we found
64
72
# in a situation like
65
73
# Endpoint (a)=...
@@ -69,21 +77,32 @@ def get_endpoint():
69
77
following_key , following_value = next (lfs_env_items , (None , None ))
70
78
ssh_endpoint = following_value if following_key == " SSH" else None
71
79
72
- endpoint = Endpoint (endpoint , {
73
- "Content-Type" : "application/vnd.git-lfs+json" ,
74
- "Accept" : "application/vnd.git-lfs+json" ,
75
- })
80
+ endpoint = Endpoint (
81
+ endpoint ,
82
+ {
83
+ "Content-Type" : "application/vnd.git-lfs+json" ,
84
+ "Accept" : "application/vnd.git-lfs+json" ,
85
+ },
86
+ )
76
87
if ssh_endpoint :
77
88
# see https://github.com/git-lfs/git-lfs/blob/main/docs/api/authentication.md
78
89
server , _ , path = ssh_endpoint .partition (":" )
79
- ssh_command = shutil .which (os .environ .get ("GIT_SSH" , os .environ .get ("GIT_SSH_COMMAND" , "ssh" )))
90
+ ssh_command = shutil .which (
91
+ os .environ .get ("GIT_SSH" , os .environ .get ("GIT_SSH_COMMAND" , "ssh" ))
92
+ )
80
93
assert ssh_command , "no ssh command found"
81
- resp = json .loads (subprocess .check_output ([ssh_command ,
82
- "-oStrictHostKeyChecking=accept-new" ,
83
- server ,
84
- "git-lfs-authenticate" ,
85
- path ,
86
- "download" ]))
94
+ resp = json .loads (
95
+ subprocess .check_output (
96
+ [
97
+ ssh_command ,
98
+ "-oStrictHostKeyChecking=accept-new" ,
99
+ server ,
100
+ "git-lfs-authenticate" ,
101
+ path ,
102
+ "download" ,
103
+ ]
104
+ )
105
+ )
87
106
endpoint .href = resp .get ("href" , endpoint )
88
107
endpoint .update_headers (resp .get ("header" , {}))
89
108
url = urlparse (endpoint .href )
@@ -96,10 +115,18 @@ def get_endpoint():
96
115
if "Authorization" not in endpoint .headers :
97
116
# last chance: use git credentials (possibly backed by a credential helper like the one installed by gh)
98
117
# see https://git-scm.com/docs/git-credential
99
- credentials = get_env (git ("credential" , "fill" , check = True ,
100
- # drop leading / from url.path
101
- input = f"protocol={ url .scheme } \n host={ url .netloc } \n path={ url .path [1 :]} \n " ))
102
- auth = base64 .b64encode (f'{ credentials ["username" ]} :{ credentials ["password" ]} ' .encode ()).decode ('ascii' )
118
+ credentials = get_env (
119
+ git (
120
+ "credential" ,
121
+ "fill" ,
122
+ check = True ,
123
+ # drop leading / from url.path
124
+ input = f"protocol={ url .scheme } \n host={ url .netloc } \n path={ url .path [1 :]} \n " ,
125
+ )
126
+ )
127
+ auth = base64 .b64encode (
128
+ f'{ credentials ["username" ]} :{ credentials ["password" ]} ' .encode ()
129
+ ).decode ("ascii" )
103
130
endpoint .headers ["Authorization" ] = f"Basic { auth } "
104
131
return endpoint
105
132
@@ -129,23 +156,25 @@ def get_locations(objects):
129
156
)
130
157
with urllib .request .urlopen (req ) as resp :
131
158
data = json .load (resp )
132
- assert len (data ["objects" ]) == len (indexes ), f"received { len (data )} objects, expected { len (indexes )} "
159
+ assert len (data ["objects" ]) == len (
160
+ indexes
161
+ ), f"received { len (data )} objects, expected { len (indexes )} "
133
162
for i , resp in zip (indexes , data ["objects" ]):
134
163
ret [i ] = f'{ resp ["oid" ]} { resp ["actions" ]["download" ]["href" ]} '
135
164
return ret
136
165
137
166
138
167
def get_lfs_object (path ):
139
- with open (path , 'rb' ) as fileobj :
168
+ with open (path , "rb" ) as fileobj :
140
169
lfs_header = "version https://git-lfs.github.com/spec" .encode ()
141
170
actual_header = fileobj .read (len (lfs_header ))
142
171
sha256 = size = None
143
172
if lfs_header != actual_header :
144
173
return None
145
- data = get_env (fileobj .read ().decode (' ascii' ), sep = ' ' )
146
- assert data [' oid' ].startswith (' sha256:' ), f"unknown oid type: { data ['oid' ]} "
147
- _ , _ , sha256 = data [' oid' ].partition (':' )
148
- size = int (data [' size' ])
174
+ data = get_env (fileobj .read ().decode (" ascii" ), sep = " " )
175
+ assert data [" oid" ].startswith (" sha256:" ), f"unknown oid type: { data ['oid' ]} "
176
+ _ , _ , sha256 = data [" oid" ].partition (":" )
177
+ size = int (data [" size" ])
149
178
return {"oid" : sha256 , "size" : size }
150
179
151
180
0 commit comments