|
2 | 2 | from src.utils.arguments import arg_parser |
3 | 3 | from src.utils.log import logger |
4 | 4 | from pytimeparse2 import parse |
| 5 | +from time import time |
5 | 6 | import requests |
6 | | -import time |
| 7 | + |
7 | 8 |
|
# Address of the Prometheus server, read from the "prom.addr" argument; the
# admin API URLs requested in this module are built by prefixing it.
8 | 9 | prom_addr = arg_parser().get("prom.addr") |
# Module-wide flag that run_policies() checks on entry and toggles around its
# work, so a second run is refused while one is still in progress.
| 10 | +running_tasks = False |
9 | 11 |
|
10 | 12 |
|
def delete_series(policy_name: str, policy: dict) -> bool:
    """
    Delete the time-series selected by a single lifecycle policy.

    This function calls following Prometheus endpoint:
        POST /api/v1/admin/tsdb/delete_series
    Samples matching ``policy["match"]`` with a timestamp older than
    "now minus ``policy["keep_for"]``" are requested for deletion.

    :param policy_name: name of the policy; attached to every log record.
    :param policy: policy settings. The keys "match" (series selector)
        and "keep_for" (duration string understood by pytimeparse2)
        are read.
    :return: True when Prometheus answers with HTTP 204, otherwise False
        (the failure is logged, never raised).
    """
    keep_for = parse(policy["keep_for"])
    if keep_for is None:
        # pytimeparse2 signals an unparseable duration with None; without
        # this guard the subtraction below would raise a TypeError.
        logger.error(
            f"Failed to delete series, cannot parse keep_for value {policy['keep_for']!r}",
            extra={"policy_name": policy_name})
        return False

    end_timestamp = time() - keep_for
    try:
        # Let requests build the query string so that reserved characters
        # in the selector (e.g. '+', '&', '#' inside a regex matcher) are
        # percent-encoded instead of being sent verbatim.
        r = requests.post(
            f"{prom_addr}/api/v1/admin/tsdb/delete_series",
            params={"match[]": policy["match"], "end": end_timestamp})
    except Exception as e:
        # Exception (not BaseException) so that KeyboardInterrupt and
        # SystemExit are not swallowed by the best-effort error handling.
        logger.error(e, extra={"policy_name": policy_name})
        return False

    if r.status_code == 204:
        logger.debug("Task clean-up time-series has been successfully completed",
                     extra={"policy_name": policy_name})
        return True

    try:
        reason = r.json().get("error")
    except (ValueError, AttributeError):
        # The error body is not a JSON object (e.g. an HTML page from a
        # proxy); fall back to the raw text rather than crashing.
        reason = r.text
    logger.error(f"Failed to delete series, {reason}", extra={
        "status": r.status_code, "policy_name": policy_name})
    return False
| 34 | + |
| 35 | + |
def clean_tombstones() -> bool:
    """
    Ask Prometheus to purge deleted data from disk.

    This function calls following Prometheus endpoint:
        POST /api/v1/admin/tsdb/clean_tombstones
    Removes the deleted data from disk and
    cleans up the existing tombstones

    :return: True when Prometheus answers with HTTP 204, otherwise False
        (the failure is logged, never raised).
    """
    try:
        r = requests.post(
            f"{prom_addr}/api/v1/admin/tsdb/clean_tombstones")
    except Exception as e:
        # Exception (not BaseException) so that KeyboardInterrupt and
        # SystemExit are not swallowed by the best-effort error handling.
        logger.error(e)
        return False

    if r.status_code == 204:
        return True

    try:
        reason = r.json().get("error")
    except (ValueError, AttributeError):
        # The error body is not a JSON object (e.g. an HTML page from a
        # proxy); fall back to the raw text rather than crashing.
        reason = r.text
    logger.error(f"Failed to clean tombstones, {reason}", extra={
        "status": r.status_code})
    return False
| 54 | + |
| 55 | + |
def run_policies() -> bool:
    """
    This function loops over user-defined metrics lifecycle
    policies and executes the clean-up job one by one

    After all policies have been processed, tombstones are cleaned once
    for the whole run. Only one run may be active at a time: the
    module-level ``running_tasks`` flag is set for the duration of the
    work and a call made while it is set is refused.

    :return: False when another run is already in progress, True
        otherwise (including when no policies are defined).
    """
    global running_tasks
    if running_tasks:
        logger.warning(
            "Cannot create a new task. Server is currently processing another task")
        return False

    policies = load_policies()
    if not policies:
        return True

    logger.debug(
        f"Found {len(policies)} metrics lifecycle {'policies' if len(policies) > 1 else 'policy'}. "
        f"Starting job to clean-up time-series.")
    running_tasks = True
    start_time = time()
    try:
        for name in policies:
            settings = policies[name]
            logger.debug(
                "Task clean-up series is in progress", extra={
                    "policy_name": name, "match": settings["match"],
                    "keep_for": settings["keep_for"]})
            delete_series(policy_name=name, policy=settings)
        clean_tombstones()
    finally:
        # Always release the guard: if anything above raises, leaving the
        # flag set would make every later call refuse to start.
        running_tasks = False

    exec_time = round(time() - start_time, 2)
    logger.debug(
        "Task clean-up series has been completed", extra={
            "duration": exec_time})
    return True
0 commit comments