Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,16 @@ common:
parquetStatsSkipIndex:
enabled: false # whether to skip parquet stats index when reading; set true to enable skipping.
storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead
storage:
manifestTransactionRetryLimit: 10 # Maximum number of retry attempts for V3 storage manifest transaction commits on optimistic concurrency conflicts
stv2:
splitSystemColumn:
enabled: true # enable split system column policy in storage v2
includePK: true # whether split system column policy include pk field
splitByAvgSize:
enabled: true # enable split by average size policy in storage v2
threshold: 1024 # split by average size policy threshold(in bytes) in storage v2
useLoonFFI: true
# Default value: auto
# Valid values: [auto, avx512, avx2, avx, sse4_2]
# This configuration is only used by querynode and indexnode, it selects CPU instruction set for Searching and Index-building.
Expand Down Expand Up @@ -1027,15 +1037,6 @@ common:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional
storage:
stv2:
splitSystemColumn:
enabled: true # enable split system column policy in storage v2
includePK: true # whether split system column policy include pk field
splitByAvgSize:
enabled: true # enable split by average size policy in storage v2
threshold: 1024 # split by average size policy threshold(in bytes) in storage v2
useLoonFFI: true
traceLogMode: 0 # trace request info
bloomFilterSize: 100000 # bloom filter initial size
bloomFilterType: BlockedBloomFilter # bloom filter type, support BasicBloomFilter and BlockedBloomFilter
Expand Down
4 changes: 2 additions & 2 deletions internal/storagev2/packed/manifest_ffi.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func CreateManifestForSegment(
cBasePath := C.CString(basePath)
defer C.free(unsafe.Pointer(cBasePath))

// Begin transaction (read_version=-1 for latest, retry_limit=1)
// Begin transaction (read_version=-1 for latest, retry_limit=10)
var transactionHandle C.LoonTransactionHandle
result := C.loon_transaction_begin(cBasePath, cProperties, C.int64_t(-1), C.uint32_t(1), &transactionHandle)
result := C.loon_transaction_begin(cBasePath, cProperties, C.int64_t(-1), getRetryLimit(), &transactionHandle)
if err := HandleLoonFFIResult(result); err != nil {
return "", fmt.Errorf("loon_transaction_begin failed: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storagev2/packed/packed_writer_ffi.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (pw *FFIPackedWriter) Close() (string, error) {
defer C.free(unsafe.Pointer(cBasePath))
var transationHandle C.LoonTransactionHandle

result = C.loon_transaction_begin(cBasePath, pw.cProperties, C.int64_t(pw.baseVersion), 1, &transationHandle)
result = C.loon_transaction_begin(cBasePath, pw.cProperties, C.int64_t(pw.baseVersion), getRetryLimit(), &transationHandle)
if err := HandleLoonFFIResult(result); err != nil {
return "", err
}
Expand Down
22 changes: 20 additions & 2 deletions internal/storagev2/packed/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,32 @@ import "C"

import (
"fmt"
"math"
"unsafe"

"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

// getRetryLimit returns the configured manifest transaction retry limit.
// Multiple stats tasks (text index, JSON key, BM25) can write to the same
// segment's manifest concurrently, causing optimistic transaction conflicts.
// The retry mechanism re-reads the latest manifest version and re-applies
// the changes on each attempt.
func getRetryLimit() C.uint32_t {
val := paramtable.Get().CommonCfg.ManifestTransactionRetryLimit.GetAsInt64()
if val <= 0 {
val = 10
}
if val > math.MaxUint32 {
val = math.MaxUint32
}
return C.uint32_t(val)
}

// DeltaLogEntry represents a delta log to be added to the manifest
type DeltaLogEntry struct {
Path string // Full path to the deltalog file
Expand Down Expand Up @@ -74,7 +92,7 @@ func AddDeltaLogsToManifest(

// Start transaction
var transactionHandle C.LoonTransactionHandle
result := C.loon_transaction_begin(cBasePath, cProperties, C.int64_t(version), 1, &transactionHandle)
result := C.loon_transaction_begin(cBasePath, cProperties, C.int64_t(version), getRetryLimit(), &transactionHandle)
if err := HandleLoonFFIResult(result); err != nil {
return "", fmt.Errorf("failed to begin transaction: %w", err)
}
Expand Down Expand Up @@ -203,7 +221,7 @@ func AddStatsToManifest(
defer C.free(unsafe.Pointer(cBasePath))

var transactionHandle C.LoonTransactionHandle
result := C.loon_transaction_begin(cBasePath, cProperties, C.int64_t(version), 1, &transactionHandle)
result := C.loon_transaction_begin(cBasePath, cProperties, C.int64_t(version), getRetryLimit(), &transactionHandle)
if err := HandleLoonFFIResult(result); err != nil {
return "", fmt.Errorf("failed to begin transaction: %w", err)
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ type commonConfig struct {
GracefulStopTimeout ParamItem `refreshable:"true"`
ParquetStatsSkipIndex ParamItem `refreshable:"true"`

StorageType ParamItem `refreshable:"false"`
SimdType ParamItem `refreshable:"false"`
StorageType ParamItem `refreshable:"false"`
ManifestTransactionRetryLimit ParamItem `refreshable:"true"`
SimdType ParamItem `refreshable:"false"`

DiskWriteMode ParamItem `refreshable:"true"`
DiskWriteBufferSizeKb ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -642,6 +643,15 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
}
p.StorageType.Init(base.mgr)

p.ManifestTransactionRetryLimit = ParamItem{
Key: "common.storage.manifestTransactionRetryLimit",
Version: "2.6.10",
DefaultValue: "10",
Doc: "Maximum number of retry attempts for V3 storage manifest transaction commits on optimistic concurrency conflicts",
Export: true,
}
p.ManifestTransactionRetryLimit.Init(base.mgr)

p.HighPriorityThreadCoreCoefficient = ParamItem{
Key: "common.threadCoreCoefficient.highPriority",
Version: "2.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,15 @@ def _generate_data(self, nb=1000, dim=default_dim, min_elems=3, max_elems=10):
num_elems = rng.randint(min_elems, max_elems)
struct_array = []
for j in range(num_elems):
# Use a distinctive unit vector for row 0, element 0 so it
# never collides with any seed-generated vector and Top-1
# assertions reliably return row 0.
if i == 0 and j == 0:
emb = [1.0] + [0.0] * (dim - 1)
else:
emb = _seed_vector(i * 1000 + j, dim)
struct_array.append({
"embedding": _seed_vector(i * 1000 + j, dim),
"embedding": emb,
"int_val": i * 100 + j,
"str_val": f"row_{i}_elem_{j}",
"float_val": float(i + j * 0.1),
Expand Down Expand Up @@ -1207,8 +1214,12 @@ def _generate_data(self, nb=1000, dim=default_dim):
num_elems = rng.randint(3, 10)
struct_array = []
for j in range(num_elems):
if i == 0 and j == 0:
emb = [1.0] + [0.0] * (dim - 1)
else:
emb = _seed_vector(i * 1000 + j, dim)
struct_array.append({
"embedding": _seed_vector(i * 1000 + j, dim),
"embedding": emb,
"int_val": i * 100 + j,
"str_val": f"row_{i}_elem_{j}",
"float_val": float(i + j * 0.1),
Expand Down Expand Up @@ -2054,8 +2065,12 @@ def _generate_data(self, nb=500, dim=default_dim, add_bool=False):
num_elems = rng.randint(3, 10)
struct_array = []
for j in range(num_elems):
if i == 0 and j == 0:
emb = [1.0] + [0.0] * (dim - 1)
else:
emb = _seed_vector(i * 1000 + j, dim)
elem = {
"embedding": _seed_vector(i * 1000 + j, dim),
"embedding": emb,
"int_val": i * 100 + j,
"str_val": f"row_{i}_elem_{j}",
"float_val": float(i + j * 0.1),
Expand Down Expand Up @@ -3150,8 +3165,12 @@ def _generate_data(self, nb=500, dim=default_dim):
num_elems = rng.randint(3, 10)
struct_array = []
for j in range(num_elems):
if i == 0 and j == 0:
emb = [1.0] + [0.0] * (dim - 1)
else:
emb = _seed_vector(i * 1000 + j, dim)
struct_array.append({
"embedding": _seed_vector(i * 1000 + j, dim),
"embedding": emb,
"int_val": i * 100 + j,
"str_val": f"row_{i}_elem_{j}",
"color": COLORS[j % 3],
Expand Down Expand Up @@ -3500,8 +3519,12 @@ def _generate_data(self, nb=500, dim=default_dim):
num_elems = rng.randint(3, 8)
struct_array = []
for j in range(num_elems):
if i == 0 and j == 0:
emb = [1.0] + [0.0] * (dim - 1)
else:
emb = _seed_vector(i * 1000 + j, dim)
struct_array.append({
"embedding": _seed_vector(i * 1000 + j, dim),
"embedding": emb,
"int_val": i * 100 + j,
"color": COLORS[j % 3],
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ def test_milvus_client_timestamptz_add_field_with_default_value(self):
# step 3: query the rows
for row in rows:
row[default_timestamp_field_name] = default_timestamp_value
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "UTC")
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
Expand Down
Loading