Skip to content

Commit 3b24b9a

Browse files
committed
feat: add instance refresh support
1 parent e9e907d commit 3b24b9a

File tree

16 files changed

+883
-36
lines changed

16 files changed

+883
-36
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ proxy_url = VALUE
471471
| <a name="input_function_app_storage_account_prefix"></a> [function\_app\_storage\_account\_prefix](#input\_function\_app\_storage\_account\_prefix) | Weka storage account name prefix | `string` | `"weka"` | no |
472472
| <a name="input_function_app_subnet_delegation_cidr"></a> [function\_app\_subnet\_delegation\_cidr](#input\_function\_app\_subnet\_delegation\_cidr) | Subnet delegation enables you to designate a specific subnet for an Azure PaaS service. | `string` | `"10.0.1.0/25"` | no |
473473
| <a name="input_function_app_subnet_delegation_id"></a> [function\_app\_subnet\_delegation\_id](#input\_function\_app\_subnet\_delegation\_id) | Required to specify if subnet\_name were used to specify pre-defined subnets for weka. Function subnet delegation requires an additional subnet, and in the case of pre-defined networking this one also should be pre-defined | `string` | `""` | no |
474-
| <a name="input_function_app_version"></a> [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"d6feec06f7ec19fef73ca148cf1449cc"` | no |
474+
| <a name="input_function_app_version"></a> [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"f3ab109f5228ce3b2a5680a5cccdfddf"` | no |
475475
| <a name="input_get_weka_io_token"></a> [get\_weka\_io\_token](#input\_get\_weka\_io\_token) | The token to download the Weka release from get.weka.io. | `string` | `""` | no |
476476
| <a name="input_hotspare"></a> [hotspare](#input\_hotspare) | Number of hotspares to set on weka cluster. Refer to https://docs.weka.io/weka-system-overview/ssd-capacity-management#hot-spare | `number` | `1` | no |
477477
| <a name="input_install_cluster_dpdk"></a> [install\_cluster\_dpdk](#input\_install\_cluster\_dpdk) | Install weka cluster with DPDK | `bool` | `true` | no |

function-app/code/common/common.go

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ type BlobObjParams struct {
6969
BlobName string
7070
}
7171

72+
// BlobReadResult contains the blob data along with its ETag for optimistic concurrency
73+
type BlobReadResult struct {
74+
Data []byte
75+
ETag *azcore.ETag
76+
}
77+
78+
// ErrBlobModified is returned when a conditional write fails because the blob was modified
79+
var ErrBlobModified = errors.New("blob was modified by another process")
80+
7281
type AzureObsParams struct {
7382
Name string
7483
ContainerName string
@@ -224,6 +233,15 @@ func UnlockContainer(ctx context.Context, storageAccountName, containerName stri
224233
}
225234

226235
func ReadBlobObject(ctx context.Context, bl BlobObjParams) (state []byte, err error) {
236+
result, err := ReadBlobObjectWithETag(ctx, bl)
237+
if err != nil {
238+
return nil, err
239+
}
240+
return result.Data, nil
241+
}
242+
243+
// ReadBlobObjectWithETag reads a blob and returns its data along with the ETag for optimistic concurrency
244+
func ReadBlobObjectWithETag(ctx context.Context, bl BlobObjParams) (result BlobReadResult, err error) {
227245
logger := logging.LoggerFromCtx(ctx)
228246

229247
credential, err := getCredential(ctx)
@@ -243,13 +261,14 @@ func ReadBlobObject(ctx context.Context, bl BlobObjParams) (state []byte, err er
243261
return
244262
}
245263

246-
state, err = io.ReadAll(downloadResponse.Body)
264+
result.Data, err = io.ReadAll(downloadResponse.Body)
247265
if err != nil {
248266
logger.Error().Err(err).Send()
267+
return
249268
}
250269

270+
result.ETag = downloadResponse.ETag
251271
return
252-
253272
}
254273

255274
func containerExists(ctx context.Context, containerClient *container.Client, storageName, containerName string) (bool, error) {
@@ -402,6 +421,47 @@ func WriteBlobObject(ctx context.Context, bl BlobObjParams, state []byte) (err e
402421

403422
}
404423

424+
// WriteBlobObjectWithETag writes a blob with optimistic concurrency using ETags.
425+
// If etag is provided, the write will only succeed if the blob's current ETag matches.
426+
// Returns ErrBlobModified if the blob was modified by another process.
427+
func WriteBlobObjectWithETag(ctx context.Context, bl BlobObjParams, data []byte, etag *azcore.ETag) error {
428+
logger := logging.LoggerFromCtx(ctx)
429+
430+
credential, err := getCredential(ctx)
431+
if err != nil {
432+
return err
433+
}
434+
435+
blobClient, err := azblob.NewClient(getBlobUrl(bl.StorageName), credential, nil)
436+
if err != nil {
437+
logger.Error().Err(err).Send()
438+
return err
439+
}
440+
441+
opts := &azblob.UploadBufferOptions{}
442+
if etag != nil {
443+
opts.AccessConditions = &blob.AccessConditions{
444+
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
445+
IfMatch: etag,
446+
},
447+
}
448+
}
449+
450+
_, err = blobClient.UploadBuffer(ctx, bl.ContainerName, bl.BlobName, data, opts)
451+
if err != nil {
452+
// Check if this is a precondition failed error (ETag mismatch)
453+
var respErr *azcore.ResponseError
454+
if errors.As(err, &respErr) && respErr.StatusCode == 412 {
455+
logger.Warn().Msgf("blob %s was modified by another process (ETag mismatch)", bl.BlobName)
456+
return ErrBlobModified
457+
}
458+
logger.Error().Err(err).Send()
459+
return err
460+
}
461+
462+
return nil
463+
}
464+
405465
func WriteState(ctx context.Context, stateParams BlobObjParams, state protocol.ClusterState) (err error) {
406466
logger := logging.LoggerFromCtx(ctx)
407467

@@ -1723,6 +1783,19 @@ func GetScaleSetVmsExpandedView(ctx context.Context, p *ScaleSetParams) ([]*VMIn
17231783
}
17241784
}
17251785

1786+
func GetScaleSetVmInstanceIds(ctx context.Context, p *ScaleSetParams) ([]string, error) {
1787+
vms, err := GetScaleSetVmsExpandedView(ctx, p)
1788+
if err != nil {
1789+
return nil, err
1790+
}
1791+
1792+
ids := make([]string, 0, len(vms))
1793+
for _, vm := range vms {
1794+
ids = append(ids, vm.InstanceID)
1795+
}
1796+
return ids, nil
1797+
}
1798+
17261799
func GetAzureInstanceNameCmd() string {
17271800
return "curl -s -H Metadata:true --noproxy * http://169.254.169.254/metadata/instance?api-version=2021-02-01 | jq '.compute.name' | cut -c2- | rev | cut -c2- | rev"
17281801
}

0 commit comments

Comments
 (0)