|
1 | 1 | module AzStorage |
2 | 2 |
|
3 | | -using AbstractStorage, AzSessions, AzStorage_jll, Base64, Dates, DelimitedFiles, XML, HTTP, Printf, ProgressMeter, Serialization, Sockets |
| 3 | +using AbstractStorage, AzSessions, AzStorage_jll, Base64, Dates, DelimitedFiles, XML, HTTP, Printf, ProgressMeter, SHA, Serialization, Sockets |
4 | 4 |
|
5 | 5 | # https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes |
6 | 6 | # https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/request-limits-and-throttling |
@@ -854,9 +854,196 @@ function Base.cp(inc::AzContainer, inb::AbstractString, out::AbstractString; buf |
854 | 854 | close(io) |
855 | 855 | end |
856 | 856 |
|
857 | | -function Base.cp(inc::AzContainer, inb::AbstractString, outc::AzContainer, outb::AbstractString) |
858 | | - bytes = read!(inc, inb, Vector{UInt8}(undef, filesize(inc, inb))) |
859 | | - write(outc, outb, bytes) |
| 857 | +#= |
| 858 | +If the source and destination storage accounts for a blob copy are different, then the Azure storage API does not allow us |
| 859 | +to use OAuth2/RBAC directly for the source blob. But, we can use a user delegation SAS token which is built in the following |
| 860 | +two methods: 'generate_user_delegation_key' and 'get_user_delegation_sas'. |
| 861 | +=# |
| 862 | +function get_user_delegation_key(c::AzContainer; start=now(UTC), expiry=now(UTC)+Hour(1)) |
| 863 | + start_str = Dates.format(start, "yyyy-mm-ddTHH:MM:SSZ") |
| 864 | + expiry_str = Dates.format(expiry, "yyyy-mm-ddTHH:MM:SSZ") |
| 865 | + |
| 866 | + r = @retry c.nretry HTTP.request( |
| 867 | + "POST", |
| 868 | + "https://$(c.storageaccount).blob.core.windows.net/?restype=service&comp=userdelegationkey", |
| 869 | + [ |
| 870 | + "Authorization" => "Bearer $(token(c.session))", |
| 871 | + "x-ms-version" => API_VERSION, |
| 872 | + "Content-Type" => "application/xml" |
| 873 | + ], |
| 874 | + """ |
| 875 | + <?xml version="1.0" encoding="utf-8"?> |
| 876 | + <KeyInfo> |
| 877 | + <Start>$start_str</Start> |
| 878 | + <Expiry>$expiry_str</Expiry> |
| 879 | + </KeyInfo> |
| 880 | + """; |
| 881 | + retry = false, |
| 882 | + verbose = c.verbose, |
| 883 | + connect_timeout = c.connect_timeout, |
| 884 | + readtimeout = c.read_timeout) |
| 885 | + |
| 886 | + b = XML.parse(String(r.body), LazyNode) |
| 887 | + delegation_key = Dict{String,String}() |
| 888 | + for child in children(b) |
| 889 | + if tag(child) == "UserDelegationKey" |
| 890 | + for grandchild in children(child) |
| 891 | + if tag(grandchild) in ("SignedOid", "SignedTid", "SignedStart", "SignedExpiry", "SignedService", "SignedVersion", "Value") |
| 892 | + delegation_key[string(tag(grandchild))] = value(first(children(grandchild))) |
| 893 | + end |
| 894 | + end |
| 895 | + end |
| 896 | + end |
| 897 | + |
| 898 | + delegation_key |
| 899 | +end |
| 900 | + |
| 901 | +function generate_user_delegation_sas(c::AzContainer, b::AbstractString; permissions="r", start=now(UTC), expiry=now(UTC)+Hour(1)) |
| 902 | + delegation_key = get_user_delegation_key(c; start, expiry) |
| 903 | + |
| 904 | + signedPermissions = permissions |
| 905 | + signedStart = Dates.format(start, "yyyy-mm-ddTHH:MM:SSZ") |
| 906 | + signedExpiry = Dates.format(expiry, "yyyy-mm-ddTHH:MM:SSZ") |
| 907 | + canonicalizedResource = "/blob/$(c.storageaccount)/$(c.containername)/$(addprefix(c,b))" |
| 908 | + signedKeyObjectId = delegation_key["SignedOid"] |
| 909 | + signedKeyTenantId = delegation_key["SignedTid"] |
| 910 | + signedKeyStart = delegation_key["SignedStart"] |
| 911 | + signedKeyExpiry = delegation_key["SignedExpiry"] |
| 912 | + signedKeyService = delegation_key["SignedService"] |
| 913 | + signedKeyVersion = delegation_key["SignedVersion"] |
| 914 | + signedAuthorizedUserObjectId = "" |
| 915 | + signedUnauthorizedUserObjectId = "" |
| 916 | + signedCorrelationId = "" |
| 917 | + signedIP = "" |
| 918 | + signedProtocol = "https" |
| 919 | + signedVersion = API_VERSION |
| 920 | + signedResource = "b" |
| 921 | + signedSnapshotTime = "" |
| 922 | + signedEncryptionScope = "" |
| 923 | + rscc = "" |
| 924 | + rscd = "" |
| 925 | + rsce = "" |
| 926 | + rscl = "" |
| 927 | + rsct = "" |
| 928 | + |
| 929 | + string_to_sign = |
| 930 | + signedPermissions * "\n" * |
| 931 | + signedStart * "\n" * |
| 932 | + signedExpiry * "\n" * |
| 933 | + canonicalizedResource * "\n" * |
| 934 | + signedKeyObjectId * "\n" * |
| 935 | + signedKeyTenantId * "\n" * |
| 936 | + signedKeyStart * "\n" * |
| 937 | + signedKeyExpiry * "\n" * |
| 938 | + signedKeyService * "\n" * |
| 939 | + signedKeyVersion * "\n" * |
| 940 | + signedAuthorizedUserObjectId * "\n" * |
| 941 | + signedUnauthorizedUserObjectId * "\n" * |
| 942 | + signedCorrelationId * "\n" * |
| 943 | + "\n" * |
| 944 | + "\n" * |
| 945 | + signedIP * "\n" * |
| 946 | + signedProtocol * "\n" * |
| 947 | + signedVersion * "\n" * |
| 948 | + signedResource * "\n" * |
| 949 | + signedSnapshotTime * "\n" * |
| 950 | + signedEncryptionScope * "\n" * |
| 951 | + rscc * "\n" * |
| 952 | + rscd * "\n" * |
| 953 | + rsce * "\n" * |
| 954 | + rscl * "\n" * |
| 955 | + rsct |
| 956 | + |
| 957 | + # sign the string using the delegation key |
| 958 | + key = base64decode(delegation_key["Value"]) |
| 959 | + message = collect(codeunits(string_to_sign)) |
| 960 | + signed_string = HTTP.escapeuri(base64encode(hmac_sha256(key, message))) |
| 961 | + |
| 962 | + # sas token |
| 963 | + "sp=$signedPermissions&" * |
| 964 | + "st=$signedStart&" * |
| 965 | + "se=$signedExpiry&" * |
| 966 | + "skoid=$signedKeyObjectId&" * |
| 967 | + "sktid=$signedKeyTenantId&" * |
| 968 | + "skt=$signedKeyStart&" * |
| 969 | + "ske=$signedKeyExpiry&" * |
| 970 | + "sks=$signedKeyService&" * |
| 971 | + "skv=$signedKeyVersion&" * |
| 972 | + (isempty(signedIP) ? "" : "sip=$signedIP&") * |
| 973 | + "spr=$signedProtocol&" * |
| 974 | + "sv=$signedVersion&" * |
| 975 | + "sr=$signedResource&" * |
| 976 | + "sig=$signed_string" |
| 977 | +end |
| 978 | + |
| 979 | +function status(c::AzContainer, b::AbstractString) |
| 980 | + r_status = @retry c.nretry HTTP.request( |
| 981 | + "HEAD", |
| 982 | + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,b))", |
| 983 | + [ |
| 984 | + "Authorization" => "Bearer $(token(c.session))", |
| 985 | + "x-ms-version" => API_VERSION |
| 986 | + ]; |
| 987 | + retry = false, |
| 988 | + verbose = c.verbose, |
| 989 | + connect_timeout = c.connect_timeout, |
| 990 | + readtimeout = c.read_timeout |
| 991 | + ) |
| 992 | + |
| 993 | + copy_status = HTTP.header(r_status, "x-ms-copy-status") |
| 994 | + copy_progress = HTTP.header(r_status, "x-ms-copy-progress") |
| 995 | + copy_reason = HTTP.header(r_status, "x-ms-copy-status-description") |
| 996 | + |
| 997 | + Dict("status"=>copy_status, "progress"=>copy_progress, "reason"=>copy_reason) |
| 998 | +end |
| 999 | + |
| 1000 | +function Base.cp(inc::AzContainer, inb::AbstractString, outc::AzContainer, outb::AbstractString; showprogress=false, async=false) |
| 1001 | + source_url = "https://$(inc.storageaccount).blob.core.windows.net/$(inc.containername)/$(addprefix(inc,inb))" |
| 1002 | + |
| 1003 | + if inc.storageaccount != outc.storageaccount |
| 1004 | + sas = generate_user_delegation_sas(inc, inb; permissions="r", start=now(UTC), expiry=now(UTC)+Hour(1)) |
| 1005 | + source_url *= "?$sas" |
| 1006 | + end |
| 1007 | + |
| 1008 | + headers = [ |
| 1009 | + "Authorization" => "Bearer $(token(outc.session))", |
| 1010 | + "x-ms-version" => API_VERSION, |
| 1011 | + "x-ms-copy-source" => source_url |
| 1012 | + ] |
| 1013 | + |
| 1014 | + r_copy = @retry inc.nretry HTTP.request( |
| 1015 | + "PUT", |
| 1016 | + "https://$(outc.storageaccount).blob.core.windows.net/$(outc.containername)/$(addprefix(outc,outb))", |
| 1017 | + headers; |
| 1018 | + retry = false, |
| 1019 | + verbose = inc.verbose, |
| 1020 | + connect_timeout = inc.connect_timeout, |
| 1021 | + readtimeout = inc.read_timeout |
| 1022 | + ) |
| 1023 | + |
| 1024 | + if !async && r_copy.status == 202 |
| 1025 | + while true |
| 1026 | + local stat |
| 1027 | + try |
| 1028 | + stat = status(outc, outb) |
| 1029 | + catch |
| 1030 | + @warn "unable to get copy status for blob copy, retrying..." |
| 1031 | + stat = Dict("status"=>"unknown") |
| 1032 | + end |
| 1033 | + |
| 1034 | + if stat["status"] == "success" |
| 1035 | + break |
| 1036 | + elseif stat["status"] == "aborted" |
| 1037 | + error("blob copy aborted, dest=$(outc.storageaccount): $(outc.containername)/$(addprefix(outc,outb)), reason=$(stat["reason"])") |
| 1038 | + break |
| 1039 | + elseif stat["status"] == "pending" && showprogress |
| 1040 | + print("copy progress: $(stat["progress"])\r") |
| 1041 | + end |
| 1042 | + sleep(1) |
| 1043 | + end |
| 1044 | + end |
| 1045 | + |
| 1046 | + nothing |
860 | 1047 | end |
861 | 1048 |
|
862 | 1049 | """ |
@@ -1193,19 +1380,8 @@ function Base.cp(src::AzContainer, dst::AzContainer) |
1193 | 1380 | mkpath(dst) |
1194 | 1381 |
|
1195 | 1382 | blobs = readdir(src) |
1196 | | - for blob in blobs |
1197 | | - @retry dst.nretry HTTP.request( |
1198 | | - "PUT", |
1199 | | - "https://$(dst.storageaccount).blob.core.windows.net/$(dst.containername)/$(addprefix(dst,blob))", |
1200 | | - [ |
1201 | | - "Authorization" => "Bearer $(token(dst.session))", |
1202 | | - "x-ms-version" => API_VERSION, |
1203 | | - "x-ms-copy-source" => "https://$(src.storageaccount).blob.core.windows.net/$(src.containername)/$(addprefix(src,blob))" |
1204 | | - ], |
1205 | | - retry = false, |
1206 | | - verbose = src.verbose, |
1207 | | - connect_timeout = src.connect_timeout, |
1208 | | - readtimeout = src.read_timeout) |
| 1383 | + @sync for blob in blobs |
| 1384 | + @async cp(src, blob, dst, blob) |
1209 | 1385 | end |
1210 | 1386 | nothing |
1211 | 1387 | end |
@@ -1240,6 +1416,6 @@ Note that the information stored is global, and not specfic to any one given IO |
1240 | 1416 | """ |
1241 | 1417 | getperf_counters() = @ccall libAzStorage.getperf_counters()::PerfCounters |
1242 | 1418 |
|
1243 | | -export AzContainer, containers, readdlm, writedlm |
| 1419 | +export AzContainer, containers, readdlm, status, writedlm |
1244 | 1420 |
|
1245 | 1421 | end |
0 commit comments