Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 95 additions & 14 deletions component/size_tracker/size_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,21 @@ type SizeTracker struct {
internal.BaseComponent
mountSize *MountSize
totalBucketCapacity uint64
displayCapacity uint64
serverCount uint64
evictionMode EvictionMode
bucketUsage uint64
statSizeOffset uint64
}

type EvictionMode int

const (
Normal EvictionMode = iota // 0
Overuse // 1
Emergency // 2
)

type SizeTrackerOptions struct {
JournalName string `config:"journal-name" yaml:"journal-name,omitempty"`
TotalBucketCapacity uint64 `config:"bucket-capacity-fallback" yaml:"bucket-capacity-fallback,omitempty"`
Expand All @@ -53,7 +66,13 @@ type SizeTrackerOptions struct {
const compName = "size_tracker"
const blockSize = int64(4096)
const defaultJournalName = "mount_size.dat"
const evictionThreshold = 0.95

// these usage thresholds determine when the eviction mode changes
const targetUtilization = 0.9 // 90%
const hysteresisMargin = 0.02 // 2%
const overuseThreshold = targetUtilization + hysteresisMargin // 92%
const emergencyThreshold = overuseThreshold + .05 // 97%
const bucketNormalizedThreshold = targetUtilization - hysteresisMargin // 88%

var _ internal.Component = &SizeTracker{}

Expand Down Expand Up @@ -99,7 +118,24 @@ func (st *SizeTracker) Configure(_ bool) error {
return fmt.Errorf("SizeTracker: config error [invalid config attributes]")
}

st.totalBucketCapacity = conf.TotalBucketCapacity * common.MbToBytes
if conf.TotalBucketCapacity != 0 {
// TODO: document these units
st.totalBucketCapacity = conf.TotalBucketCapacity * common.MbToBytes
// set display capacity
st.displayCapacity = st.totalBucketCapacity
if config.IsSet("libfuse.display-capacity-mb") {
var confDisplayCapacityMb uint64
err = config.UnmarshalKey("libfuse.display-capacity-mb", &confDisplayCapacityMb)
if err == nil {
st.displayCapacity = confDisplayCapacityMb * common.MbToBytes
} else {
log.Err("SizeTracker::Configure : Invalid display capacity")
}
}
// calculate server count
// round to the nearest whole number
st.serverCount = (st.totalBucketCapacity + st.displayCapacity/2) / st.displayCapacity
}

journalName := defaultJournalName
if config.IsSet(compName + ".journal-name") {
Expand Down Expand Up @@ -262,20 +298,44 @@ func (st *SizeTracker) StatFs() (*common.Statfs_t, bool, error) {

if err == nil && ret {
// Custom logic for use with Nx Plugin
// If the user is over the capacity limit set by Nx, then we need to prevent them from
// accidental overuse of their bucket. So we change our reporting to instead report
// the used capacity of the bucket to enable the VMS to start eviction
if float64(
stat.Blocks*uint64(blockSize),
) > evictionThreshold*float64(
st.totalBucketCapacity,
) {
log.Warn(
"SizeTracker::StatFs : changing from size_tracker size to S3 bucket size due to overuse of bucket",
)
blocks = stat.Blocks
// The Nx VMS evicts data until utilization is at 90% (of display capacity)
// Use a size offset to show the Nx eviction threshold at our desired utilization
// Only update the offset when bucket usage is updated
returnedBucketUsage := stat.Blocks * uint64(blockSize)
isBucketUsageUpdated := returnedBucketUsage != st.bucketUsage
if isBucketUsageUpdated {
// record the updated usage
st.bucketUsage = returnedBucketUsage
// convert everything to float64
bucketCapacity := float64(st.totalBucketCapacity)
bucketUsage := float64(returnedBucketUsage)
displayCapacity := float64(st.displayCapacity)
serverUsage := float64(st.mountSize.GetSize())
serverCount := float64(st.serverCount)
sizeOffset := float64(st.statSizeOffset)
nxEvictionThreshold := targetUtilization * displayCapacity
intendedCapacity := bucketCapacity / serverCount
// Use a finite state machine. The evictionMode is the state.
// calculate bucket usage and update eviction mode accordingly
st.updateState(bucketUsage / bucketCapacity)
switch st.evictionMode {
case Normal:
// the server count starts as the bucket capacity divided by the display capacity
// if the server count has been incremented, offset the tracked size
sizeOffset = nxEvictionThreshold - targetUtilization*intendedCapacity
case Overuse:
// drive to a utilization target below the bucketNormalizedThreshold
normalizationTargetFactor := bucketNormalizedThreshold - hysteresisMargin
sizeOffset = nxEvictionThreshold - normalizationTargetFactor*intendedCapacity
case Emergency:
// just report the whole bucket usage
sizeOffset = bucketUsage - serverUsage
}
st.statSizeOffset = uint64(max(0, sizeOffset))
}
}
// add the offset
blocks += st.statSizeOffset / uint64(blockSize)
}

stat := common.Statfs_t{
Expand Down Expand Up @@ -303,6 +363,27 @@ func (st *SizeTracker) StatFs() (*common.Statfs_t, bool, error) {
return &stat, true, nil
}

func (st *SizeTracker) updateState(bucketUsageFactor float64) {
switch st.evictionMode {
case Normal:
if bucketUsageFactor > overuseThreshold {
st.evictionMode = Overuse
}
case Overuse:
if bucketUsageFactor < bucketNormalizedThreshold {
st.evictionMode = Normal
} else if bucketUsageFactor > emergencyThreshold {
st.evictionMode = Emergency
// severe overuse strongly suggests an incorrect server count
st.serverCount++
}
case Emergency:
if bucketUsageFactor < bucketNormalizedThreshold {
st.evictionMode = Normal
}
}
}

func (st *SizeTracker) CommitData(opt internal.CommitDataOptions) error {
log.Trace("SizeTracker::CopyFromFile : %s", opt.Name)
var origSize int64
Expand Down
Loading
Loading