Skip to content

Commit b3512d5

Browse files
authored
Add obs data cloning job (#324)
* add job to copy obs data * include job file * preserve user flagged, add note about cron considerations * testing * extra coverage * coverage * last bit of diff coverage * update comments and correct test * consistent arg ordering, update comment
1 parent 2d7e892 commit b3512d5

File tree

6 files changed

+279
-0
lines changed

6 files changed

+279
-0
lines changed

sfa_api/admincli.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,3 +457,29 @@ def reference_persistence_job(
457457
'reference_persistence', name, user_id, cron_string, timeout,
458458
base_url=base_url)
459459
click.echo(f'Job created with id {id_}')
460+
461+
462+
@create_jobs.command('trial-data-copy')
463+
@with_default_options
464+
@base_url
465+
@timeout
466+
@name_arg
467+
@user_id_arg
468+
@cron_string
469+
@click.argument('copy_from', type=click.UUID)
470+
@click.argument('copy_to', type=click.UUID)
471+
def trial_data_copy_job(
472+
name, user_id, cron_string, timeout, copy_from, copy_to, base_url,
473+
**kwargs):
474+
"""
475+
Create a job to copy latest data from one observation to another.
476+
For short intervals, cron_string should schedule execution every
477+
x minutes where x is the observation interval length. For long
478+
intervals, cron string should should be set to execute shortly
479+
after data is posted to avoid introducing high latency.
480+
"""
481+
from sfa_api.jobs import create_job
482+
id_ = create_job(
483+
'trial_data_copy', name, user_id, cron_string, timeout,
484+
copy_to=str(copy_to), copy_from=str(copy_from), base_url=base_url)
485+
click.echo(f'Job created with id {id_}')

sfa_api/jobs.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from sfa_api.utils.auth0_info import exchange_refresh_token
2020
from sfa_api.utils.queuing import get_queue
2121
import sfa_api.utils.storage_interface as storage
22+
from sfa_api.utils.trial_jobs import copy_observation_data
2223

2324

2425
logger = logging.getLogger(__name__)
@@ -101,6 +102,8 @@ def create_job(job_type, name, user_id, cron_string, timeout=None, **kwargs):
101102
elif job_type in ('reference_persistence',
102103
'reference_probabilistic_persistence'):
103104
keys = ()
105+
elif job_type in ('trial_data_copy'):
106+
keys = ('copy_from', 'copy_to')
104107
else:
105108
raise ValueError(f'Job type {job_type} is not supported')
106109
params = {}
@@ -166,6 +169,9 @@ def execute_job(name, job_type, user_id, **kwargs):
166169
max_run_time = utcnow()
167170
return make_latest_probabilistic_persistence_forecasts(
168171
token, max_run_time, base_url=base_url)
172+
elif job_type == 'trial_data_copy':
173+
return copy_observation_data(
174+
token, kwargs['copy_from'], kwargs['copy_to'], base_url=base_url)
169175
else:
170176
raise ValueError(f'Job type {job_type} is not supported')
171177

sfa_api/tests/test_admincli.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,3 +490,14 @@ def test_reference_persistence_job(app_cli_runner, mocker, user_id):
490490
+ auth_args)
491491
assert result.exit_code == 0
492492
assert result.output == 'Job created with id jobid\n'
493+
494+
495+
def test_trial_data_job(app_cli_runner, mocker, user_id, observation_id):
496+
mocker.patch('sfa_api.jobs.create_job', return_value='jobid',
497+
autospec=True)
498+
result = app_cli_runner.invoke(
499+
admincli.trial_data_copy_job,
500+
['Val Job', user_id, '* * * * *', '--base-url', 'http://test',
501+
observation_id, observation_id] + auth_args)
502+
assert result.exit_code == 0
503+
assert result.output == 'Job created with id jobid\n'

