diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f473bdf..39b507b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,6 +37,7 @@ jobs: run: | az group create -l southcentralus -n "AzureBackupRG-azstorage-${{ matrix.os }}-${{ matrix.version }}-${{ github.run_id }}" az storage account create --min-tls-version TLS1_2 -n "s${{ steps.uuid.outputs.uuid }}" -g "AzureBackupRG-azstorage-${{ matrix.os }}-${{ matrix.version }}-${{ github.run_id }}" -l southcentralus + az storage account create --min-tls-version TLS1_2 -n "s${{ steps.uuid.outputs.uuid }}2" -g "AzureBackupRG-azstorage-${{ matrix.os }}-${{ matrix.version }}-${{ github.run_id }}" -l southcentralus - uses: julia-actions/cache@v1 - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 @@ -45,6 +46,7 @@ jobs: CLIENT_SECRET: ${{ secrets.CLIENT_SECRET }} TENANT: ${{ secrets.TENANT_ID }} STORAGE_ACCOUNT: "s${{ steps.uuid.outputs.uuid }}" + STORAGE_ACCOUNT_TOO: "s${{ steps.uuid.outputs.uuid }}2" - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v5 with: diff --git a/Project.toml b/Project.toml index ab76461..ec5f3b9 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "AzStorage" uuid = "c6697862-1611-5eae-9ef8-48803c85c8d6" -version = "2.7.2" +version = "2.7.3" [deps] AbstractStorage = "14dbef02-f468-5f15-853e-5ec8dee7b899" @@ -12,6 +12,7 @@ DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" XML = "72c71f33-b9b6-44de-8c94-c961784809e2" @@ -20,8 +21,10 @@ XML = "72c71f33-b9b6-44de-8c94-c961784809e2" AbstractStorage = "^1.3" AzSessions = "2" AzStorage_jll = "0.9" +Base64 = "1" DelimitedFiles = "1" HTTP = "1" ProgressMeter = "1" +SHA = "0.7" XML = "0.3" julia = "^1.6" diff --git a/src/AzStorage.jl b/src/AzStorage.jl index 78b2469..cee703c 100644 --- a/src/AzStorage.jl +++ b/src/AzStorage.jl @@ -1,6 +1,6 @@ module AzStorage -using AbstractStorage, AzSessions, AzStorage_jll, Base64, Dates, DelimitedFiles, XML, HTTP, Printf, ProgressMeter, Serialization, Sockets +using AbstractStorage, AzSessions, AzStorage_jll, Base64, Dates, DelimitedFiles, XML, HTTP, Printf, ProgressMeter, SHA, Serialization, Sockets # https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes # https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/request-limits-and-throttling @@ -854,9 +854,196 @@ function Base.cp(inc::AzContainer, inb::AbstractString, out::AbstractString; buf close(io) end -function Base.cp(inc::AzContainer, inb::AbstractString, outc::AzContainer, outb::AbstractString) - bytes = read!(inc, inb, Vector{UInt8}(undef, filesize(inc, inb))) - write(outc, outb, bytes) +#= +If the source and destination storage accounts for a blob copy are different, then the Azure storage API does not allow us +to use OAuth2/RBAC directly for the source blob. But, we can use a user delegation SAS token which is built in the following +two methods: 'generate_user_delegation_key' and 'get_user_delegation_sas'. +=# +function get_user_delegation_key(c::AzContainer; start=now(UTC), expiry=now(UTC)+Hour(1)) + start_str = Dates.format(start, "yyyy-mm-ddTHH:MM:SSZ") + expiry_str = Dates.format(expiry, "yyyy-mm-ddTHH:MM:SSZ") + + r = @retry c.nretry HTTP.request( + "POST", + "https://$(c.storageaccount).blob.core.windows.net/?restype=service&comp=userdelegationkey", + [ + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => API_VERSION, + "Content-Type" => "application/xml" + ], + """ + + + $start_str + $expiry_str + + """; + retry = false, + verbose = c.verbose, + connect_timeout = c.connect_timeout, + readtimeout = c.read_timeout) + + b = XML.parse(String(r.body), LazyNode) + delegation_key = Dict{String,String}() + for child in children(b) + if tag(child) == "UserDelegationKey" + for grandchild in children(child) + if tag(grandchild) in ("SignedOid", "SignedTid", "SignedStart", "SignedExpiry", "SignedService", "SignedVersion", "Value") + delegation_key[string(tag(grandchild))] = value(first(children(grandchild))) + end + end + end + end + + delegation_key +end + +function generate_user_delegation_sas(c::AzContainer, b::AbstractString; permissions="r", start=now(UTC), expiry=now(UTC)+Hour(1)) + delegation_key = get_user_delegation_key(c; start, expiry) + + signedPermissions = permissions + signedStart = Dates.format(start, "yyyy-mm-ddTHH:MM:SSZ") + signedExpiry = Dates.format(expiry, "yyyy-mm-ddTHH:MM:SSZ") + canonicalizedResource = "/blob/$(c.storageaccount)/$(c.containername)/$(addprefix(c,b))" + signedKeyObjectId = delegation_key["SignedOid"] + signedKeyTenantId = delegation_key["SignedTid"] + signedKeyStart = delegation_key["SignedStart"] + signedKeyExpiry = delegation_key["SignedExpiry"] + signedKeyService = delegation_key["SignedService"] + signedKeyVersion = delegation_key["SignedVersion"] + signedAuthorizedUserObjectId = "" + signedUnauthorizedUserObjectId = "" + signedCorrelationId = "" + signedIP = "" + signedProtocol = "https" + signedVersion = API_VERSION + signedResource = "b" + signedSnapshotTime = "" + signedEncryptionScope = "" + rscc = "" + rscd = "" + rsce = "" + rscl = "" + rsct = "" + + string_to_sign = + signedPermissions * "\n" * + signedStart * "\n" * + signedExpiry * "\n" * + canonicalizedResource * "\n" * + signedKeyObjectId * "\n" * + signedKeyTenantId * "\n" * + signedKeyStart * "\n" * + signedKeyExpiry * "\n" * + signedKeyService * "\n" * + signedKeyVersion * "\n" * + signedAuthorizedUserObjectId * "\n" * + signedUnauthorizedUserObjectId * "\n" * + signedCorrelationId * "\n" * + "\n" * + "\n" * + signedIP * "\n" * + signedProtocol * "\n" * + signedVersion * "\n" * + signedResource * "\n" * + signedSnapshotTime * "\n" * + signedEncryptionScope * "\n" * + rscc * "\n" * + rscd * "\n" * + rsce * "\n" * + rscl * "\n" * + rsct + + # sign the string using the delegation key + key = base64decode(delegation_key["Value"]) + message = collect(codeunits(string_to_sign)) + signed_string = HTTP.escapeuri(base64encode(hmac_sha256(key, message))) + + # sas token + "sp=$signedPermissions&" * + "st=$signedStart&" * + "se=$signedExpiry&" * + "skoid=$signedKeyObjectId&" * + "sktid=$signedKeyTenantId&" * + "skt=$signedKeyStart&" * + "ske=$signedKeyExpiry&" * + "sks=$signedKeyService&" * + "skv=$signedKeyVersion&" * + (isempty(signedIP) ? "" : "sip=$signedIP&") * + "spr=$signedProtocol&" * + "sv=$signedVersion&" * + "sr=$signedResource&" * + "sig=$signed_string" +end + +function status(c::AzContainer, b::AbstractString) + r_status = @retry c.nretry HTTP.request( + "HEAD", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,b))", + [ + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => API_VERSION + ]; + retry = false, + verbose = c.verbose, + connect_timeout = c.connect_timeout, + readtimeout = c.read_timeout + ) + + copy_status = HTTP.header(r_status, "x-ms-copy-status") + copy_progress = HTTP.header(r_status, "x-ms-copy-progress") + copy_reason = HTTP.header(r_status, "x-ms-copy-status-description") + + Dict("status"=>copy_status, "progress"=>copy_progress, "reason"=>copy_reason) +end + +function Base.cp(inc::AzContainer, inb::AbstractString, outc::AzContainer, outb::AbstractString; showprogress=false, async=false) + source_url = "https://$(inc.storageaccount).blob.core.windows.net/$(inc.containername)/$(addprefix(inc,inb))" + + if inc.storageaccount != outc.storageaccount + sas = generate_user_delegation_sas(inc, inb; permissions="r", start=now(UTC), expiry=now(UTC)+Hour(1)) + source_url *= "?$sas" + end + + headers = [ + "Authorization" => "Bearer $(token(outc.session))", + "x-ms-version" => API_VERSION, + "x-ms-copy-source" => source_url + ] + + r_copy = @retry inc.nretry HTTP.request( + "PUT", + "https://$(outc.storageaccount).blob.core.windows.net/$(outc.containername)/$(addprefix(outc,outb))", + headers; + retry = false, + verbose = inc.verbose, + connect_timeout = inc.connect_timeout, + readtimeout = inc.read_timeout + ) + + if !async && r_copy.status == 202 + while true + local stat + try + stat = status(outc, outb) + catch + @warn "unable to get copy status for blob copy, retrying..." + stat = Dict("status"=>"unknown") + end + + if stat["status"] == "success" + break + elseif stat["status"] == "aborted" + error("blob copy aborted, dest=$(outc.storageaccount): $(outc.containername)/$(addprefix(outc,outb)), reason=$(stat["reason"])") + break + elseif stat["status"] == "pending" && showprogress + print("copy progress: $(stat["progress"])\r") + end + sleep(1) + end + end + + nothing end """ @@ -1193,19 +1380,8 @@ function Base.cp(src::AzContainer, dst::AzContainer) mkpath(dst) blobs = readdir(src) - for blob in blobs - @retry dst.nretry HTTP.request( - "PUT", - "https://$(dst.storageaccount).blob.core.windows.net/$(dst.containername)/$(addprefix(dst,blob))", - [ - "Authorization" => "Bearer $(token(dst.session))", - "x-ms-version" => API_VERSION, - "x-ms-copy-source" => "https://$(src.storageaccount).blob.core.windows.net/$(src.containername)/$(addprefix(src,blob))" - ], - retry = false, - verbose = src.verbose, - connect_timeout = src.connect_timeout, - readtimeout = src.read_timeout) + @sync for blob in blobs + @async cp(src, blob, dst, blob) end nothing end @@ -1240,6 +1416,6 @@ Note that the information stored is global, and not specfic to any one given IO """ getperf_counters() = @ccall libAzStorage.getperf_counters()::PerfCounters -export AzContainer, containers, readdlm, writedlm +export AzContainer, containers, readdlm, status, writedlm end diff --git a/test/runtests.jl b/test/runtests.jl index 11d1107..38fd573 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -48,6 +48,9 @@ end storageaccount = ENV["STORAGE_ACCOUNT"] @info "storageaccount=$storageaccount" +storageaccount_too = get(ENV, "STORAGE_ACCOUNT_TOO", "") +@info "storageaccount_too=$storageaccount_too" + for container in containers(;storageaccount=storageaccount,session=session) rm(AzContainer(container;storageaccount=storageaccount,session=session)) end @@ -545,9 +548,32 @@ end write(c, "foo.txt", "Hello world") cp(c, "foo.txt", c, "bar.txt") @test read(c, "bar.txt", String) == "Hello world" + cp(c, "foo.txt", c, "baz.txt"; async=true) + timeout = 30 + tic = time() + while status(c, "baz.txt")["status"] != "success" + if time() - tic > timeout + error("failed async copy") + end + sleep(1) + end + s = status(c, "baz.txt") rm(c) end +@testset "Container, copy blob to blob, different storage accounts" begin + r = uuid4() + c1 = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + c1 = robust_mkpath(c1) + write(c1, "foo.txt", "Hello world") + c2 = AzContainer("foo-$r-o", storageaccount=storageaccount_too, session=session, nthreads=2, nretry=10) + c2 = robust_mkpath(c2) + cp(c1, "foo.txt", c2, "bar.txt") + @test read(c2, "bar.txt", String) == "Hello world" + rm(c1) + rm(c2) +end + @testset "Object, copy blob to local file" begin r = uuid4() c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10)