Skip to content

Commit 21ff765

Browse files
ncwyuval-cloudinary
authored andcommitted
rc: add vfs/queue-set-expiry to adjust expiry of items in the VFS queue
1 parent 9a9ae93 commit 21ff765

File tree

5 files changed

+143
-0
lines changed

5 files changed

+143
-0
lines changed

vfs/rc.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/rclone/rclone/fs"
1212
"github.com/rclone/rclone/fs/cache"
1313
"github.com/rclone/rclone/fs/rc"
14+
"github.com/rclone/rclone/vfs/vfscache/writeback"
1415
)
1516

1617
const getVFSHelp = `
@@ -485,3 +486,63 @@ func rcQueue(ctx context.Context, in rc.Params) (out rc.Params, err error) {
485486
}
486487
return vfs.cache.Queue(), nil
487488
}
489+
490+
func init() {
491+
rc.Add(rc.Call{
492+
Path: "vfs/queue-set-expiry",
493+
Title: "Set the expiry time for an item queued for upload.",
494+
Help: strings.ReplaceAll(`
495+
496+
Use this to adjust the |expiry| time for an item in the upload queue.
497+
You will need to read the |id| of the item using |vfs/queue| before
498+
using this call.
499+
500+
You can then set |expiry| to a floating point number of seconds from
501+
now when the item is eligible for upload. If you want the item to be
502+
uploaded as soon as possible then set it to a large negative number (eg
503+
-1000000000). If you want the upload of the item to be delayed
504+
for a long time then set it to a large positive number.
505+
506+
Setting the |expiry| of an item which has already has started uploading
507+
will have no effect - the item will carry on being uploaded.
508+
509+
This will return an error if called with |--vfs-cache-mode| off or if
510+
the |id| passed is not found.
511+
512+
This takes the following parameters
513+
514+
- |fs| - select the VFS in use (optional)
515+
- |id| - a numeric ID as returned from |vfs/queue|
516+
- |expiry| - a new expiry time as floating point seconds
517+
518+
This returns an empty result on success, or an error.
519+
520+
`, "|", "`") + getVFSHelp,
521+
Fn: rcQueueSetExpiry,
522+
})
523+
}
524+
525+
func rcQueueSetExpiry(ctx context.Context, in rc.Params) (out rc.Params, err error) {
526+
vfs, err := getVFS(in)
527+
if err != nil {
528+
return nil, err
529+
}
530+
if vfs.cache == nil {
531+
return nil, rc.NewErrParamInvalid(errors.New("can't call this unless using the VFS cache"))
532+
}
533+
534+
// Read input values
535+
id, err := in.GetInt64("id")
536+
if err != nil {
537+
return nil, err
538+
}
539+
expiry, err := in.GetFloat64("expiry")
540+
if err != nil {
541+
return nil, err
542+
}
543+
544+
// Set expiry
545+
expiryTime := time.Now().Add(time.Duration(float64(time.Second) * expiry))
546+
err = vfs.cache.QueueSetExpiry(writeback.Handle(id), expiryTime)
547+
return nil, err
548+
}

vfs/vfscache/cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ func (c *Cache) Queue() (out rc.Params) {
177177
return out
178178
}
179179

180+
// QueueSetExpiry updates the expiry of a single item in the upload queue
181+
func (c *Cache) QueueSetExpiry(id writeback.Handle, expiry time.Time) error {
182+
return c.writeback.SetExpiry(id, expiry)
183+
}
184+
180185
// createDir creates a directory path, along with any necessary parents
181186
func createDir(dir string) error {
182187
return file.MkdirAll(dir, 0700)

vfs/vfscache/cache_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,3 +741,13 @@ func TestCacheQueue(t *testing.T) {
741741
_, ok := queue.([]writeback.QueueInfo)
742742
require.True(t, ok)
743743
}
744+
745+
func TestCacheQueueSetExpiry(t *testing.T) {
746+
_, c := newTestCache(t)
747+
748+
// Check this returns the correct error when called so we know
749+
// it is plumbed in correctly. The actual tests are done in
750+
// writeback.
751+
err := c.QueueSetExpiry(123123, time.Now())
752+
assert.Equal(t, writeback.ErrorIDNotFound, err)
753+
}

vfs/vfscache/writeback/writeback.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,3 +511,26 @@ func (wb *WriteBack) Queue() []QueueInfo {
511511

512512
return items
513513
}
514+
515+
// ErrorIDNotFound is returned from SetExpiry when the item is not found
516+
var ErrorIDNotFound = errors.New("id not found in queue")
517+
518+
// SetExpiry sets the expiry time for an item in the writeback queue.
519+
//
520+
// id should be as returned from the Queue call
521+
//
522+
// If the item isn't found then it will return ErrorIDNotFound
523+
func (wb *WriteBack) SetExpiry(id Handle, expiry time.Time) error {
524+
wb.mu.Lock()
525+
defer wb.mu.Unlock()
526+
527+
wbItem, ok := wb.lookup[id]
528+
if !ok {
529+
return ErrorIDNotFound
530+
}
531+
532+
// Update the expiry with the user requested value
533+
wb.items._update(wbItem, expiry)
534+
wb._resetTimer()
535+
return nil
536+
}

vfs/vfscache/writeback/writeback_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,50 @@ func TestWriteBackQueue(t *testing.T) {
545545
assert.Equal(t, []QueueInfo{}, queue)
546546
}
547547

548+
func TestWriteBackSetExpiry(t *testing.T) {
549+
wb, cancel := newTestWriteBack(t)
550+
defer cancel()
551+
552+
err := wb.SetExpiry(123123123, time.Now())
553+
assert.Equal(t, ErrorIDNotFound, err)
554+
555+
pi := newPutItem(t)
556+
557+
id := wb.Add(0, "one", 10, true, pi.put)
558+
wbItem := wb.lookup[id]
559+
560+
// get the expiry time with locking so we don't cause races
561+
getExpiry := func() time.Time {
562+
wb.mu.Lock()
563+
defer wb.mu.Unlock()
564+
return wbItem.expiry
565+
}
566+
567+
expiry := time.Until(getExpiry()).Seconds()
568+
assert.Greater(t, expiry, 0.0)
569+
assert.Less(t, expiry, 1.0)
570+
571+
newExpiry := time.Now().Add(100 * time.Second)
572+
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
573+
assert.Equal(t, newExpiry, getExpiry())
574+
575+
// This starts the transfer
576+
newExpiry = time.Now().Add(-100 * time.Second)
577+
require.NoError(t, wb.SetExpiry(wbItem.id, newExpiry))
578+
assert.Equal(t, newExpiry, getExpiry())
579+
580+
<-pi.started
581+
582+
expiry = time.Until(getExpiry()).Seconds()
583+
assert.LessOrEqual(t, expiry, -100.0)
584+
585+
pi.finish(nil) // transfer successful
586+
waitUntilNoTransfers(t, wb)
587+
588+
expiry = time.Until(getExpiry()).Seconds()
589+
assert.LessOrEqual(t, expiry, -100.0)
590+
}
591+
548592
// Test queuing more than fs.Config.Transfers
549593
func TestWriteBackMaxQueue(t *testing.T) {
550594
ctx := context.Background()

0 commit comments

Comments
 (0)