sfa_api/tests/test_jobs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ def test_convert_sql_job_to_rq_job_not_cron(sql_job, mocker):
180180
('reference_probabilistic_persistence',
181181
{},
182182
'sfa_api.jobs.make_latest_probabilistic_persistence_forecasts'),
183+
('trial_data_copy',
184+
{'base_url': 'https://',
185+
'copy_from': 'id1',
186+
'copy_to': 'id2'},
187+
'sfa_api.jobs.copy_observation_data')
183188
])
184189
def test_execute_job(jtype, params, func, mocker, userid):
185190
mocker.patch('sfa_api.jobs.exchange_token',
@@ -261,6 +266,11 @@ def my_err(job, *exc_info):
261266
('periodic_report', {'report_id': 'id'}),
262267
('reference_persistence', {'base_url': 'https://'}),
263268
('reference_probabilistic_persistence', {'base_url': 'https://'}),
269+
('trial_data_copy', {
270+
'base_url': 'https://',
271+
'copy_from': 'id1',
272+
'copy_to': 'id2'
273+
}),
264274
pytest.param('badtype', {}, marks=pytest.mark.xfail(
265275
strict=True, raises=ValueError)),
266276
pytest.param('daily_observation_validation', {}, marks=pytest.mark.xfail(
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import pytest
2+
import pandas as pd
3+
from requests.exceptions import HTTPError
4+
5+
6+
from sfa_api.utils import trial_jobs
7+
8+
9+
source_index = pd.date_range(
10+
'2019-04-14T07:00Z',
11+
'2019-04-14T07:09Z',
12+
freq='1T'
13+
)
14+
15+
source_data = pd.DataFrame(
16+
[(x, x) for x in range(0, 10)],
17+
index=source_index,
18+
columns=['value', 'quality_flag']
19+
)
20+
21+
target_data = pd.DataFrame(
22+
[(0, 0)],
23+
index=source_index[:0],
24+
columns=['value', 'quality_flag']
25+
)
26+
27+
28+
@pytest.fixture()
29+
def mock_session(mocker):
30+
mock_sess = mocker.MagicMock()
31+
mock_sess.get_observation_time_range = mocker.MagicMock(
32+
return_value=(
33+
pd.Timestamp('2019-04-14T07:00Z'),
34+
pd.Timestamp('2019-04-14T07:00Z')
35+
)
36+
)
37+
mock_sess.get_observation_values = mocker.MagicMock(
38+
return_value=source_data[1:]
39+
)
40+
mock_sess.post_observation_values = mocker.MagicMock(return_value=None)
41+
42+
mocker.patch('sfa_api.utils.trial_jobs.APISession', return_value=mock_sess)
43+
return mock_sess
44+
45+
46+
@pytest.fixture()
47+
def mock_logging(mocker):
48+
mock_logger = mocker.MagicMock()
49+
mocker.patch(
50+
'sfa_api.utils.trial_jobs.logging.getLogger',
51+
return_value=mock_logger
52+
)
53+
return mock_logger
54+
55+
56+
def test_copy_data(mocker, mock_session, observation_id, mock_logging):
57+
trial_jobs.copy_observation_data("token", observation_id, observation_id)
58+
mock_logging.info.assert_called_with(
59+
"Copied %s points from obs %s to %s.", 9,
60+
observation_id, observation_id
61+
)
62+
post_args = mock_session.post_observation_values.call_args_list
63+
post_args_df = post_args[0][0][1]
64+
qfs = post_args_df['quality_flag'].values
65+
assert (qfs == [1, 0, 1, 0, 1, 0, 1, 0, 1]).all()
66+
67+
68+
def test_copy_data_no_copy_from_read(
69+
mocker, mock_session, observation_id):
70+
resp = mocker.MagicMock()
71+
resp.status_code = 404
72+
mock_session.get_observation_values = mocker.MagicMock(
73+
side_effect=HTTPError(response=resp)
74+
)
75+
with pytest.raises(ValueError) as e:
76+
trial_jobs.copy_observation_data(
77+
"token", observation_id, observation_id
78+
)
79+
assert "Read values failure for copy_from " in str(e.value)
80+
81+
82+
def test_copy_data_no_copy_to_read(
83+
mocker, mock_session, observation_id):
84+
resp = mocker.MagicMock()
85+
resp.status_code = 404
86+
mock_session.get_observation_time_range = mocker.MagicMock(
87+
side_effect=HTTPError(response=resp)
88+
)
89+
with pytest.raises(ValueError) as e:
90+
trial_jobs.copy_observation_data(
91+
"token", observation_id, observation_id
92+
)
93+
assert "Read values failure for copy_to " in str(e.value)
94+
95+
96+
def test_copy_data_no_copy_to_data(
97+
mocker, mock_session, mock_logging, observation_id):
98+
resp = mocker.MagicMock()
99+
resp.status_code = 404
100+
mock_session.get_observation_time_range = mocker.MagicMock(
101+
side_effect=[
102+
(pd.NaT, pd.NaT),
103+
(source_index[0], source_index[-1])
104+
]
105+
)
106+
trial_jobs.copy_observation_data(
107+
"token", observation_id, observation_id
108+
)
109+
# Mocked observation_values get will return full test data frame,
110+
# so check that get_observation_values was called with the expected
111+
# timestamp.
112+
mock_session.get_observation_values.assert_called_with(
113+
observation_id, source_index[-1], mocker.ANY
114+
)
115+
116+
117+
def test_copy_data_no_copy_to_write(
118+
mocker, mock_session, observation_id):
119+
resp = mocker.MagicMock()
120+
resp.status_code = 404
121+
mock_session.post_observation_values = mocker.MagicMock(
122+
side_effect=HTTPError(response=resp)
123+
)
124+
with pytest.raises(ValueError) as e:
125+
trial_jobs.copy_observation_data(
126+
"token", observation_id, observation_id
127+
)
128+
assert "Write values failure for copy_to " in str(e.value)
129+
130+
131+
def test_copy_data_nothing_to_copy(
132+
mocker, mock_session, observation_id, mock_logging):
133+
mock_session.get_observation_values = mocker.MagicMock(
134+
return_value=pd.DataFrame()
135+
)
136+
trial_jobs.copy_observation_data("token", observation_id, observation_id)
137+
mock_logging.info.assert_called_with(
138+
"No points to copy from %s to %s.", observation_id, observation_id
139+
)

sfa_api/utils/trial_jobs.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import logging
2+
from requests.exceptions import HTTPError
3+
4+
import pandas as pd
5+
from solarforecastarbiter.io.api import APISession
6+
from solarforecastarbiter.validation.quality_mapping import (
7+
BITMASK_DESCRIPTION_DICT
8+
)
9+
10+
USER_FLAGGED = BITMASK_DESCRIPTION_DICT[1]["USER FLAGGED"]
11+
12+
13+
def copy_observation_data(token, copy_from, copy_to, base_url=None):
14+
"""Helper job to copy observation data from one observation to
15+
another.
16+
17+
Parameters
18+
----------
19+
token: str
20+
API token to use.
21+
copy_from: str
22+
UUID of the observation to copy data from. Must have read_values
23+
permission on this observation.
24+
copy_to: str
25+
UUID of the observation to copy data to. Must have write_values
26+
permission on this observation.
27+
base_url: str
28+
URL of the api instance to connect to.
29+
30+
Raises
31+
------
32+
ValueError
33+
If reading values from either observation fails, or writing to
34+
copy_to fails.
35+
"""
36+
logger = logging.getLogger(__name__)
37+
sess = APISession(token, base_url=base_url)
38+
# Get latest timestamps for copy_to
39+
try:
40+
latest_copy_to = sess.get_observation_time_range(copy_to)[1]
41+
except HTTPError as e:
42+
raise ValueError(
43+
"Copy observation failed. Read values failure for copy_to "
44+
f"observation with error code: {e.response.status_code}"
45+
)
46+
47+
try:
48+
# If no data in copy_to, start from latest copy_from value.
49+
# Otherwise adjust latest_copy_to forward one minute to avoid
50+
# overwriting data.
51+
if pd.isna(latest_copy_to):
52+
latest_copy_to = sess.get_observation_time_range(copy_from)[1]
53+
else:
54+
latest_copy_to += pd.Timedelta('1T')
55+
56+
# Get values for copy_from from latest copy_to to now.
57+
values_to_copy = sess.get_observation_values(
58+
copy_from,
59+
latest_copy_to,
60+
pd.Timestamp.utcnow()
61+
)
62+
except HTTPError as e:
63+
raise ValueError(
64+
"Copy observation failed. Read values failure for copy_from "
65+
f"observation with error code: {e.response.status_code}"
66+
)
67+
68+
if not values_to_copy.empty:
69+
# Need to reset quality_flag and retained user flagged data, so
70+
# take the & of current flags and USER_FLAGGED
71+
flags = values_to_copy['quality_flag'] & USER_FLAGGED
72+
values_to_copy['quality_flag'] = flags
73+
74+
try:
75+
sess.post_observation_values(copy_to, values_to_copy)
76+
except HTTPError as e:
77+
raise ValueError(
78+
"Copy observation failed. Write values failure for copy_to "
79+
f"observation with error code: {e.response.status_code}"
80+
)
81+
else:
82+
logger.info(
83+
"Copied %s points from obs %s to %s.",
84+
len(values_to_copy), copy_from, copy_to
85+
)
86+
else:
87+
logger.info("No points to copy from %s to %s.", copy_from, copy_to)

0 commit comments

Comments
 (0)