|
1 | | -import json |
2 | | -import re |
| 1 | +import logging |
3 | 2 | from collections import Callable, Iterable |
4 | | -from datetime import datetime, timedelta |
5 | 3 | from functools import lru_cache |
6 | | -from urllib import error, parse, request |
7 | 4 |
|
8 | | -from ruz.logging import log, logging |
9 | | -from ruz.schema import REQUEST_SCHEMA, RUZ_API_ENDPOINTS |
10 | | -from ruz.utils import (CHECK_EMAIL_ONLINE, RUZ_API_URL, RUZ_API_V, |
11 | | - USE_NONE_SAFE_VALUES, none_safe) |
| 5 | +from ruz.utils import get, get_formated_date, is_student |
12 | 6 |
|
13 | 7 |
|
14 | | -# ===== Common methods ===== |
15 | | - |
16 | | -def is_student(email: str) -> bool or None: |
17 | | - """ |
18 | | - Check email belongs to student |
19 | | -
|
20 | | - :param email, required - valid HSE email addres or domain. |
21 | | -
|
22 | | - Stutent's domain: @edu.hse.ru |
23 | | - HSE stuff' domain: @hse.ru |
24 | | - """ |
25 | | - email_domain = email.lower().split("@")[-1] |
26 | | - |
27 | | - if email_domain == "edu.hse.ru": |
28 | | - return True |
29 | | - elif email_domain == "hse.ru": |
30 | | - return False |
31 | | - |
32 | | - logging.error("Wrong HSE email domain: '%s'", email_domain) |
33 | | - |
34 | | - |
35 | | -def is_hse_email(email: str) -> bool: |
36 | | - """ |
37 | | - Check email is valid HSE corp. email |
38 | | -
|
39 | | - :param email, required - email address to check. |
40 | | - """ |
41 | | - if re.fullmatch(r"^[a-z0-9\._-]{3,}@(edu\.)?hse\.ru$", email.lower()): |
42 | | - return True |
43 | | - logging.debug("Incorrect HSE email '%s'.", email) |
44 | | - return False |
45 | | - |
46 | | - |
47 | | -def get_formated_date(day_bias: int or float=0) -> str: |
48 | | - """ |
49 | | - Return date in RUZ API compatible format |
50 | | -
|
51 | | - :param day_bias - number of day from now. |
52 | | - """ |
53 | | - return (datetime.now() + timedelta( |
54 | | - days=float(day_bias) |
55 | | - )).strftime("%Y.%m.%d") |
56 | | - |
57 | | - |
58 | | -@log() |
59 | | -def is_valid_hse_email(email: str) -> bool: |
60 | | - """ |
61 | | - Check email is valid via API endpoint call (schedule) |
62 | | -
|
63 | | - :param email - email address to check (for schedules only). |
64 | | - """ |
65 | | - @none_safe() |
66 | | - def request_schedule_api(**params) -> list or dict: |
67 | | - return request.urlopen(make_url( |
68 | | - "schedule", |
69 | | - email=email, |
70 | | - fromDate=get_formated_date(), |
71 | | - toDate=get_formated_date(1), |
72 | | - **params |
73 | | - )) |
74 | | - |
75 | | - email = email.strip().lower() |
76 | | - if not is_hse_email(email): |
77 | | - return False |
78 | | - |
79 | | - try: |
80 | | - response = request_schedule_api( |
81 | | - receiverType=1 if not is_student(email) else None |
82 | | - ) |
83 | | - del response |
84 | | - except (error.HTTPError, error.URLError) as err: |
85 | | - logging.debug("Email '%s' wasn't verified.\n%s", email, err) |
86 | | - return False |
87 | | - return True |
88 | | - |
89 | | - |
90 | | -# ===== Special methods ===== |
91 | | - |
92 | | -@log() |
93 | | -def is_valid_schema(endpoint: str, |
94 | | - check_email_online: bool=CHECK_EMAIL_ONLINE, |
95 | | - **params) -> bool: |
96 | | - """ |
97 | | - Check params fit schema for certain endpoint |
98 | | -
|
99 | | - :param endpoint - endpoint for request. |
100 | | - :param check_email_online - use is_valid_hse_email. |
101 | | - :param params - schema params. |
102 | | - """ |
103 | | - |
104 | | - if (endpoint == "schedule" and "lecturerOid" not in params and |
105 | | - "studentOid" not in params and "email" not in params and |
106 | | - "auditoriumOid" not in params): |
107 | | - logging.debug("One of the followed required: lecturer_id, " |
108 | | - "auditorium_id, student_id, email for " |
109 | | - "schedule endpoint.") |
110 | | - return False |
111 | | - |
112 | | - if params.get('email') is not None: |
113 | | - email = params['email'] |
114 | | - if not is_hse_email(email): |
115 | | - del email |
116 | | - return False |
117 | | - elif check_email_online and not is_valid_hse_email(email): |
118 | | - logging.warning("'%s' is not verified by API call.", email) |
119 | | - del email |
120 | | - |
121 | | - endpoint = RUZ_API_ENDPOINTS.get(endpoint) |
122 | | - if endpoint is None: |
123 | | - logging.warning("Can't find endpoint: '%s'.", endpoint) |
124 | | - del endpoint |
125 | | - return False |
126 | | - |
127 | | - schema = REQUEST_SCHEMA[endpoint] |
128 | | - for key, value in params.items(): |
129 | | - if key not in schema: |
130 | | - logging.warning("Can't find '%s' schema param: '%s'", |
131 | | - endpoint, key) |
132 | | - del schema, endpoint |
133 | | - return False |
134 | | - if not isinstance(value, schema[key]): |
135 | | - logging.warning("Expected {} for '{}'::'{}' got: {}", |
136 | | - schema[key], endpoint, key, type(value)) |
137 | | - del schema, endpoint |
138 | | - return False |
139 | | - del schema, endpoint |
140 | | - return True |
141 | | - |
142 | | - |
143 | | -@log() |
144 | | -def make_url(endpoint: str, **params) -> str: |
145 | | - """ |
146 | | - Creates URL for API requests |
147 | | -
|
148 | | - :param endpoint - endpoint for request. |
149 | | - :param params - request params. |
150 | | - """ |
151 | | - url = "".join((RUZ_API_URL, RUZ_API_ENDPOINTS[endpoint])) |
152 | | - if params: |
153 | | - return "?".join((url, parse.urlencode(params))) |
154 | | - return url |
155 | | - |
156 | | - |
157 | | -@none_safe() |
158 | | -@log() |
159 | | -def get(endpoint: str, |
160 | | - encoding: str="utf-8", |
161 | | - return_none_safe: bool=USE_NONE_SAFE_VALUES, |
162 | | - **params) -> (list, dict, None): |
163 | | - """ |
164 | | - Return requested data in JSON |
165 | | -
|
166 | | - Check request has correct schema. |
167 | | -
|
168 | | - :param endpoint - endpoint for request. |
169 | | - :param encoding - encoding for received data. |
170 | | - :param return_none_safe - return empty list on fallback. |
171 | | - :param params - requested params |
172 | | - """ |
173 | | - if not is_valid_schema(endpoint, **params): |
174 | | - return [] if return_none_safe else None |
175 | | - |
176 | | - url = make_url(endpoint, **params) |
177 | | - try: |
178 | | - response = request.urlopen(url) |
179 | | - return json.loads(response.read().decode(encoding)) |
180 | | - except (error.HTTPError, error.URLError) as err: |
181 | | - logging.warning("Can't get '%s'.\n%s", url, err) |
182 | | - return [] if return_none_safe else None |
183 | | - |
184 | | - |
185 | | -# ===== API methods ===== |
186 | | - |
187 | 8 | def schedules(emails: Iterable=None, |
188 | 9 | lecturer_ids: Iterable=None, |
189 | 10 | auditorium_ids: Iterable=None, |
@@ -385,8 +206,6 @@ def sub_groups(reset_cache: bool=False) -> list: |
385 | 206 | return get("subGroups") |
386 | 207 |
|
387 | 208 |
|
388 | | -# ===== Additional methods ===== |
389 | | - |
390 | 209 | def find_by_str(subject: str or Callable, |
391 | 210 | query: str, |
392 | 211 | by: str="name", |
@@ -440,4 +259,5 @@ def find_by_str(subject: str or Callable, |
440 | 259 | raise NotImplementedError(subject.__name__) |
441 | 260 |
|
442 | 261 | query = query.strip().lower() |
443 | | - return [el for el in subject(**params) if query in el[by].lower().strip()] |
| 262 | + return [el for el in subject(**params) |
| 263 | + if query in (el[by].lower().strip() if el[by] else "")] |
0 commit comments