Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 113 additions & 18 deletions broker/stores/azure/account.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package azure

import (
"context"
"fmt"
"net/url"
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-storage-blob-go/azblob"
Expand All @@ -21,7 +25,13 @@ type accountStore struct {
sasKey *service.SharedKeyCredential
}

// NewAccount creates a new Azure Account authenticated Store from the provided URL.
// NewAccount creates a new Azure Store from the provided URL (azure://container/prefix).
// Authentication is chosen in order:
// - If AZURE_ACCOUNT_NAME and AZURE_ACCOUNT_KEY are set: Shared Key (existing behavior, backwards compatible).
// - Else if AZURE_ACCOUNT_NAME, AZURE_TENANT_ID, and both AZURE_CLIENT_ID and AZURE_CLIENT_SECRET are set: client secret.
// - Else if AZURE_ACCOUNT_NAME and AZURE_TENANT_ID are set: DefaultAzureCredential (workload identity, managed identity, Azure CLI).
//
// So existing customers keep using azure:// with account key; migrating to managed identity only requires setting AZURE_TENANT_ID and removing the key (and optionally AZURE_CLIENT_ID for workload identity).
func NewAccount(ep *url.URL) (stores.Store, error) {
var args StoreQueryArgs

Expand All @@ -35,49 +45,134 @@ func NewAccount(ep *url.URL) (stores.Store, error) {
var storageAccount = os.Getenv("AZURE_ACCOUNT_NAME")
var accountKey = os.Getenv("AZURE_ACCOUNT_KEY")

if storageAccount == "" || accountKey == "" {
return nil, fmt.Errorf("AZURE_ACCOUNT_NAME and AZURE_ACCOUNT_KEY must be set for azure:// URLs")
}

// arize change to support china cloud
blobDomain := os.Getenv("AZURE_BLOB_DOMAIN")
var blobDomain = os.Getenv("AZURE_BLOB_DOMAIN")
if blobDomain == "" {
blobDomain = "blob.core.windows.net"
}

credentials, err := azblob.NewSharedKeyCredential(storageAccount, accountKey)
if err != nil {
return nil, err
// Shared Key: backwards-compatible path for existing azure:// users.
if storageAccount != "" && accountKey != "" {
credentials, err := azblob.NewSharedKeyCredential(storageAccount, accountKey)
if err != nil {
return nil, err
}

var pipeline = azblob.NewPipeline(credentials, azblob.PipelineOptions{})

sasKey, err := service.NewSharedKeyCredential(storageAccount, accountKey)
if err != nil {
return nil, err
}

var store = &accountStore{
storeBase: storeBase{
storageAccount: storageAccount,
blobDomain: blobDomain,
container: container,
prefix: prefix,
args: args,
pipeline: pipeline,
},
sasKey: sasKey,
}

log.WithFields(log.Fields{
"storageAccount": storageAccount,
"blobDomain": blobDomain,
"container": container,
"prefix": prefix,
}).Info("constructed new Azure Shared Key storage client")

return store, nil
}

var pipeline = azblob.NewPipeline(credentials, azblob.PipelineOptions{})
// AD auth (client secret or workload/managed identity): requires tenant and storage account from env.
var tenantID = os.Getenv("AZURE_TENANT_ID")
if storageAccount == "" || tenantID == "" {
return nil, fmt.Errorf("azure:// requires either AZURE_ACCOUNT_NAME+AZURE_ACCOUNT_KEY (shared key) or AZURE_ACCOUNT_NAME+AZURE_TENANT_ID (AD / workload identity)")
}

var credentials azcore.TokenCredential
var err error
var clientID = os.Getenv("AZURE_CLIENT_ID")
var clientSecret = os.Getenv("AZURE_CLIENT_SECRET")
var authMethod string
if clientID != "" && clientSecret != "" {
credentials, err = azidentity.NewClientSecretCredential(
tenantID,
clientID,
clientSecret,
&azidentity.ClientSecretCredentialOptions{
DisableInstanceDiscovery: true,
},
)
if err != nil {
return nil, err
}
authMethod = "client secret"
} else {
credentials, err = azidentity.NewDefaultAzureCredential(&azidentity.DefaultAzureCredentialOptions{
TenantID: tenantID,
DisableInstanceDiscovery: true,
})
if err != nil {
return nil, err
}
authMethod = "workload identity / default chain"
}

var refreshFn = func(credential azblob.TokenCredential) time.Duration {
if token, err := credentials.GetToken(
context.Background(),
policy.TokenRequestOptions{
TenantID: tenantID,
Scopes: []string{"https://storage.azure.com/.default"},
},
); err != nil {
log.WithFields(log.Fields{
"err": err,
"tenant": tenantID,
}).Errorf("failed to refresh Azure credential (will retry)")
return time.Minute
} else {
credential.SetToken(token.Token)
return token.ExpiresOn.Sub(time.Now().Add(time.Minute))
}
}
var accessKey = azblob.NewTokenCredential("", refreshFn)

// Create the new SDK credential for SAS signing
sasKey, err := service.NewSharedKeyCredential(storageAccount, accountKey)
client, err := service.NewClient(
azureStorageURL(storageAccount, blobDomain),
credentials,
&service.ClientOptions{},
)
if err != nil {
return nil, err
}

var store = &accountStore{
var adStoreInstance = &adStore{
storeBase: storeBase{
storageAccount: storageAccount,
blobDomain: blobDomain,
container: container,
prefix: prefix,
args: args,
pipeline: pipeline,
pipeline: azblob.NewPipeline(accessKey, azblob.PipelineOptions{}),
},
sasKey: sasKey,
tenantID: tenantID,
client: client,
}

log.WithFields(log.Fields{
"tenant": tenantID,
"storageAccount": storageAccount,
"blobDomain": blobDomain,
"container": container,
"prefix": prefix,
}).Info("constructed new Azure Shared Key storage client")
"auth": authMethod,
}).Info("constructed new Azure AD storage client (azure://)")

return store, nil
return adStoreInstance, nil
}

// SignGet returns a signed URL for GET operations using Shared Key signing
Expand Down
42 changes: 14 additions & 28 deletions broker/stores/azure/ad.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand Down Expand Up @@ -37,9 +36,6 @@ type adStore struct {
}

// NewAD creates a new Azure AD authenticated Store from the provided URL.
// Authentication: if AZURE_CLIENT_ID and AZURE_CLIENT_SECRET are both set, client secret is used.
// Otherwise DefaultAzureCredential is used, which supports workload identity (e.g. in Kubernetes
// with AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_FEDERATED_TOKEN_FILE), managed identity, and Azure CLI.
func NewAD(ep *url.URL) (stores.Store, error) {
var args StoreQueryArgs

Expand All @@ -57,40 +53,31 @@ func NewAD(ep *url.URL) (stores.Store, error) {
var container = path[1]
var prefix = strings.Join(path[2:], "/")

var clientID = os.Getenv("AZURE_CLIENT_ID")
var clientSecret = os.Getenv("AZURE_CLIENT_SECRET")

if clientID == "" || clientSecret == "" {
return nil, fmt.Errorf("AZURE_CLIENT_ID and AZURE_CLIENT_SECRET must be set for azure-ad:// URLs")
}

// arize change to support china cloud
blobDomain := os.Getenv("AZURE_BLOB_DOMAIN")
if blobDomain == "" {
blobDomain = "blob.core.windows.net"
}

var credentials azcore.TokenCredential
var err error
var clientID = os.Getenv("AZURE_CLIENT_ID")
var clientSecret = os.Getenv("AZURE_CLIENT_SECRET")
if clientID != "" && clientSecret != "" {
credentials, err = azidentity.NewClientSecretCredential(
tenantID,
clientID,
clientSecret,
&azidentity.ClientSecretCredentialOptions{
DisableInstanceDiscovery: true,
},
)
} else {
credentials, err = azidentity.NewDefaultAzureCredential(&azidentity.DefaultAzureCredentialOptions{
TenantID: tenantID,
var credentials, err = azidentity.NewClientSecretCredential(
tenantID,
clientID,
clientSecret,
&azidentity.ClientSecretCredentialOptions{
DisableInstanceDiscovery: true,
})
}
},
)
if err != nil {
return nil, err
}

var authMethod = "workload identity / default chain"
if clientID != "" && clientSecret != "" {
authMethod = "client secret"
}

var refreshFn = func(credential azblob.TokenCredential) time.Duration {
if token, err := credentials.GetToken(
context.Background(),
Expand Down Expand Up @@ -139,7 +126,6 @@ func NewAD(ep *url.URL) (stores.Store, error) {
"blobDomain": blobDomain,
"container": container,
"prefix": prefix,
"auth": authMethod,
}).Info("constructed new Azure AD storage client")

return store, nil
Expand Down
4 changes: 2 additions & 2 deletions cmd/gazette/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func isIPv6(s string) bool {
if ip == nil {
return false
}
// Check if it's IPv6 by seeing if it contains a colon
return strings.Contains(s, ":")
// Only IPv6 addresses may be wrapped in brackets in URLs; IPv4 must not be.
return ip.To4() == nil
}

func main() {
Expand Down