Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ jobs:
run: |
az group create -l southcentralus -n "AzureBackupRG-azstorage-${{ matrix.os }}-${{ matrix.version }}-${{ github.run_id }}"
az storage account create --min-tls-version TLS1_2 -n "s${{ steps.uuid.outputs.uuid }}" -g "AzureBackupRG-azstorage-${{ matrix.os }}-${{ matrix.version }}-${{ github.run_id }}" -l southcentralus
az storage account create --min-tls-version TLS1_2 -n "s${{ steps.uuid.outputs.uuid }}2" -g "AzureBackupRG-azstorage-${{ matrix.os }}-${{ matrix.version }}-${{ github.run_id }}" -l southcentralus
- uses: julia-actions/cache@v1
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
Expand All @@ -45,6 +46,7 @@ jobs:
CLIENT_SECRET: ${{ secrets.CLIENT_SECRET }}
TENANT: ${{ secrets.TENANT_ID }}
STORAGE_ACCOUNT: "s${{ steps.uuid.outputs.uuid }}"
STORAGE_ACCOUNT_TOO: "s${{ steps.uuid.outputs.uuid }}2"
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v5
with:
Expand Down
5 changes: 4 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "AzStorage"
uuid = "c6697862-1611-5eae-9ef8-48803c85c8d6"
version = "2.7.2"
version = "2.7.3"

[deps]
AbstractStorage = "14dbef02-f468-5f15-853e-5ec8dee7b899"
Expand All @@ -12,6 +12,7 @@ DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
XML = "72c71f33-b9b6-44de-8c94-c961784809e2"
Expand All @@ -20,8 +21,10 @@ XML = "72c71f33-b9b6-44de-8c94-c961784809e2"
AbstractStorage = "^1.3"
AzSessions = "2"
AzStorage_jll = "0.9"
Base64 = "1"
DelimitedFiles = "1"
HTTP = "1"
ProgressMeter = "1"
SHA = "0.7"
XML = "0.3"
julia = "^1.6"
212 changes: 194 additions & 18 deletions src/AzStorage.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module AzStorage

using AbstractStorage, AzSessions, AzStorage_jll, Base64, Dates, DelimitedFiles, XML, HTTP, Printf, ProgressMeter, Serialization, Sockets
using AbstractStorage, AzSessions, AzStorage_jll, Base64, Dates, DelimitedFiles, XML, HTTP, Printf, ProgressMeter, SHA, Serialization, Sockets

