|
| 1 | +"""Helpers used by pythonanywhere helper scripts.""" |
| 2 | + |
| 3 | +import logging |
| 4 | +import sys |
| 5 | + |
| 6 | +from schema import And, Or, Schema, SchemaError, Use |
| 7 | + |
| 8 | +from pythonanywhere.snakesay import snakesay |
| 9 | +from pythonanywhere.task import Task |
| 10 | + |
| 11 | +logger = logging.getLogger(__name__) |
| 12 | + |
| 13 | +# fmt: off |
| 14 | +tabulate_formats = [ |
| 15 | + "plain", "simple", "github", "grid", "fancy_grid", "pipe", "orgtbl", "jira", |
| 16 | + "presto", "psql", "rst", "mediawiki", "moinmoin", "youtrack", "html", "latex", |
| 17 | + "latex_raw", "latex_booktabs", "textile", |
| 18 | +] |
| 19 | +# fmt: on |
| 20 | + |
| 21 | + |
| 22 | +class ScriptSchema(Schema): |
| 23 | + """Extends `Schema` adapting it to PA scripts validation strategies. |
| 24 | +
|
| 25 | + Adds predefined schemata as class variables to be used in scripts' |
| 26 | + validation schemas as well as `validate_user_input` method which acts |
| 27 | + as `Schema.validate` but returns a dictionary with converted keys |
| 28 | + ready to be used as function keyword arguments, e.g. validated |
| 29 | + arguments {"--foo": bar, "<baz>": qux} will be be converted to |
| 30 | + {"foo": bar, "baz": qux}. Additional conversion rules may be added as |
| 31 | + dictionary passed to `validate_user_input` :method: as `conversions` |
| 32 | + :param:. |
| 33 | +
|
| 34 | + Use :method:`ScriptSchema.validate_user_input` to obtain kwarg |
| 35 | + dictionary.""" |
| 36 | + |
| 37 | + # class variables are used in task scripts schemata: |
| 38 | + boolean = Or(None, bool) |
| 39 | + hour = Or(None, And(Use(int), lambda h: 0 <= h <= 23), error="--hour has to be in 0..23") |
| 40 | + id_multi = Or([], And(lambda y: [x.isdigit() for x in y], error="<id> has to be integer")) |
| 41 | + id_required = And(Use(int), error="<id> has to be an integer") |
| 42 | + minute_required = And(Use(int), lambda m: 0 <= m <= 59, error="--minute has to be in 0..59") |
| 43 | + minute = Or(None, minute_required) |
| 44 | + string = Or(None, str) |
| 45 | + tabulate_format = Or( |
| 46 | + None, |
| 47 | + And(str, lambda f: f in tabulate_formats), |
| 48 | + error="--format should match one of: {}".format(", ".join(tabulate_formats)), |
| 49 | + ) |
| 50 | + |
| 51 | + replacements = {"--": "", "<": "", ">": ""} |
| 52 | + |
| 53 | + def convert(self, string): |
| 54 | + """Removes cli argument notation characters ('--', '<', '>' etc.). |
| 55 | +
|
| 56 | + :param string: cli argument key to be converted to fit Python |
| 57 | + argument syntax.""" |
| 58 | + |
| 59 | + for key, value in self.replacements.items(): |
| 60 | + string = string.replace(key, value) |
| 61 | + return string |
| 62 | + |
| 63 | + def validate_user_input(self, arguments, *, conversions=None): |
| 64 | + """Calls `Schema.validate` on provided `arguments`. |
| 65 | +
|
| 66 | + Returns dictionary with keys converted by |
| 67 | + `ScriptSchema.convert` :method: to be later used as kwarg |
| 68 | + arguments. Universal rules for conversion are stored in |
| 69 | + `replacements` class variable and may be updated using |
| 70 | + `conversions` kwarg. Use optional `conversions` :param: to add |
| 71 | + custom replacement rules. |
| 72 | +
|
| 73 | + :param arguments: dictionary of cli arguments provided be |
| 74 | + (e.g.) `docopt` |
| 75 | + :param conversions: dictionary of additional rules to |
| 76 | + `self.replacements`""" |
| 77 | + |
| 78 | + if conversions: |
| 79 | + self.replacements.update(conversions) |
| 80 | + |
| 81 | + try: |
| 82 | + self.validate(arguments) |
| 83 | + return {self.convert(key): val for key, val in arguments.items()} |
| 84 | + except SchemaError as e: |
| 85 | + logger.warning(snakesay(str(e))) |
| 86 | + sys.exit(1) |
| 87 | + |
| 88 | + |
| 89 | +def get_logger(set_info=False): |
| 90 | + """Sets logger for 'pythonanywhere' package. |
| 91 | +
|
| 92 | + Returns `logging.Logger` instance with no message formatting which |
| 93 | + will stream to stdout. With `set_info` :param: set to `True` |
| 94 | + logger defines `logging.INFO` level otherwise it leaves default |
| 95 | + `logging.WARNING`. |
| 96 | +
|
| 97 | + To toggle message visibility in scripts use `logger.info` calls |
| 98 | + and switch `set_info` value accordingly. |
| 99 | +
|
| 100 | + :param set_info: boolean (defaults to False)""" |
| 101 | + |
| 102 | + logging.basicConfig(format="%(message)s", stream=sys.stdout) |
| 103 | + logger = logging.getLogger("pythonanywhere") |
| 104 | + if set_info: |
| 105 | + logger.setLevel(logging.INFO) |
| 106 | + else: |
| 107 | + logger.setLevel(logging.WARNING) |
| 108 | + return logger |
| 109 | + |
| 110 | + |
| 111 | +def get_task_from_id(task_id, no_exit=False): |
| 112 | + """Get `Task.from_id` instance representing existing task. |
| 113 | +
|
| 114 | + :param task_id: integer (should be a valid task id) |
| 115 | + :param no_exit: if (default) False sys.exit will be called when |
| 116 | + exception is caught""" |
| 117 | + |
| 118 | + try: |
| 119 | + return Task.from_id(task_id) |
| 120 | + except Exception as e: |
| 121 | + logger.warning(snakesay(str(e))) |
| 122 | + if not no_exit: |
| 123 | + sys.exit(1) |
0 commit comments