Skip to content

Commit 01150f5

Browse files
Ken Lippold
authored and committed
HydroShare archival updates
1 parent 2d35845 commit 01150f5

File tree

2 files changed

+49
-46
lines changed

2 files changed

+49
-46
lines changed

etl/services/hydroshare_archival.py

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -243,51 +243,58 @@ def run(self, user: Optional[User], uid: uuid.UUID, make_public=False):
243243
.all()
244244
)
245245

246-
datastream_file_names = []
246+
processing_levels = {}
247247

248-
processing_levels = list(
249-
set([datastream.processing_level.definition for datastream in datastreams])
250-
)
248+
for datastream in datastreams:
249+
if datastream.processing_level.definition in processing_levels:
250+
processing_levels[datastream.processing_level.definition].append(datastream)
251+
else:
252+
processing_levels[datastream.processing_level.definition] = [datastream]
251253

252254
with tempfile.TemporaryDirectory() as temp_dir:
253-
for processing_level in processing_levels:
255+
for processing_level, datastreams in processing_levels.items():
256+
processing_level_directory = f"{archive_folder}{processing_level}"
257+
processing_level_directory = re.sub(r"\s+", "_", processing_level_directory)
258+
254259
try:
255-
archive_resource.folder_delete(
256-
f"{archive_folder}{processing_level}"
257-
)
260+
archive_resource.folder_delete(processing_level_directory)
258261
except (Exception,):
259262
pass
260-
archive_sub_folder = f"{archive_folder}{processing_level}"
261-
archive_sub_folder = re.sub(r"\s+", "_", archive_sub_folder)
262-
archive_resource.folder_create(archive_sub_folder)
263+
264+
archive_resource.folder_create(processing_level_directory)
263265
os.mkdir(os.path.join(temp_dir, processing_level))
264-
for datastream in datastreams:
265-
temp_file_name = datastream.observed_property.code
266-
temp_file_index = 2
267-
while (
268-
f"{datastream.processing_level.definition}_{temp_file_name}"
269-
in datastream_file_names
270-
):
271-
temp_file_name = (
272-
f"{datastream.observed_property.code} - {str(temp_file_index)}"
266+
267+
datastream_files = []
268+
269+
for datastream in datastreams:
270+
file_name = f"{datastream.observed_property.code}.csv"
271+
file_index = 2
272+
273+
while file_name in datastream_files:
274+
file_name = (
275+
f"{datastream.observed_property.code}_{str(file_index)}.csv"
276+
)
277+
file_index += 1
278+
279+
datastream_files.append(file_name)
280+
281+
temp_file_path = os.path.join(
282+
temp_dir, processing_level, file_name
283+
)
284+
with open(temp_file_path, "w") as csv_file:
285+
for line in datastream_service.generate_csv(datastream):
286+
csv_file.write(line)
287+
288+
datastream_file_paths = [
289+
os.path.join(temp_dir, processing_level, datastream_file)
290+
for datastream_file in datastream_files
291+
]
292+
293+
if datastream_file_paths:
294+
archive_resource.file_upload(
295+
*datastream_file_paths,
296+
destination_path=processing_level_directory,
273297
)
274-
temp_file_index += 1
275-
datastream_file_names.append(
276-
f"{datastream.processing_level.definition}_{temp_file_name}"
277-
)
278-
temp_file_name = f"{temp_file_name}.csv"
279-
temp_file_path = os.path.join(
280-
temp_dir, datastream.processing_level.definition, temp_file_name
281-
)
282-
with open(temp_file_path, "w") as csv_file:
283-
for line in datastream_service.generate_csv(datastream):
284-
csv_file.write(line)
285-
dest_path = f"{archive_folder}{datastream.processing_level.definition}"
286-
dest_path = re.sub(r"\s+", "_", dest_path)
287-
archive_resource.file_upload(
288-
temp_file_path,
289-
destination_path=dest_path,
290-
)
291298

292299
if make_public is True:
293300
try:

sta/services/datastream.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,7 @@ def delete(self, user: User, uid: uuid.UUID):
233233

234234
@staticmethod
235235
def generate_csv(datastream: Datastream):
236-
observations = (
237-
Observation.objects.filter(datastream=datastream)
238-
.only("phenomenon_time", "result", "quality_code")
239-
.order_by("phenomenon_time")
240-
)
236+
observations = Observation.objects.filter(datastream=datastream).order_by("phenomenon_time")
241237

242238
latitude = (
243239
round(datastream.thing.location.latitude, 6)
@@ -341,11 +337,11 @@ def generate_csv(datastream: Datastream):
341337

342338
yield "ResultTime,Result,ResultQualifiers\n"
343339

344-
for observation in observations.all():
345-
if observation.quality_code:
346-
yield f'{observation.phenomenon_time.isoformat()},{observation.result},"{observation.quality_code}"\n'
340+
for observation in observations.values_list("phenomenon_time", "result", "quality_code"):
341+
if observation[2]:
342+
yield f'{observation[0].isoformat()},{observation[1]},"{observation[2]}"\n'
347343
else:
348-
yield f"{observation.phenomenon_time.isoformat()},{observation.result},\n"
344+
yield f"{observation[0].isoformat()},{observation[1]},\n"
349345

350346
def get_csv(self, user: User, uid: uuid.UUID):
351347
datastream = self.get_datastream_for_action(user=user, uid=uid, action="view")

0 commit comments

Comments (0)