# https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes
# https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/request-limits-and-throttling
Expand Down Expand Up @@ -854,9 +854,196 @@ function Base.cp(inc::AzContainer, inb::AbstractString, out::AbstractString; buf
close(io)
end

function Base.cp(inc::AzContainer, inb::AbstractString, outc::AzContainer, outb::AbstractString)
bytes = read!(inc, inb, Vector{UInt8}(undef, filesize(inc, inb)))
write(outc, outb, bytes)
#=
If the source and destination storage accounts for a blob copy are different, then the Azure storage API does not allow us
to use OAuth2/RBAC directly for the source blob. But, we can use a user delegation SAS token which is built in the following
two methods: 'generate_user_delegation_key' and 'get_user_delegation_sas'.
=#
function get_user_delegation_key(c::AzContainer; start=now(UTC), expiry=now(UTC)+Hour(1))
start_str = Dates.format(start, "yyyy-mm-ddTHH:MM:SSZ")
expiry_str = Dates.format(expiry, "yyyy-mm-ddTHH:MM:SSZ")

r = @retry c.nretry HTTP.request(
"POST",
"https://$(c.storageaccount).blob.core.windows.net/?restype=service&comp=userdelegationkey",
[
"Authorization" => "Bearer $(token(c.session))",
"x-ms-version" => API_VERSION,
"Content-Type" => "application/xml"
],
"""
<?xml version="1.0" encoding="utf-8"?>
<KeyInfo>
<Start>$start_str</Start>
<Expiry>$expiry_str</Expiry>
</KeyInfo>
""";
retry = false,
verbose = c.verbose,
connect_timeout = c.connect_timeout,
readtimeout = c.read_timeout)

b = XML.parse(String(r.body), LazyNode)
delegation_key = Dict{String,String}()
for child in children(b)
if tag(child) == "UserDelegationKey"
for grandchild in children(child)
if tag(grandchild) in ("SignedOid", "SignedTid", "SignedStart", "SignedExpiry", "SignedService", "SignedVersion", "Value")
delegation_key[string(tag(grandchild))] = value(first(children(grandchild)))
end
end
end
end

delegation_key
end

function generate_user_delegation_sas(c::AzContainer, b::AbstractString; permissions="r", start=now(UTC), expiry=now(UTC)+Hour(1))
delegation_key = get_user_delegation_key(c; start, expiry)

signedPermissions = permissions
signedStart = Dates.format(start, "yyyy-mm-ddTHH:MM:SSZ")
signedExpiry = Dates.format(expiry, "yyyy-mm-ddTHH:MM:SSZ")
canonicalizedResource = "/blob/$(c.storageaccount)/$(c.containername)/$(addprefix(c,b))"
signedKeyObjectId = delegation_key["SignedOid"]
signedKeyTenantId = delegation_key["SignedTid"]
signedKeyStart = delegation_key["SignedStart"]
signedKeyExpiry = delegation_key["SignedExpiry"]
signedKeyService = delegation_key["SignedService"]
signedKeyVersion = delegation_key["SignedVersion"]
signedAuthorizedUserObjectId = ""
signedUnauthorizedUserObjectId = ""
signedCorrelationId = ""
signedIP = ""
signedProtocol = "https"
signedVersion = API_VERSION
signedResource = "b"
signedSnapshotTime = ""
signedEncryptionScope = ""
rscc = ""
rscd = ""
rsce = ""
rscl = ""
rsct = ""

string_to_sign =
signedPermissions * "\n" *
signedStart * "\n" *
signedExpiry * "\n" *
canonicalizedResource * "\n" *
signedKeyObjectId * "\n" *
signedKeyTenantId * "\n" *
signedKeyStart * "\n" *
signedKeyExpiry * "\n" *
signedKeyService * "\n" *
signedKeyVersion * "\n" *
signedAuthorizedUserObjectId * "\n" *
signedUnauthorizedUserObjectId * "\n" *
signedCorrelationId * "\n" *
"\n" *
"\n" *
signedIP * "\n" *
signedProtocol * "\n" *
signedVersion * "\n" *
signedResource * "\n" *
signedSnapshotTime * "\n" *
signedEncryptionScope * "\n" *
rscc * "\n" *
rscd * "\n" *
rsce * "\n" *
rscl * "\n" *
rsct

# sign the string using the delegation key
key = base64decode(delegation_key["Value"])
message = collect(codeunits(string_to_sign))
signed_string = HTTP.escapeuri(base64encode(hmac_sha256(key, message)))

# sas token
"sp=$signedPermissions&" *
"st=$signedStart&" *
"se=$signedExpiry&" *
"skoid=$signedKeyObjectId&" *
"sktid=$signedKeyTenantId&" *
"skt=$signedKeyStart&" *
"ske=$signedKeyExpiry&" *
"sks=$signedKeyService&" *
"skv=$signedKeyVersion&" *
(isempty(signedIP) ? "" : "sip=$signedIP&") *
"spr=$signedProtocol&" *
"sv=$signedVersion&" *
"sr=$signedResource&" *
"sig=$signed_string"
end

function status(c::AzContainer, b::AbstractString)
r_status = @retry c.nretry HTTP.request(
"HEAD",
"https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,b))",
[
"Authorization" => "Bearer $(token(c.session))",
"x-ms-version" => API_VERSION
];
retry = false,
verbose = c.verbose,
connect_timeout = c.connect_timeout,
readtimeout = c.read_timeout
)

copy_status = HTTP.header(r_status, "x-ms-copy-status")
copy_progress = HTTP.header(r_status, "x-ms-copy-progress")
copy_reason = HTTP.header(r_status, "x-ms-copy-status-description")

Dict("status"=>copy_status, "progress"=>copy_progress, "reason"=>copy_reason)
end

function Base.cp(inc::AzContainer, inb::AbstractString, outc::AzContainer, outb::AbstractString; showprogress=false, async=false)
source_url = "https://$(inc.storageaccount).blob.core.windows.net/$(inc.containername)/$(addprefix(inc,inb))"

if inc.storageaccount != outc.storageaccount
sas = generate_user_delegation_sas(inc, inb; permissions="r", start=now(UTC), expiry=now(UTC)+Hour(1))
source_url *= "?$sas"
end

headers = [
"Authorization" => "Bearer $(token(outc.session))",
"x-ms-version" => API_VERSION,
"x-ms-copy-source" => source_url
]

r_copy = @retry inc.nretry HTTP.request(
"PUT",
"https://$(outc.storageaccount).blob.core.windows.net/$(outc.containername)/$(addprefix(outc,outb))",
headers;
retry = false,
verbose = inc.verbose,
connect_timeout = inc.connect_timeout,
readtimeout = inc.read_timeout
)

if !async && r_copy.status == 202
while true
local stat
try
stat = status(outc, outb)
catch
@warn "unable to get copy status for blob copy, retrying..."
stat = Dict("status"=>"unknown")
end

if stat["status"] == "success"
break
elseif stat["status"] == "aborted"
error("blob copy aborted, dest=$(outc.storageaccount): $(outc.containername)/$(addprefix(outc,outb)), reason=$(stat["reason"])")
break
elseif stat["status"] == "pending" && showprogress
print("copy progress: $(stat["progress"])\r")
end
sleep(1)
end
end

nothing
end

"""
Expand Down Expand Up @@ -1193,19 +1380,8 @@ function Base.cp(src::AzContainer, dst::AzContainer)
mkpath(dst)

blobs = readdir(src)
for blob in blobs
@retry dst.nretry HTTP.request(
"PUT",
"https://$(dst.storageaccount).blob.core.windows.net/$(dst.containername)/$(addprefix(dst,blob))",
[
"Authorization" => "Bearer $(token(dst.session))",
"x-ms-version" => API_VERSION,
"x-ms-copy-source" => "https://$(src.storageaccount).blob.core.windows.net/$(src.containername)/$(addprefix(src,blob))"
],
retry = false,
verbose = src.verbose,
connect_timeout = src.connect_timeout,
readtimeout = src.read_timeout)
@sync for blob in blobs
@async cp(src, blob, dst, blob)
end
nothing
end
Expand Down Expand Up @@ -1240,6 +1416,6 @@ Note that the information stored is global, and not specfic to any one given IO
"""
getperf_counters() = @ccall libAzStorage.getperf_counters()::PerfCounters

export AzContainer, containers, readdlm, writedlm
export AzContainer, containers, readdlm, status, writedlm

end
26 changes: 26 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ end
storageaccount = ENV["STORAGE_ACCOUNT"]
@info "storageaccount=$storageaccount"

storageaccount_too = get(ENV, "STORAGE_ACCOUNT_TOO", "")
@info "storageaccount_too=$storageaccount_too"

for container in containers(;storageaccount=storageaccount,session=session)
rm(AzContainer(container;storageaccount=storageaccount,session=session))
end
Expand Down Expand Up @@ -545,9 +548,32 @@ end
write(c, "foo.txt", "Hello world")
cp(c, "foo.txt", c, "bar.txt")
@test read(c, "bar.txt", String) == "Hello world"
cp(c, "foo.txt", c, "baz.txt"; async=true)
timeout = 30
tic = time()
while status(c, "baz.txt")["status"] != "success"
if time() - tic > timeout
error("failed async copy")
end
sleep(1)
end
s = status(c, "baz.txt")
rm(c)
end

@testset "Container, copy blob to blob, different storage accounts" begin
r = uuid4()
c1 = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10)
c1 = robust_mkpath(c1)
write(c1, "foo.txt", "Hello world")
c2 = AzContainer("foo-$r-o", storageaccount=storageaccount_too, session=session, nthreads=2, nretry=10)
c2 = robust_mkpath(c2)
cp(c1, "foo.txt", c2, "bar.txt")
@test read(c2, "bar.txt", String) == "Hello world"
rm(c1)
rm(c2)
end

@testset "Object, copy blob to local file" begin
r = uuid4()
c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10)
Expand Down
Loading