Skip to content

Commit 1589275

Browse files
committed
refactor to be more capability-based on blob upload
1 parent 85ce19f commit 1589275

File tree

1 file changed

+95
-48
lines changed

1 file changed

+95
-48
lines changed

Microsoft.NET.Build.Containers/Registry.cs

Lines changed: 95 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,27 @@ public readonly bool IsAmazonECRRegistry
6161
}
6262
}
6363

64+
/// <summary>
65+
/// Check to see if the registry is for Google Artifact Registry.
66+
/// </summary>
67+
/// <remarks>
68+
/// Google Artifact Registry locations (one for each availability zone) are of the form "ZONE-docker.pkg.dev".
69+
/// </remarks>
70+
public readonly bool IsGoogleArtifactRegistry {
71+
get => RegistryName.EndsWith("-docker.pkg.dev");
72+
}
73+
74+
/// <summary>
75+
/// Google Artifact Registry doesn't support chunked upload, but we want the capability check to be agnostic to the target.
76+
/// </summary>
77+
private readonly bool SupportsChunkedUpload => !IsGoogleArtifactRegistry;
78+
79+
/// <summary>
80+
/// Pushing to ECR uses a much larger chunk size. To avoid getting too many socket disconnects trying to do too many
81+
/// parallel uploads be more conservative and upload one layer at a time.
82+
/// </summary>
83+
private readonly bool SupportsParallelUploads => !IsAmazonECRRegistry;
84+
6485
public async Task<Image> GetImageManifest(string name, string reference)
6586
{
6687
HttpClient client = GetClient();
@@ -143,42 +164,10 @@ public async Task Push(Layer layer, string name, Action<string> logProgressMessa
143164
}
144165
}
145166

