Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ proxy_url = VALUE
| <a name="input_function_app_storage_account_prefix"></a> [function\_app\_storage\_account\_prefix](#input\_function\_app\_storage\_account\_prefix) | Weka storage account name prefix | `string` | `"weka"` | no |
| <a name="input_function_app_subnet_delegation_cidr"></a> [function\_app\_subnet\_delegation\_cidr](#input\_function\_app\_subnet\_delegation\_cidr) | Subnet delegation enables you to designate a specific subnet for an Azure PaaS service. | `string` | `"10.0.1.0/25"` | no |
| <a name="input_function_app_subnet_delegation_id"></a> [function\_app\_subnet\_delegation\_id](#input\_function\_app\_subnet\_delegation\_id) | Required to specify if subnet\_name were used to specify pre-defined subnets for weka. Function subnet delegation requires an additional subnet, and in the case of pre-defined networking this one also should be pre-defined | `string` | `""` | no |
| <a name="input_function_app_version"></a> [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"dfbf0e60f92791206b77092d24711251"` | no |
| <a name="input_function_app_version"></a> [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"f3ab109f5228ce3b2a5680a5cccdfddf"` | no |
| <a name="input_get_weka_io_token"></a> [get\_weka\_io\_token](#input\_get\_weka\_io\_token) | The token to download the Weka release from get.weka.io. | `string` | `""` | no |
| <a name="input_hotspare"></a> [hotspare](#input\_hotspare) | Number of hotspares to set on weka cluster. Refer to https://docs.weka.io/weka-system-overview/ssd-capacity-management#hot-spare | `number` | `1` | no |
| <a name="input_install_cluster_dpdk"></a> [install\_cluster\_dpdk](#input\_install\_cluster\_dpdk) | Install weka cluster with DPDK | `bool` | `true` | no |
Expand Down
77 changes: 75 additions & 2 deletions function-app/code/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ type BlobObjParams struct {
BlobName string
}

// BlobReadResult contains the blob data along with its ETag for optimistic concurrency
type BlobReadResult struct {
Data []byte
ETag *azcore.ETag
}

// ErrBlobModified is returned when a conditional write fails because the blob was modified
var ErrBlobModified = errors.New("blob was modified by another process")

type AzureObsParams struct {
Name string
ContainerName string
Expand Down Expand Up @@ -224,6 +233,15 @@ func UnlockContainer(ctx context.Context, storageAccountName, containerName stri
}

func ReadBlobObject(ctx context.Context, bl BlobObjParams) (state []byte, err error) {
result, err := ReadBlobObjectWithETag(ctx, bl)
if err != nil {
return nil, err
}
return result.Data, nil
}

// ReadBlobObjectWithETag reads a blob and returns its data along with the ETag for optimistic concurrency
func ReadBlobObjectWithETag(ctx context.Context, bl BlobObjParams) (result BlobReadResult, err error) {
logger := logging.LoggerFromCtx(ctx)

credential, err := getCredential(ctx)
Expand All @@ -243,13 +261,14 @@ func ReadBlobObject(ctx context.Context, bl BlobObjParams) (state []byte, err er
return
}

state, err = io.ReadAll(downloadResponse.Body)
result.Data, err = io.ReadAll(downloadResponse.Body)
if err != nil {
logger.Error().Err(err).Send()
return
}

result.ETag = downloadResponse.ETag
return

}

func containerExists(ctx context.Context, containerClient *container.Client, storageName, containerName string) (bool, error) {
Expand Down Expand Up @@ -402,6 +421,47 @@ func WriteBlobObject(ctx context.Context, bl BlobObjParams, state []byte) (err e

}

// WriteBlobObjectWithETag writes a blob with optimistic concurrency using ETags.
// If etag is provided, the write will only succeed if the blob's current ETag matches.
// Returns ErrBlobModified if the blob was modified by another process.
func WriteBlobObjectWithETag(ctx context.Context, bl BlobObjParams, data []byte, etag *azcore.ETag) error {
logger := logging.LoggerFromCtx(ctx)

credential, err := getCredential(ctx)
if err != nil {
return err
}

blobClient, err := azblob.NewClient(getBlobUrl(bl.StorageName), credential, nil)
if err != nil {
logger.Error().Err(err).Send()
return err
}

opts := &azblob.UploadBufferOptions{}
if etag != nil {
opts.AccessConditions = &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfMatch: etag,
},
}
}

_, err = blobClient.UploadBuffer(ctx, bl.ContainerName, bl.BlobName, data, opts)
if err != nil {
// Check if this is a precondition failed error (ETag mismatch)
var respErr *azcore.ResponseError
if errors.As(err, &respErr) && respErr.StatusCode == 412 {
logger.Warn().Msgf("blob %s was modified by another process (ETag mismatch)", bl.BlobName)
return ErrBlobModified
}
logger.Error().Err(err).Send()
return err
}

return nil
}

func WriteState(ctx context.Context, stateParams BlobObjParams, state protocol.ClusterState) (err error) {
logger := logging.LoggerFromCtx(ctx)

Expand Down Expand Up @@ -1723,6 +1783,19 @@ func GetScaleSetVmsExpandedView(ctx context.Context, p *ScaleSetParams) ([]*VMIn
}
}

func GetScaleSetVmInstanceIds(ctx context.Context, p *ScaleSetParams) ([]string, error) {
vms, err := GetScaleSetVmsExpandedView(ctx, p)
if err != nil {
return nil, err
}

ids := make([]string, 0, len(vms))
for _, vm := range vms {
ids = append(ids, vm.InstanceID)
}
return ids, nil
}

func GetAzureInstanceNameCmd() string {
return "curl -s -H Metadata:true --noproxy * http://169.254.169.254/metadata/instance?api-version=2021-02-01 | jq '.compute.name' | cut -c2- | rev | cut -c2- | rev"
}
Expand Down
Loading