@@ -61,6 +61,27 @@ public readonly bool IsAmazonECRRegistry
61
61
}
62
62
}
63
63
64
+ /// <summary>
65
+ /// Check to see if the registry is for Google Artifact Registry.
66
+ /// </summary>
67
+ /// <remarks>
68
+ /// Google Artifact Registry locations (one for each availability zone) are of the form "ZONE-docker.pkg.dev".
69
+ /// </remarks>
70
+ public readonly bool IsGoogleArtifactRegistry {
71
+ get => RegistryName . EndsWith ( "-docker.pkg.dev" ) ;
72
+ }
73
+
74
+ /// <summary>
75
+ /// Google Artifact Registry doesn't support chunked upload, but we want the capability check to be agnostic to the target.
76
+ /// </summary>
77
+ private readonly bool SupportsChunkedUpload => ! IsGoogleArtifactRegistry ;
78
+
79
+ /// <summary>
80
+ /// Pushing to ECR uses a much larger chunk size. To avoid getting too many socket disconnects trying to do too many
81
+ /// parallel uploads be more conservative and upload one layer at a time.
82
+ /// </summary>
83
+ private readonly bool SupportsParallelUploads => ! IsAmazonECRRegistry ;
84
+
64
85
public async Task < Image > GetImageManifest ( string name , string reference )
65
86
{
66
87
HttpClient client = GetClient ( ) ;
@@ -143,42 +164,10 @@ public async Task Push(Layer layer, string name, Action<string> logProgressMessa
143
164
}
144
165
}
145
166
146
- private readonly async Task UploadBlob ( string name , string digest , Stream contents )
147
- {
148
- HttpClient client = GetClient ( ) ;
149
-
150
- if ( await BlobAlreadyUploaded ( name , digest , client ) )
151
- {
152
- // Already there!
153
- return ;
154
- }
155
-
156
- Uri pushUri = new Uri ( BaseUri , $ "/v2/{ name } /blobs/uploads/") ;
157
- HttpResponseMessage pushResponse = await client . PostAsync ( pushUri , content : null ) ;
158
-
159
- if ( pushResponse . StatusCode != HttpStatusCode . Accepted )
160
- {
161
- string errorMessage = $ "Failed to upload blob to { pushUri } ; received { pushResponse . StatusCode } with detail { await pushResponse . Content . ReadAsStringAsync ( ) } ";
162
- throw new ApplicationException ( errorMessage ) ;
163
- }
164
-
165
- UriBuilder x ;
166
- if ( pushResponse . Headers . Location is { IsAbsoluteUri : true } )
167
- {
168
- x = new UriBuilder ( pushResponse . Headers . Location ) ;
169
- }
170
- else
171
- {
172
- // if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
173
- // Uri constructor does this on our behalf.
174
- x = new UriBuilder ( new Uri ( BaseUri , pushResponse . Headers . Location ? . OriginalString ?? "" ) ) ;
175
- }
176
-
177
- Uri patchUri = x . Uri ;
178
-
179
- x . Query += $ "&digest={ Uri . EscapeDataString ( digest ) } ";
180
-
181
- Uri putUri = x . Uri ;
167
+ private readonly async Task < UriBuilder > UploadBlobChunked ( string name , string digest , Stream contents , HttpClient client , UriBuilder uploadUri ) {
168
+ Uri patchUri = uploadUri . Uri ;
169
+ var localUploadUri = new UriBuilder ( uploadUri . Uri ) ;
170
+ localUploadUri . Query += $ "&digest={ Uri . EscapeDataString ( digest ) } ";
182
171
183
172
// TODO: this chunking is super tiny and probably not necessary; what does the docker client do
184
173
// and can we be smarter?
@@ -209,27 +198,66 @@ private readonly async Task UploadBlob(string name, string digest, Stream conten
209
198
throw new ApplicationException ( errorMessage ) ;
210
199
}
211
200
212
- if ( patchResponse . Headers . Location is { IsAbsoluteUri : true } )
213
- {
214
- x = new UriBuilder ( patchResponse . Headers . Location ) ;
215
- }
216
- else
217
- {
218
- // if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
219
- // Uri constructor does this on our behalf.
220
- x = new UriBuilder ( new Uri ( BaseUri , patchResponse . Headers . Location ? . OriginalString ?? "" ) ) ;
221
- }
201
+ localUploadUri = GetNextLocation ( patchResponse ) ;
222
202
223
- patchUri = x . Uri ;
203
+ patchUri = localUploadUri . Uri ;
224
204
225
205
chunkCount += 1 ;
226
206
chunkStart += bytesRead ;
227
207
}
208
+ return new UriBuilder ( patchUri ) ;
209
+ }
210
+
211
+ private readonly UriBuilder GetNextLocation ( HttpResponseMessage response ) {
212
+ if ( response . Headers . Location is { IsAbsoluteUri : true } )
213
+ {
214
+ return new UriBuilder ( response . Headers . Location ) ;
215
+ }
216
+ else
217
+ {
218
+ // if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
219
+ // Uri constructor does this on our behalf.
220
+ return new UriBuilder ( new Uri ( BaseUri , response . Headers . Location ? . OriginalString ?? "" ) ) ;
221
+ }
222
+ }
223
+
224
+ private readonly async Task < UriBuilder > UploadBlobWhole ( string name , string digest , Stream contents , HttpClient client , UriBuilder uploadUri ) {
225
+ StreamContent content = new StreamContent ( contents ) ;
226
+ content . Headers . ContentType = new MediaTypeHeaderValue ( "application/octet-stream" ) ;
227
+ content . Headers . ContentLength = contents . Length ;
228
+ HttpResponseMessage patchResponse = await client . PatchAsync ( uploadUri . Uri , content ) ;
229
+ if ( patchResponse . StatusCode != HttpStatusCode . Accepted )
230
+ {
231
+ string errorMessage = $ "Failed to upload to { uploadUri } ; received { patchResponse . StatusCode } with detail { await patchResponse . Content . ReadAsStringAsync ( ) } ";
232
+ throw new ApplicationException ( errorMessage ) ;
233
+ }
234
+ return GetNextLocation ( patchResponse ) ;
235
+ }
236
+
237
+ private readonly async Task < UriBuilder > StartUploadSession ( string name , string digest , HttpClient client ) {
238
+ Uri startUploadUri = new Uri ( BaseUri , $ "/v2/{ name } /blobs/uploads/") ;
239
+
240
+ HttpResponseMessage pushResponse = await client . PostAsync ( startUploadUri , content : null ) ;
241
+
242
+ if ( pushResponse . StatusCode != HttpStatusCode . Accepted )
243
+ {
244
+ string errorMessage = $ "Failed to upload blob to { startUploadUri } ; received { pushResponse . StatusCode } with detail { await pushResponse . Content . ReadAsStringAsync ( ) } ";
245
+ throw new ApplicationException ( errorMessage ) ;
246
+ }
247
+
248
+ return GetNextLocation ( pushResponse ) ;
249
+ }
250
+
251
+ private readonly async Task < UriBuilder > UploadBlobContents ( string name , string digest , Stream contents , HttpClient client , UriBuilder uploadUri ) {
252
+ if ( SupportsChunkedUpload ) return await UploadBlobChunked ( name , digest , contents , client , uploadUri ) ;
253
+ else return await UploadBlobWhole ( name , digest , contents , client , uploadUri ) ;
254
+ }
228
255
256
+ private readonly async Task FinishUploadSession ( string digest , HttpClient client , UriBuilder uploadUri ) {
229
257
// PUT with digest to finalize
230
- x . Query += $ "&digest={ Uri . EscapeDataString ( digest ) } ";
258
+ uploadUri . Query += $ "&digest={ Uri . EscapeDataString ( digest ) } ";
231
259
232
- putUri = x . Uri ;
260
+ var putUri = uploadUri . Uri ;
233
261
234
262
HttpResponseMessage finalizeResponse = await client . PutAsync ( putUri , content : null ) ;
235
263
@@ -240,6 +268,26 @@ private readonly async Task UploadBlob(string name, string digest, Stream conten
240
268
}
241
269
}
242
270
271
+ private readonly async Task UploadBlob ( string name , string digest , Stream contents )
272
+ {
273
+ HttpClient client = GetClient ( ) ;
274
+
275
+ if ( await BlobAlreadyUploaded ( name , digest , client ) )
276
+ {
277
+ // Already there!
278
+ return ;
279
+ }
280
+
281
+ // Three steps to this process:
282
+ // * start an upload session
283
+ var uploadUri = await StartUploadSession ( name , digest , client ) ;
284
+ // * upload the blob
285
+ var finalChunkUri = await UploadBlobContents ( name , digest , contents , client , uploadUri ) ;
286
+ // * finish the upload session
287
+ await FinishUploadSession ( digest , client , finalChunkUri ) ;
288
+
289
+ }
290
+
243
291
private readonly async Task < bool > BlobAlreadyUploaded ( string name , string digest , HttpClient client )
244
292
{
245
293
HttpResponseMessage response = await client . SendAsync ( new HttpRequestMessage ( HttpMethod . Head , new Uri ( BaseUri , $ "/v2/{ name } /blobs/{ digest } ") ) ) ;
@@ -320,19 +368,17 @@ public async Task Push(Image x, string name, string? tag, string baseName, Actio
320
368
}
321
369
} ;
322
370
323
- // Pushing to ECR uses a much larger chunk size. To avoid getting too many socket disconnects trying to do too many
324
- // parallel uploads be more conservative and upload one layer at a time.
325
- if ( IsAmazonECRRegistry )
371
+ if ( SupportsParallelUploads )
372
+ {
373
+ await Task . WhenAll ( x . LayerDescriptors . Select ( descriptor => uploadLayerFunc ( descriptor ) ) ) ;
374
+ }
375
+ else
326
376
{
327
377
foreach ( var descriptor in x . LayerDescriptors )
328
378
{
329
379
await uploadLayerFunc ( descriptor ) ;
330
380
}
331
381
}
332
- else
333
- {
334
- await Task . WhenAll ( x . LayerDescriptors . Select ( descriptor => uploadLayerFunc ( descriptor ) ) ) ;
335
- }
336
382
337
383
using ( MemoryStream stringStream = new MemoryStream ( Encoding . UTF8 . GetBytes ( x . config . ToJsonString ( ) ) ) )
338
384
{
0 commit comments