146-
private readonly async Task UploadBlob(string name, string digest, Stream contents)
147-
{
148-
HttpClient client = GetClient();
149-
150-
if (await BlobAlreadyUploaded(name, digest, client))
151-
{
152-
// Already there!
153-
return;
154-
}
155-
156-
Uri pushUri = new Uri(BaseUri, $"/v2/{name}/blobs/uploads/");
157-
HttpResponseMessage pushResponse = await client.PostAsync(pushUri, content: null);
158-
159-
if (pushResponse.StatusCode != HttpStatusCode.Accepted)
160-
{
161-
string errorMessage = $"Failed to upload blob to {pushUri}; received {pushResponse.StatusCode} with detail {await pushResponse.Content.ReadAsStringAsync()}";
162-
throw new ApplicationException(errorMessage);
163-
}
164-
165-
UriBuilder x;
166-
if (pushResponse.Headers.Location is {IsAbsoluteUri: true })
167-
{
168-
x = new UriBuilder(pushResponse.Headers.Location);
169-
}
170-
else
171-
{
172-
// if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
173-
// Uri constructor does this on our behalf.
174-
x = new UriBuilder(new Uri(BaseUri, pushResponse.Headers.Location?.OriginalString ?? ""));
175-
}
176-
177-
Uri patchUri = x.Uri;
178-
179-
x.Query += $"&digest={Uri.EscapeDataString(digest)}";
180-
181-
Uri putUri = x.Uri;
167+
private readonly async Task<UriBuilder> UploadBlobChunked(string name, string digest, Stream contents, HttpClient client, UriBuilder uploadUri) {
168+
Uri patchUri = uploadUri.Uri;
169+
var localUploadUri = new UriBuilder(uploadUri.Uri);
170+
localUploadUri.Query += $"&digest={Uri.EscapeDataString(digest)}";
182171

183172
// TODO: this chunking is super tiny and probably not necessary; what does the docker client do
184173
// and can we be smarter?
@@ -211,25 +200,65 @@ private readonly async Task UploadBlob(string name, string digest, Stream conten
211200

212201
if (patchResponse.Headers.Location is { IsAbsoluteUri: true })
213202
{
214-
x = new UriBuilder(patchResponse.Headers.Location);
203+
localUploadUri = new UriBuilder(patchResponse.Headers.Location);
215204
}
216205
else
217206
{
218207
// if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
219208
// Uri constructor does this on our behalf.
220-
x = new UriBuilder(new Uri(BaseUri, patchResponse.Headers.Location?.OriginalString ?? ""));
209+
localUploadUri = new UriBuilder(new Uri(BaseUri, patchResponse.Headers.Location?.OriginalString ?? ""));
221210
}
222211

223-
patchUri = x.Uri;
212+
patchUri = localUploadUri.Uri;
224213

225214
chunkCount += 1;
226215
chunkStart += bytesRead;
227216
}
217+
return new UriBuilder(patchUri);
218+
}
228219

220+
private readonly async Task<UriBuilder> UploadBlobWhole(string name, string digest, Stream contents, HttpClient client, UriBuilder uploadUri) {
221+
StreamContent content = new StreamContent(contents);
222+
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
223+
content.Headers.ContentLength = contents.Length;
224+
HttpResponseMessage putResponse = await client.PutAsync(uploadUri.Uri, content);
225+
putResponse.EnsureSuccessStatusCode();
226+
return new UriBuilder(putResponse.Headers.Location ?? uploadUri.Uri);
227+
}
228+
229+
private readonly async Task<UriBuilder> StartUploadSession(string name, string digest, HttpClient client) {
230+
Uri startUploadUri = new Uri(BaseUri, $"/v2/{name}/blobs/uploads/");
231+
232+
HttpResponseMessage pushResponse = await client.PostAsync(startUploadUri, content: null);
233+
234+
if (pushResponse.StatusCode != HttpStatusCode.Accepted)
235+
{
236+
string errorMessage = $"Failed to upload blob to {startUploadUri}; received {pushResponse.StatusCode} with detail {await pushResponse.Content.ReadAsStringAsync()}";
237+
throw new ApplicationException(errorMessage);
238+
}
239+
240+
if (pushResponse.Headers.Location is {IsAbsoluteUri: true })
241+
{
242+
return new UriBuilder(pushResponse.Headers.Location);
243+
}
244+
else
245+
{
246+
// if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
247+
// Uri constructor does this on our behalf.
248+
return new UriBuilder(new Uri(BaseUri, pushResponse.Headers.Location?.OriginalString ?? ""));
249+
}
250+
}
251+
252+
private readonly async Task<UriBuilder> UploadBlobContents(string name, string digest, Stream contents, HttpClient client, UriBuilder uploadUri) {
253+
if (SupportsChunkedUpload) return await UploadBlobWhole(name, digest, contents, client, uploadUri);
254+
else return await UploadBlobChunked(name, digest, contents, client, uploadUri);
255+
}
256+
257+
private readonly async Task FinishUploadSession(string digest, HttpClient client, UriBuilder uploadUri) {
229258
// PUT with digest to finalize
230-
x.Query += $"&digest={Uri.EscapeDataString(digest)}";
259+
uploadUri.Query += $"&digest={Uri.EscapeDataString(digest)}";
231260

232-
putUri = x.Uri;
261+
var putUri = uploadUri.Uri;
233262

234263
HttpResponseMessage finalizeResponse = await client.PutAsync(putUri, content: null);
235264

@@ -240,6 +269,26 @@ private readonly async Task UploadBlob(string name, string digest, Stream conten
240269
}
241270
}
242271

272+
private readonly async Task UploadBlob(string name, string digest, Stream contents)
273+
{
274+
HttpClient client = GetClient();
275+
276+
if (await BlobAlreadyUploaded(name, digest, client))
277+
{
278+
// Already there!
279+
return;
280+
}
281+
282+
// Three steps to this process:
283+
// * start an upload session
284+
var uploadUri = await StartUploadSession(name, digest, client);
285+
// * upload the blob
286+
var finalChunkUri = await UploadBlobContents(name, digest, contents, client, uploadUri);
287+
// * finish the upload session
288+
await FinishUploadSession(digest, client, finalChunkUri);
289+
290+
}
291+
243292
private readonly async Task<bool> BlobAlreadyUploaded(string name, string digest, HttpClient client)
244293
{
245294
HttpResponseMessage response = await client.SendAsync(new HttpRequestMessage(HttpMethod.Head, new Uri(BaseUri, $"/v2/{name}/blobs/{digest}")));
@@ -320,19 +369,17 @@ public async Task Push(Image x, string name, string? tag, string baseName, Actio
320369
}
321370
};
322371

323-
// Pushing to ECR uses a much larger chunk size. To avoid getting too many socket disconnects trying to do too many
324-
// parallel uploads be more conservative and upload one layer at a time.
325-
if(IsAmazonECRRegistry)
372+
if (SupportsParallelUploads)
373+
{
374+
await Task.WhenAll(x.LayerDescriptors.Select(descriptor => uploadLayerFunc(descriptor)));
375+
}
376+
else
326377
{
327378
foreach(var descriptor in x.LayerDescriptors)
328379
{
329380
await uploadLayerFunc(descriptor);
330381
}
331382
}
332-
else
333-
{
334-
await Task.WhenAll(x.LayerDescriptors.Select(descriptor => uploadLayerFunc(descriptor)));
335-
}
336383

337384
using (MemoryStream stringStream = new MemoryStream(Encoding.UTF8.GetBytes(x.config.ToJsonString())))
338385
{

0 commit comments

Comments
 (0)