-
Notifications
You must be signed in to change notification settings - Fork 213
@tus/server: add GCS locker #616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
netdown
wants to merge
21
commits into
tus:main
Choose a base branch
from
netdown:feat-gcs-locker
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
667c849
add gcs locker
netdown 3804071
fix meta
netdown 6721adb
rework
netdown 8982256
fix
netdown d4cecf7
Merge branch 'main' into feat-gcs-locker
Murderlon c1e399d
Revert @google-cloud/storage major upgrade
Murderlon cc97479
Move GCSLocker into @tus/gcs-store
Murderlon 5a556cf
Use node:timers/promises
Murderlon b70e666
Upgrade @google-cloud/storage again and fix errors
Murderlon e98a459
Merge branch 'main' into feat-gcs-locker
Murderlon 05772bd
Fix after merge
Murderlon 0ce3a90
Fix
Murderlon dec6d39
Merge branch 'main' into feat-gcs-locker
Murderlon 88826ab
Handle newly passed abort signal
Murderlon e579e3a
Fix
Murderlon 167e49c
Reinit package-lock.json
Murderlon 4f54141
Fix type error
Murderlon b5e0bfb
lint
Murderlon 823076b
Add tests and fix implementation
Murderlon 6596f91
fixup! Add tests and fix implementation
Murderlon e627065
put back recursive GCSLock.take()
Murderlon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
import {ERRORS, Lock, Locker, RequestRelease} from '@tus/utils' | ||
import {Bucket, File} from '@google-cloud/storage' | ||
import EventEmitter from 'node:events' | ||
|
||
/** | ||
* Google Cloud Storage implementation of the Locker mechanism with support for distribution. | ||
* For general information regarding Locker, see MemoryLocker. | ||
* | ||
* Locking is based on separate .lock files created in a GCS bucket (presumably the same as the upload destination, but not necessarily). Locker distribution is accomplished through metadata of the lockfile. After a lock file is created, we regularly check if its metadata was modified by another process (i.e. to request releasing the resource). To avoid resources being locked forever, each lock is created with an expiration time (also stored as metadata). | ||
* | ||
* Lock file health - possible states of a lock file: | ||
* - non-existing (not locked) | ||
* - active (locked) | ||
* - requested to be released (locked, but should be released soon) | ||
* - expired (not locked) | ||
* | ||
* Acquiring a lock: | ||
* - If the lock file does not exist yet, create one with an expiration time and start watching it (see below) | ||
* - If the lock file already exists | ||
* -- If it has expired, treat it as non existing and overwrite it | ||
* -- If it is active, request releasing the resource by updating the lockfile's metadata, then retry locking with an exponential backoff | ||
* | ||
* Releasing a lock: | ||
* Stop the watcher and delete the lock file. | ||
* | ||
* Watching a lock (performed in every `watchInterval` ms): | ||
* - If the lock file does not exist anymore, stop the watcher | ||
* - If the lock file still exists, fetch its metadata | ||
* -- If there is a release request in the metadata, call the cancel handler and stop the watcher | ||
* -- If the lock has expired, call the cancel handler and stop the watcher | ||
* | ||
* (The process might be improved by introducing a secondary expiration time which gets updated by each watcher interval. This way we'll immediately know if the process which locked the resource has unexpectedly terminated and the resource should be released. Currently, only the `unlockTimeout` catches this scenario. However, this would introduce way more requests to GCS only for better handling of an extraordinary situation.) | ||
* | ||
*/ | ||
|
||
export interface GCSLockerOptions { | ||
/** | ||
* The bucket where the lock file will be created. No need to match the upload destination bucket. | ||
*/ | ||
bucket: Bucket | ||
/** | ||
* Maximum time (in milliseconds) to wait for an already existing lock to be released, else deny acquiring the lock. | ||
*/ | ||
acquireLockTimeout?: number | ||
/** | ||
* Maximum lifetime (in milliseconds) of a lock. Processes may unexpectedly quit, we need to make sure resources won't stay locked forever. Make sure this is a safe maximum, else the lock may be released while the resource is still being used. | ||
*/ | ||
unlockTimeout?: number | ||
/** | ||
* The amount of time (in milliseconds) to wait between lock file health checks. Larger interval results less requests to GCS, but generally more time to release a locked resource. Must be less than `acquireLockTimeout`. | ||
*/ | ||
watchInterval?: number | ||
} | ||
|
||
export class GCSLocker implements Locker { | ||
events: EventEmitter | ||
bucket: Bucket | ||
lockTimeout: number | ||
unlockTimeout: number | ||
watchInterval: number | ||
|
||
constructor(options: GCSLockerOptions) { | ||
this.events = new EventEmitter() | ||
this.bucket = options.bucket | ||
this.lockTimeout = options.acquireLockTimeout ?? 1000 * 30 | ||
this.unlockTimeout = options.unlockTimeout ?? 1000 * 600 | ||
this.watchInterval = options.watchInterval ?? 1000 * 10 | ||
|
||
if (this.watchInterval < this.lockTimeout) { | ||
throw new Error('watchInterval must be less than acquireLockTimeout') | ||
} | ||
} | ||
|
||
newLock(id: string) { | ||
return new GCSLock(id, this) | ||
} | ||
} | ||
|
||
class GCSLock implements Lock { | ||
constructor(private id: string, private locker: GCSLocker) {} | ||
|
||
async lock(requestRelease: RequestRelease): Promise<void> { | ||
const abortController = new AbortController() | ||
|
||
const lock = await Promise.race([ | ||
this.waitForLockTimeoutOrAbort(abortController.signal), | ||
this.acquireLock(requestRelease, abortController.signal), | ||
]) | ||
|
||
abortController.abort() | ||
|
||
if (!lock) { | ||
throw ERRORS.ERR_LOCK_TIMEOUT | ||
} | ||
} | ||
|
||
async unlock(): Promise<void> { | ||
const lockFile = new GCSLockFile(this.locker, this.id) | ||
|
||
if (!(await lockFile.isLocked())) { | ||
throw new Error('Releasing an unlocked lock!') | ||
} | ||
|
||
await lockFile.delete() | ||
} | ||
|
||
protected async acquireLock( | ||
cancelHandler: RequestRelease, | ||
signal: AbortSignal, | ||
attempt = 0 | ||
): Promise<boolean> { | ||
if (signal.aborted) { | ||
return false | ||
} | ||
|
||
const lockFile = new GCSLockFile(this.locker, this.id) | ||
|
||
if (!(await lockFile.isLocked())) { | ||
//The id is not locked yet - create a new lock file on GCS, start watching it | ||
await lockFile.write(cancelHandler) | ||
|
||
return true | ||
} else { | ||
//The id is already locked, we need to request releasing the resource | ||
await lockFile.requestRelease() | ||
|
||
//Try to acquire the lock again | ||
return await new Promise((resolve, reject) => { | ||
//On the first attempt, retry after current I/O operations are done, else use an exponential backoff | ||
const waitFn = (then: () => void) => | ||
attempt > 0 | ||
? setTimeout(then, (attempt * this.locker.watchInterval) / 3) | ||
: setImmediate(then) | ||
|
||
waitFn(() => { | ||
this.acquireLock(cancelHandler, signal, attempt + 1) | ||
.then(resolve) | ||
.catch(reject) | ||
}) | ||
}) | ||
} | ||
} | ||
|
||
protected waitForLockTimeoutOrAbort(signal: AbortSignal) { | ||
return new Promise<boolean>((resolve) => { | ||
Murderlon marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
const timeout = setTimeout(() => { | ||
resolve(false) | ||
}, this.locker.lockTimeout) | ||
|
||
const abortListener = () => { | ||
clearTimeout(timeout) | ||
signal.removeEventListener('abort', abortListener) | ||
resolve(false) | ||
} | ||
signal.addEventListener('abort', abortListener) | ||
}) | ||
} | ||
} | ||
|
||
class GCSLockFile { | ||
protected fileId: string | ||
protected lockFile: File | ||
protected unlockTimeout: number | ||
protected watchInterval: number | ||
protected watcher: NodeJS.Timeout | undefined | ||
|
||
constructor(locker: GCSLocker, fileId: string) { | ||
this.fileId = fileId | ||
this.lockFile = locker.bucket.file(`${fileId}.lock`) | ||
this.unlockTimeout = locker.unlockTimeout | ||
this.watchInterval = locker.watchInterval | ||
} | ||
|
||
/** | ||
* Check whether the resource is currently locked or not | ||
*/ | ||
public async isLocked() { | ||
//Check if file exists | ||
const exists = (await this.lockFile.exists())[0] | ||
if (!exists) { | ||
return false | ||
} | ||
|
||
//Check if file is not expired | ||
if (await this.hasExpired()) { | ||
return false | ||
} | ||
|
||
return true | ||
} | ||
|
||
/** | ||
* Write (create or update) the lockfile and start the watcher | ||
*/ | ||
public async write(cancelHandler: RequestRelease) { | ||
await this.lockFile.save('', {metadata: {exp: Date.now() + this.unlockTimeout}}) | ||
|
||
this.startWatcher(cancelHandler) | ||
} | ||
|
||
/** | ||
* Delete the lockfile and stop the watcher | ||
*/ | ||
public async delete() { | ||
clearInterval(this.watcher) | ||
await this.lockFile.delete() | ||
} | ||
|
||
/** | ||
* Request the release of the related resource | ||
*/ | ||
public async requestRelease() { | ||
await this.lockFile.setMetadata({unlockRequest: 1}) | ||
} | ||
|
||
/** | ||
* Check if the lockfile has already expired | ||
*/ | ||
protected async hasExpired(meta?: File['metadata']) { | ||
if (!meta) { | ||
try { | ||
meta = (await this.lockFile.getMetadata())[0] | ||
} catch (err) { | ||
return true | ||
} | ||
} | ||
const expDate = Date.parse(meta.timeCreated || '') | ||
return !expDate || expDate < Date.now() | ||
} | ||
|
||
/** | ||
* Start watching a lock file's health | ||
*/ | ||
protected startWatcher(cancelHandler: RequestRelease) { | ||
this.watcher = setInterval(async () => { | ||
if ((await this.lockFile.exists())[0]) { | ||
//Fetch lock metadata | ||
const meta = (await this.lockFile.getMetadata())[0] | ||
|
||
//Unlock if release was requested or unlock timed out | ||
if ('unlockRequest' in meta || (await this.hasExpired(meta))) { | ||
cancelHandler() | ||
clearInterval(this.watcher) | ||
} | ||
} else { | ||
//Lock is freed, terminate watcher | ||
clearInterval(this.watcher) | ||
} | ||
}, this.watchInterval) | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
export * from './MemoryLocker' | ||
export * from './GCSLocker' |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice if it also added random jitter.