Skip to content
This repository was archived by the owner on Mar 20, 2023. It is now read-only.

Commit a02e35d

Browse files
committed
Fix task output_data on native pools with includes
- Resolves #313
1 parent 43fda94 commit a02e35d

File tree

1 file changed

+55
-29
lines changed

1 file changed

+55
-29
lines changed

convoy/data.py

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,54 @@ def process_input_data(config, bxfile, spec, on_task=False):
291291
return None
292292

293293

294+
def _generate_batch_output_file_spec(
295+
storage_settings, container, saskey, seperator, condition, local_path,
296+
include):
297+
# type: (settings.StorageCredentialsSettings, str, str, str, str, str,
298+
# str) -> batchmodels.OutputFile
299+
"""Generate Batch output file spec with given local path and filter
300+
:param settings.StorageCredentialsSettings storage_settings:
301+
storage settings
302+
:param str container: container
303+
:param str saskey: sas key
304+
:param str separator: dir separator
305+
:param str condition: upload condition
306+
:param str local_path: task local path
307+
:param str include: include filter
308+
:rtype: batchmodels.OutputFile
309+
:return: Batch output file spec
310+
"""
311+
# set file pattern
312+
if local_path.endswith(seperator):
313+
fp = ''.join((local_path, include))
314+
else:
315+
fp = seperator.join((local_path, include))
316+
# set upload condition
317+
if condition == 'taskcompletion':
318+
buc = batchmodels.OutputFileUploadCondition.task_completion
319+
elif condition == 'taskfailure':
320+
buc = batchmodels.OutputFileUploadCondition.task_failure
321+
elif condition == 'tasksuccess':
322+
buc = batchmodels.OutputFileUploadCondition.task_success
323+
# generate spec
324+
outfile = batchmodels.OutputFile(
325+
file_pattern=fp,
326+
destination=batchmodels.OutputFileDestination(
327+
container=batchmodels.OutputFileBlobContainerDestination(
328+
path='',
329+
container_url='{}?{}'.format(
330+
storage.generate_blob_container_uri(
331+
storage_settings, container),
332+
saskey)
333+
)
334+
),
335+
upload_options=batchmodels.OutputFileUploadOptions(
336+
upload_condition=buc
337+
),
338+
)
339+
return outfile
340+
341+
294342
def _process_storage_output_data(config, native, is_windows, output_data):
295343
# type: (dict, bool, bool, dict) -> str
296344
"""Process output data to egress to Azure storage
@@ -335,8 +383,6 @@ def _process_storage_output_data(config, native, is_windows, output_data):
335383
includes = settings.data_include(xfer)
336384
excludes = settings.data_exclude(xfer)
337385
condition = settings.data_condition(xfer)
338-
# convert include/excludes into extra options
339-
filters = _convert_filter_to_blobxfer_option(includes, excludes)
340386
local_path = settings.data_local_path(xfer, True, task_wd=False)
341387
# auto replace container path for gluster with host path
342388
if (util.is_not_empty(gluster_container) and
@@ -351,34 +397,14 @@ def _process_storage_output_data(config, native, is_windows, output_data):
351397
else:
352398
sep = '/'
353399
if util.is_none_or_empty(includes):
354-
include = '**{}*'.format(sep)
355-
if not local_path.endswith(sep):
356-
fp = sep.join((local_path, include))
357-
else:
358-
fp = ''.join((local_path, include))
359-
if condition == 'taskcompletion':
360-
buc = batchmodels.OutputFileUploadCondition.task_completion
361-
elif condition == 'taskfailure':
362-
buc = batchmodels.OutputFileUploadCondition.task_failure
363-
elif condition == 'tasksuccess':
364-
buc = batchmodels.OutputFileUploadCondition.task_success
365-
of = batchmodels.OutputFile(
366-
file_pattern=fp,
367-
destination=batchmodels.OutputFileDestination(
368-
container=batchmodels.OutputFileBlobContainerDestination(
369-
path='',
370-
container_url='{}?{}'.format(
371-
storage.generate_blob_container_uri(
372-
storage_settings, container),
373-
saskey)
374-
)
375-
),
376-
upload_options=batchmodels.OutputFileUploadOptions(
377-
upload_condition=buc
378-
),
379-
)
380-
args.append(of)
400+
includes = ['**{}*'.format(sep)]
401+
for include in includes:
402+
args.append(_generate_batch_output_file_spec(
403+
storage_settings, container, saskey, sep, condition,
404+
local_path, include))
381405
else:
406+
# convert include/excludes into extra options
407+
filters = _convert_filter_to_blobxfer_option(includes, excludes)
382408
# construct argument
383409
# kind:encrypted:<sa:ep:saskey:remote_path>:local_path:eo
384410
creds = crypto.encrypt_string(

0 commit comments

Comments
 (0)