Skip to content

Commit 806491d

Browse files
committed
fix: reduce memory consumption of AREX.getJobOutput()
1 parent 39b047b commit 806491d

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import os
2727
import json
2828
import requests
29+
import shutil
2930

3031
from DIRAC import S_OK, S_ERROR
3132
from DIRAC.Core.Security import Locations
@@ -144,7 +145,7 @@ def _urlJoin(self, command):
144145
"""
145146
return os.path.join(self.base_url, command)
146147

147-
def _request(self, method, query, params=None, data=None, headers=None, timeout=None):
148+
def _request(self, method, query, params=None, data=None, headers=None, timeout=None, stream=False):
148149
"""Perform a request and properly handle the results/exceptions.
149150
150151
:param str method: "post", "get", "put"
@@ -164,12 +165,7 @@ def _request(self, method, query, params=None, data=None, headers=None, timeout=
164165

165166
try:
166167
response = self.session.request(
167-
method,
168-
query,
169-
headers=headers,
170-
params=params,
171-
data=data,
172-
timeout=timeout,
168+
method, query, headers=headers, params=params, data=data, timeout=timeout, stream=stream
173169
)
174170
if not response.ok:
175171
return S_ERROR(f"Response: {response.status_code} - {response.reason}")
@@ -811,20 +807,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
811807
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
812808

813809
# Submit the GET request to retrieve outputs
814-
result = self._request("get", query)
810+
result = self._request("get", query, stream=True)
815811
if not result["OK"]:
816812
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
817813
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
818814
response = result["Value"]
819-
outputContent = response.text
815+
816+
localOutput = os.path.join(workingDirectory, remoteOutput)
817+
with open(localOutput, "wb") as f:
818+
shutil.copyfileobj(response.raw, f)
820819

821820
if remoteOutput == f"{stamp}.out":
822-
stdout = outputContent
823-
elif remoteOutput == f"{stamp}.err":
824-
stderr = outputContent
825-
else:
826-
localOutput = os.path.join(workingDirectory, remoteOutput)
827-
with open(localOutput, "w") as f:
828-
f.write(outputContent)
821+
with open(localOutput) as f:
822+
stdout = f.read()
823+
if remoteOutput == f"{stamp}.err":
824+
with open(localOutput) as f:
825+
stderr = f.read()
829826

830827
return S_OK((stdout, stderr))

0 commit comments

Comments
 (0)