Commit 52c1bf5
[taskserver] Handle concurrent uploads using a counter and locking mechanism and add new configs (TASK_SERVER_V2) (#3733)
## What changes were proposed in this pull request?

- Handle concurrent uploads using a counter and locking mechanism. The `upload_available_space` key is set in Redis when Hue boots up: it calculates the actual available /tmp space and stores it. Whenever a new file upload starts, a key named `upload_<unique_uuid_hash>` is created in Redis holding the upload's file size, along with a second key `upload_<same_uuid>_timestamp` holding the timestamp, and `upload_available_space` is reduced by the upload's file size. When the upload completes, both keys are deleted and the reserved space is returned to `upload_available_space`.

  Assume we have 1GB of free space:
  - `upload_available_space = 1GB`

  When file1 (100MB) upload is triggered:
  - `upload_<uuid1> = 100MB`
  - `upload_<uuid1>_timestamp = <timestamp>`
  - `upload_available_space = 1GB - 100MB = 900MB`

  When file2 (100MB) upload is triggered:
  - `upload_<uuid2> = 100MB`
  - `upload_<uuid2>_timestamp = <timestamp>`
  - `upload_available_space = 900MB - 100MB = 800MB`

  When the uploads complete, the keys (`upload_<uuid1>`, `upload_<uuid1>_timestamp`) and (`upload_<uuid2>`, `upload_<uuid2>_timestamp`) are deleted and `upload_available_space` is restored:
  - `upload_available_space = 800MB + 100MB + 100MB = 1GB`

  For failed uploads, the same process repeats on each retry made by the user, leaving leftover `upload_` keys behind, since the keys are only deleted on a successful upload. A periodic job, `cleanup_stale_uploads`, cleans these up: it runs every `CLEANUP_STALE_UPLOADS_IN_REDIS_PERIODIC_INTERVAL` minutes and deletes `upload_*` keys whose timestamp is more than 60 minutes old.
- Moved task server configs to TASK_SERVER_V2.
- Moved the reserve/release upload-space methods to filebrowser utils.
- New configs for periodic scheduling under the task server. Display `max_file_upload_size_limit` on the upload modal. Parse the Redis broker URL from configs.
- Changed task server configuration parsing from awk to grep. Set autorestart to false in the redis and celery templates.
- Show the task_server tab in Admin Server based on task_server_v2 configs.
- Moved the "Uploaded chunk" log message from INFO to DEBUG. Set the celery default log level to INFO.
- Pull the timezone from hue.ini. The /tmp cleaner job checks the timestamp of each file and deletes it based on a 60-minute timedelta.

## How was this patch tested?

Tested on a local machine and using docker builds.

Change-Id: Id167701873d10426c7f6e5064b3feb731966b2a7
Co-authored-by: Athithyaa Selvam <[email protected]>
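The reservation accounting described above amounts to a check-and-decrement against a shared counter. Below is a minimal sketch of the idea, not the actual `filebrowser.utils` code: the real helpers take only `(task_id, file_size)`, while this version passes the client explicitly, and the in-memory `FakeRedis` is a stand-in for a real Redis client exposing the same `get`/`set`/`incrby`/`decrby`/`delete` calls.

```python
import time
import uuid


class FakeRedis:
  """Minimal in-memory stand-in for a Redis client (illustration only)."""

  def __init__(self):
    self.store = {}

  def get(self, key):
    return self.store.get(key)

  def set(self, key, value):
    self.store[key] = int(value)

  def decrby(self, key, amount):
    self.store[key] = self.store.get(key, 0) - amount
    return self.store[key]

  def incrby(self, key, amount):
    self.store[key] = self.store.get(key, 0) + amount
    return self.store[key]

  def delete(self, *keys):
    for key in keys:
      self.store.pop(key, None)


def reserve_space_for_file_uploads(client, task_id, file_size):
  """Record the upload's size and timestamp, then shrink the shared pool."""
  available = client.get('upload_available_space')
  if available is None or available < file_size:
    return False
  client.set(f'upload_{task_id}', file_size)
  client.set(f'upload_{task_id}_timestamp', int(time.time()))
  client.decrby('upload_available_space', file_size)
  return True


def release_reserved_space_for_file_uploads(client, task_id):
  """Return the reserved size to the pool and drop both bookkeeping keys."""
  file_size = client.get(f'upload_{task_id}')
  if file_size is not None:
    client.incrby('upload_available_space', file_size)
    client.delete(f'upload_{task_id}', f'upload_{task_id}_timestamp')


client = FakeRedis()
client.set('upload_available_space', 1024 ** 3)        # 1GB pool at boot
uuid1 = uuid.uuid4().hex
assert reserve_space_for_file_uploads(client, uuid1, 100 * 1024 ** 2)
assert client.get('upload_available_space') == 1024 ** 3 - 100 * 1024 ** 2
release_reserved_space_for_file_uploads(client, uuid1)
assert client.get('upload_available_space') == 1024 ** 3
```

With a real Redis client, the free-space check and the decrement should be made atomic (for example with a Lua script or `WATCH`/`MULTI`), since two concurrent uploads could otherwise both pass the check before either decrements the pool.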
1 parent d32163f commit 52c1bf5

File tree

30 files changed (+1013, -512 lines)


apps/filebrowser/src/filebrowser/conf.py

Lines changed: 10 additions & 4 deletions
@@ -18,9 +18,8 @@
 import os
 import sys
 
-from desktop.conf import ENABLE_DOWNLOAD
+from desktop.conf import ENABLE_DOWNLOAD, is_oozie_enabled
 from desktop.lib.conf import Config, coerce_bool
-from desktop.conf import is_oozie_enabled
 
 if sys.version_info[0] > 2:
   from django.utils.translation import gettext_lazy as _
@@ -31,7 +30,7 @@
   key="max_snappy_decompression_size",
   help=_("Max snappy decompression size in bytes."),
   private=True,
-  default=1024*1024*25,
+  default=1024 * 1024 * 25,
   type=int)
 
 ARCHIVE_UPLOAD_TEMPDIR = Config(
@@ -52,10 +51,12 @@
   type=int,
   help=_('Configure the maximum number of concurrent connections(chunks) for file uploads using the chunked file uploader.'))
 
+
 def get_desktop_enable_download():
   """Get desktop enable_download default"""
   return ENABLE_DOWNLOAD.get()
 
+
 SHOW_DOWNLOAD_BUTTON = Config(
   key="show_download_button",
   help=_("whether to show the download button in hdfs file browser."),
@@ -78,7 +79,7 @@ def get_desktop_enable_download():
 
 REDIRECT_DOWNLOAD = Config(
   key="redirect_download",
-  help=_("Redirect client to WebHdfs or S3 for file download. Note: Turning this on will "\
+  help=_("Redirect client to WebHdfs or S3 for file download. Note: Turning this on will "
          "override notebook/redirect_whitelist for user selected file downloads on WebHdfs & S3."),
   type=coerce_bool,
   default=False)
@@ -96,6 +97,11 @@ def get_desktop_enable_download():
   help=_('A limit on a file size (bytes) that can be uploaded to a filesystem. '
          'A value of -1 means there will be no limit.'))
 
+
+def max_file_size_upload_limit():
+  return MAX_FILE_SIZE_UPLOAD_LIMIT.get()
+
+
 FILE_DOWNLOAD_CACHE_CONTROL = Config(
   key="file_download_cache_control",
   type=str,

apps/filebrowser/src/filebrowser/tasks.py

Lines changed: 101 additions & 40 deletions
@@ -14,26 +14,29 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
-import subprocess
 import os
-
 import shutil
-import psutil
+import logging
+import subprocess
 from datetime import datetime, timedelta
 
-from celery.utils.log import get_task_logger
+import pytz
+import psutil
 from celery import states
-from django.db import transaction
+from celery.utils.log import get_task_logger
 from django.http import HttpRequest
 from django.utils import timezone
 
 from desktop.auth.backend import rewrite_user
 from desktop.celery import app
-from desktop.conf import TASK_SERVER
+from desktop.conf import TASK_SERVER_V2
 from desktop.lib import fsmanager
-from useradmin.models import User
+from filebrowser.utils import release_reserved_space_for_file_uploads, reserve_space_for_file_uploads
+
+if hasattr(TASK_SERVER_V2, 'get') and TASK_SERVER_V2.ENABLED.get():
+  from desktop.settings import TIME_ZONE, initialize_free_disk_space_in_redis, parse_broker_url
 from filebrowser.views import UPLOAD_CLASSES
+from useradmin.models import User
 
 LOG_TASK = get_task_logger(__name__)
 LOG = logging.getLogger()
@@ -52,24 +55,34 @@
   states.IGNORED: 'ignored'
 }
 
+
 @app.task
 def error_handler(request, exc, traceback):
   print('Task {0} raised exception: {1!r}\n{2!r}'.format(
     request.id, exc, traceback))
 
+
 @app.task()
 def upload_file_task(**kwargs):
   task_id = kwargs.get("qquuid")
+  file_size = kwargs.get("qqtotalfilesize")
   user_id = kwargs["user_id"]
   scheme = kwargs["scheme"]
   postdict = kwargs.get("postdict", None)
   request = _get_request(postdict=postdict, user_id=user_id, scheme=scheme)
   kwargs["username"] = request.user.username
   kwargs["task_name"] = "fileupload"
   kwargs["state"] = "STARTED"
-  now = timezone.now()
-  kwargs["task_start"] = now.strftime("%Y-%m-%dT%H:%M:%S")
+  kwargs["task_start"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S")
+  kwargs["started"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S")
   upload_file_task.update_state(task_id=task_id, state='STARTED', meta=kwargs)
+
+  # Reserve space for upload
+  if not reserve_space_for_file_uploads(task_id, file_size):
+    kwargs["state"] = "FAILURE"
+    upload_file_task.update_state(task_id=task_id, state='FAILURE', meta=kwargs)
+    raise Exception("Insufficient space for upload")
+
   try:
     upload_class = UPLOAD_CLASSES.get(kwargs["scheme"])
     _fs = upload_class(request, args=[], **kwargs)
@@ -82,11 +95,13 @@ def upload_file_task(**kwargs):
     raise Exception(f"Upload failed %s" % err)
 
   kwargs["state"] = "SUCCESS"
-  kwargs["started"] = now.strftime("%Y-%m-%dT%H:%M:%S")
-  kwargs["task_end"] = now.strftime("%Y-%m-%dT%H:%M:%S")
+  kwargs["task_end"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S.%f")
+  kwargs["progress"] = "100%"
   upload_file_task.update_state(task_id=task_id, state='SUCCESS', meta=kwargs)
+  release_reserved_space_for_file_uploads(task_id)
   return None
 
+
 def _get_request(postdict=None, user_id=None, scheme=None):
   request = HttpRequest()
   request.POST = postdict
@@ -100,19 +115,19 @@ def _get_request(postdict=None, user_id=None, scheme=None):
 
   return request
 
+
 @app.task()
 def document_cleanup_task(**kwargs):
   keep_days = kwargs.get('keep_days')
   task_id = kwargs.get("qquuid")
-  now = timezone.now()
 
   kwargs["username"] = kwargs["username"]
   kwargs["task_name"] = "document_cleanup"
   kwargs["state"] = "STARTED"
   kwargs["parameters"] = keep_days
   kwargs["task_id"] = task_id
   kwargs["progress"] = "0%"
-  kwargs["task_start"] = now.strftime("%Y-%m-%dT%H:%M:%S")
+  kwargs["task_start"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S")
 
   try:
     INSTALL_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
@@ -121,63 +136,109 @@ def document_cleanup_task(**kwargs):
     kwargs["state"] = "SUCCESS"
     LOG.info(f"Document_cleanup_task completed successfully.")
   except Exception as err:
+    kwargs["state"] = "FAILURE"
     document_cleanup_task.update_state(task_id=task_id, state='FAILURE', meta=kwargs)
     raise Exception(f"Upload failed %s" % err)
 
   kwargs["state"] = "SUCCESS"
   kwargs["progress"] = "100%"
-  kwargs["task_end"] = now.strftime("%Y-%m-%dT%H:%M:%S")
+  kwargs["task_end"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S.%f")
   document_cleanup_task.update_state(task_id=task_id, state='SUCCESS', meta=kwargs)
 
   return None
 
+
 @app.task()
 def check_disk_usage_and_clean_task(**kwargs):
   task_id = kwargs.get("qquuid")
-  default_cleanup_threshold = 90
-  cleanup_threshold = kwargs.get('cleanup_threshold', default_cleanup_threshold)
+  cleanup_threshold = kwargs.get('cleanup_threshold', TASK_SERVER_V2.DISK_USAGE_CLEANUP_THRESHOLD)
   username = kwargs.get("username", "celery_scheduler")
-  now = timezone.now()
+  now = timezone.now().astimezone(pytz.timezone(TIME_ZONE))
   kwargs = {
     "username": username,
     "task_name": "tmp_cleanup",
     "task_id": task_id,
     "parameters": cleanup_threshold,
     "progress": "0%",
-    "task_start": now.strftime("%Y-%m-%dT%H:%M:%S")
+    "task_start": now.strftime("%Y-%m-%dT%H:%M:%S.%f"),
   }
 
+  check_disk_usage_and_clean_task.update_state(task_id=task_id, state=states.STARTED, meta=kwargs)
+
+  def delete_old_files(directory, current_time, time_delta):
+    for root, dirs, files in os.walk(directory):
+      for file in files:
+        file_path = os.path.join(root, file)
+        try:
+          file_modified_time = datetime.fromtimestamp(os.path.getmtime(file_path))
+          file_modified_time = timezone.make_aware(file_modified_time, timezone.get_default_timezone())
+          if file_modified_time < current_time - time_delta:
+            if os.path.isfile(file_path) or os.path.islink(file_path):
+              os.unlink(file_path)
+              LOG.info(f"Deleted file {file_path}")
+        except PermissionError:
+          LOG.warning(f"Permission denied: unable to delete {file_path}")
+        except Exception as err:
+          check_disk_usage_and_clean_task.update_state(task_id=task_id, state=states.FAILURE, meta=kwargs)
+          raise Exception(f"Failed to delete {file_path}. Reason: {err}")
+        finally:
+          kwargs["task_end"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S.%f")
 
   disk_usage = psutil.disk_usage('/')
   if disk_usage.percent >= int(cleanup_threshold):
     LOG.info(f"Disk usage is above {cleanup_threshold}%, cleaning up /tmp directory...")
     tmp_dir = '/tmp'
-    for filename in os.listdir(tmp_dir):
-      file_path = os.path.join(tmp_dir, filename)
-      try:
-        file_modified_time = datetime.fromtimestamp(os.path.getmtime(file_path))
-        # Make file_modified_time timezone-aware
-        file_modified_time = timezone.make_aware(file_modified_time, timezone.get_default_timezone())
-        if file_modified_time < now - timedelta(minutes=15):
-          if os.path.isfile(file_path) or os.path.islink(file_path):
-            os.unlink(file_path)
-          elif os.path.isdir(file_path):
-            shutil.rmtree(file_path)
-          LOG.info(f"Deleted {file_path}")
-      except PermissionError:
-        LOG.warning(f"Permission denied: unable to delete {file_path}")
-      except Exception as err:
-        check_disk_usage_and_clean_task.update_state(task_id=task_id, state='FAILURE', meta=kwargs)
-        raise Exception(f"Failed to delete {file_path}. Reason: {err}")
-
+    delete_old_files(tmp_dir, now, timedelta(minutes=TASK_SERVER_V2.DISK_USAGE_AND_CLEAN_TASK_TIME_DELTA.get()))
     kwargs["progress"] = "100%"
-    check_disk_usage_and_clean_task.update_state(task_id=task_id, state='SUCCESS', meta=kwargs)
+    kwargs["task_end"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S.%f")
+    check_disk_usage_and_clean_task.update_state(task_id=task_id, state=states.SUCCESS, meta=kwargs)
     LOG.info("/tmp directory cleaned.")
-
   else:
     kwargs["progress"] = "100%"
-    check_disk_usage_and_clean_task.update_state(task_id=task_id, state='SUCCESS', meta=kwargs)
+    kwargs["task_end"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S.%f")
+    check_disk_usage_and_clean_task.update_state(task_id=task_id, state=states.SUCCESS, meta=kwargs)
     LOG.info(f"Disk usage is {disk_usage.percent}%, no need to clean up.")
 
   # Get available disk space after cleanup
   free_space = psutil.disk_usage('/tmp').free
   return {'free_space': free_space}
+
+
+@app.task()
+def cleanup_stale_uploads(**kwargs):
+  timedelta_minutes = kwargs.get("timeout_minutes", TASK_SERVER_V2.CLEANUP_STALE_UPLOADS_TASK_TIME_DELTA.get())
+  username = kwargs.get("username", "celery_scheduler")
+  task_id = kwargs.get("qquuid")
+  kwargs = {
+    "username": username,
+    "task_name": "cleanup_stale_uploads",
+    "task_id": task_id,
+    "parameters": timedelta_minutes,
+    "progress": "0%",
+    "task_start": timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S"),
+  }
+  cleanup_stale_uploads.update_state(task_id=task_id, state=states.STARTED, meta=kwargs)
+  redis_client = parse_broker_url(TASK_SERVER_V2.BROKER_URL.get())
+  try:
+    current_time = int(datetime.now().timestamp())
+
+    for key in redis_client.scan_iter('upload__*'):
+      timestamp_key = f'{key.decode()}_timestamp'
+      timestamp = redis_client.get(timestamp_key)
+
+      if timestamp:
+        timestamp = int(timestamp)
+        if current_time - timestamp > timedelta_minutes * 60:
+          file_size = int(redis_client.get(key))
+          redis_client.incrby('upload_available_space', file_size)
+          redis_client.delete(key)
+          redis_client.delete(timestamp_key)
+    initialize_free_disk_space_in_redis()
+    kwargs["progress"] = "100%"
+    kwargs["task_end"] = timezone.now().astimezone(pytz.timezone(TIME_ZONE)).strftime("%Y-%m-%dT%H:%M:%S.%f")
+    cleanup_stale_uploads.update_state(task_id=task_id, state=states.SUCCESS, meta=kwargs)
+  except Exception as e:
+    cleanup_stale_uploads.update_state(task_id=task_id, state=states.FAILURE, meta={"error": str(e)})
    LOG.exception("Failed to cleanup stale uploads: %s", str(e))
+  finally:
+    redis_client.close()
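The `parse_broker_url` helper used by `cleanup_stale_uploads` above lives in `desktop/settings.py` and is not part of this diff. As a rough sketch of what parsing a Celery-style broker URL such as `redis://:secret@localhost:6379/0` involves (assumed implementation, stdlib only; the real helper is assumed to build and return a connected redis-py client from these components):

```python
from urllib.parse import urlparse


def parse_redis_broker_url(broker_url):
  """Split a Redis broker URL into connection components.

  Illustrative only: the real parse_broker_url in desktop/settings.py is
  assumed to construct a redis-py client from values like these,
  e.g. redis.Redis(host=..., port=..., db=..., password=...).
  """
  parsed = urlparse(broker_url)
  if parsed.scheme != 'redis':
    raise ValueError(f'Unsupported broker scheme: {parsed.scheme}')
  return {
    'host': parsed.hostname or 'localhost',
    'port': parsed.port or 6379,
    'db': int(parsed.path.lstrip('/') or 0),
    'password': parsed.password,
  }


components = parse_redis_broker_url('redis://:secret@localhost:6379/0')
# components == {'host': 'localhost', 'port': 6379, 'db': 0, 'password': 'secret'}
```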

apps/filebrowser/src/filebrowser/templates/listdir.mako

Lines changed: 3 additions & 3 deletions
@@ -168,7 +168,7 @@ ${ fb_components.menubar() }
 <a class="btn fileToolbarBtn" title="${_('Upload files')}" data-bind="visible: !inTrash(), css: {'disabled': isS3Root()}, click: function(){ if (!isS3Root()) { uploadFile(false) }}"><i class="fa fa-arrow-circle-o-up"></i> ${_('Upload')}</a>
 <!-- /ko -->
 <!-- ko if: isTaskServerEnabled -->
-<a class="btn fileToolbarBtn" title="${_('Select a file to upload. The file will first be saved locally and then automatically transferred to the designated file system (e.g., S3, Azure) in the background. The upload modal closes immediately after the file is queued, allowing you to continue working. A notification, \'File upload scheduled. Please check the task server page for progress,\' will confirm the upload has started. This feature is especially useful for large files, as it eliminates the need to wait for the upload to complete.')}" data-bind="visible: !inTrash(), css: {'disabled': isS3Root()}, click: function(){ if (!isS3Root()) { uploadFile(true) }}"><i class="fa fa-arrow-circle-o-up"></i> ${_('Schedule Upload')}</a>
+<a class="btn fileToolbarBtn" title="${_('Select a file to upload. The file will first be saved locally and then automatically transferred to the designated file system (e.g., S3, Azure) in the background. The upload modal closes immediately after the file is queued, allowing you to continue working. A notification, \'File upload scheduled. Please check the task server page for progress,\' will confirm the upload has started. This feature is especially useful for large files, as it eliminates the need to wait for the upload to complete.')}" data-bind="visible: !inTrash(), css: {'disabled': isS3Root()}, click: function(){ if (!isS3Root()) { checkAndDisplayAvailableSpace(); uploadFile(true); }}"><i class="fa fa-arrow-circle-o-up"></i> ${_('Schedule Upload')}</a>
 <!-- /ko -->
 <!-- /ko -->
 <!-- ko if: isGS -->
@@ -179,7 +179,7 @@ ${ fb_components.menubar() }
 <a class="btn fileToolbarBtn" title="${_('Upload files')}" data-bind="visible: !inTrash(), css: {'disabled': isABFSRoot()}, click: function(){ if (!isABFSRoot()) { uploadFile(false) }}"><i class="fa fa-arrow-circle-o-up"></i> ${_('Upload')}</a>
 <!-- /ko -->
 <!-- ko if: isTaskServerEnabled -->
-<a class="btn fileToolbarBtn" title="${_('Select a file to upload. The file will first be saved locally and then automatically transferred to the designated file system (e.g., S3, Azure) in the background. The upload modal closes immediately after the file is queued, allowing you to continue working. A notification, \'File upload scheduled. Please check the task server page for progress,\' will confirm the upload has started. This feature is especially useful for large files, as it eliminates the need to wait for the upload to complete.')}" data-bind="visible: !inTrash(), css: {'disabled': isABFSRoot()}, click: function(){ if (!isABFSRoot()) { uploadFile(true) }}"><i class="fa fa-arrow-circle-o-up"></i> ${_('Schedule Upload')}</a>
+<a class="btn fileToolbarBtn" title="${_('Select a file to upload. The file will first be saved locally and then automatically transferred to the designated file system (e.g., S3, Azure) in the background. The upload modal closes immediately after the file is queued, allowing you to continue working. A notification, \'File upload scheduled. Please check the task server page for progress,\' will confirm the upload has started. This feature is especially useful for large files, as it eliminates the need to wait for the upload to complete.')}" data-bind="visible: !inTrash(), css: {'disabled': isABFSRoot()}, click: function(){ if (!isABFSRoot()) { checkAndDisplayAvailableSpace(); uploadFile(true); }}"><i class="fa fa-arrow-circle-o-up"></i> ${_('Schedule Upload')}</a>
 <!-- /ko -->
 <!-- /ko -->
 <!-- ko ifnot: isS3() || isGS() || isABFS() -->
@@ -192,7 +192,7 @@ ${ fb_components.menubar() }
 <!-- /ko -->
 <!-- ko if: isTaskServerEnabled -->
 <div id="upload-dropdown" class="btn-group" style="vertical-align: middle">
-<a data-hue-analytics="filebrowser:upload-btn-click" href="javascript: void(0)" class="btn upload-link dropdown-toggle" title="${_('Select a file to upload. The file will first be saved locally and then automatically transferred to the designated file system (e.g., S3, Azure) in the background. The upload modal closes immediately after the file is queued, allowing you to continue working. A notification, \'File upload scheduled. Please check the task server page for progress,\' will confirm the upload has started. This feature is especially useful for large files, as it eliminates the need to wait for the upload to complete.')}" data-bind="click: function() { uploadFile(true); }, visible: !inTrash(), css: {'disabled': (isOFS() && (isOFSRoot() || isOFSServiceID() || isOFSVol()))}">
+<a data-hue-analytics="filebrowser:upload-btn-click" href="javascript: void(0)" class="btn upload-link dropdown-toggle" title="${_('Select a file to upload. The file will first be saved locally and then automatically transferred to the designated file system (e.g., S3, Azure) in the background. The upload modal closes immediately after the file is queued, allowing you to continue working. A notification, \'File upload scheduled. Please check the task server page for progress,\' will confirm the upload has started. This feature is especially useful for large files, as it eliminates the need to wait for the upload to complete.')}" data-bind="click: function() { checkAndDisplayAvailableSpace(); uploadFile(true);}, visible: !inTrash(), css: {'disabled': (isOFS() && (isOFSRoot() || isOFSServiceID() || isOFSVol()))}">
 <i class="fa fa-arrow-circle-o-up"></i> ${_('Schedule Upload')}
 </a>
 </div>
