diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 76145e6b..7c11183b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -66,8 +66,8 @@ jobs: run: | python3 version.py pyinstaller pros.spec - pyinstaller --onefile pros/cli/compile_commands/intercept-cc.py --name=intercept-cc - pyinstaller --onefile pros/cli/compile_commands/intercept-cc.py --name=intercept-c++ + pyinstaller --onefile pros/cli/compile_commands/intercept_cc.py --name=intercept-cc + pyinstaller --onefile pros/cli/compile_commands/intercept_cc.py --name=intercept-c++ if: matrix.os != 'macos-latest' - name: Run Pyinstaller MacOS @@ -77,8 +77,8 @@ jobs: pip3 install -e ./charset_normalizer python3 version.py pyinstaller pros-macos.spec - pyinstaller --onefile pros/cli/compile_commands/intercept-cc.py --name=intercept-cc --target-arch=universal2 - pyinstaller --onefile pros/cli/compile_commands/intercept-cc.py --name=intercept-c++ --target-arch=universal2 + pyinstaller --onefile pros/cli/compile_commands/intercept_cc.py --name=intercept-cc --target-arch=universal2 + pyinstaller --onefile pros/cli/compile_commands/intercept_cc.py --name=intercept-c++ --target-arch=universal2 if: matrix.os == 'macos-latest' - name: Package Everything Up diff --git a/.pylintrc b/.pylintrc index 8d811779..e9df67ae 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,9 +1,61 @@ [MASTER] +# Should enable these +# C0114: Missing module docstring +# C0115: Missing class docstring +# C0116: Missing function or method docstring +# C0415: Import outside toplevel (import-outside-toplevel) +# E1120: No value for argument in function call +# E1123: Unexpected argument in function call +# C0209: Consider using an f-string +# W0621: Redefining name from outer scope +# W0614: Unused import from wildcard import +# W0401: Wildcard import +# R1725: Super with arguments +# W0237: Renamed arguments +# W0613: Unused argument +# W0511: TODO +# W1514: Using open without explicitly specifying an encoding +# E1101: Nonexistent member +# E0401: Unable to import +# W0212: Access to a protected member +# W0101: Unreachable code + +# Performance optimization +# W1203: Use % formatting in logging functions and pass the parameters as arguments +# W1202: Use % formatting in logging functions and pass the parameters as arguments +# W1201: Specify string format arguments as logging function parameters + +# Fixed by auto formatter +# C0411: Wrongly ordered import +# C0412: Ungrouped imports +# C0301: Line too long + +# Might be difficult to refactor in some cases +# W0718: Broad exception caught +# W0702: No exception type(s) specified + + + + + +# Should disable these for now, would require too much refactoring +# R0903: Too few public methods +# R0913: Too many arguments +# R0914: Too many local variables +# R0912: Too many branches +# R0911: Too many return statements +# R0902: Too many instance attributes +# R0915: Too many statements +# R0904: Too many public methods +# C0302: Too many lines in module +# W0603: Using the global statement +# R0401: Cyclic import -- Seems to be platform specific + max-line-length = 120 -disable = C0114, C0115, C0116, R0903, C0415, R1705, R0913, W1203, R1729, E1120, E1123, C0209, R1710, W0621, C0121, - W0614, W0401, W1202, C0117, W0718, R0205, R0402, R0914, R1725, R1735, C0411, W0237, W0702, W0223, W0613, - W0108, R0912, R0911, W0511, E1136, R0902, W0611, C0412, C0103, C0301, R1732, R0915, W1514, R1718, W1510, - E0602, W1309, C0325, E1101, R1714, R0916, W0719, R1734, E1133, W1201, W0107, W3101, W0640, C0201, W1113, - W0246, W0622, W0221, E1111, R1720, W0221, R1723, E0102, W0201, E0203, E0401, W0602, W0212, W0707, R0904, - W0101, C0302, E0110, W0603, R1701, W0106, R1721, W0601, R0401, W0612, W1406, C0303, R1731 +disable = C0114, C0115, C0116, R0903, C0415, R0913, W1203, E1120, E1123, C0209, W0621, + W0614, W0401, W1202, W0718, R0914, R1725, C0411, W0237, W0702, W0613, + R0912, R0911, W0511, R0902, C0412, C0301, R0915, W1514, + E1101, W1201, + E0401, W0212, R0904, W0101, + C0302, W0603, R0401 diff --git a/pros/cli/click_classes.py b/pros/cli/click_classes.py index 27dbf73f..4ab301d2 100644 --- a/pros/cli/click_classes.py +++ b/pros/cli/click_classes.py @@ -32,8 +32,8 @@ def format_commands(self, ctx, formatter): if hasattr(cmd, 'hidden') and cmd.hidden: continue - help = cmd.short_help or '' - rows.append((subcommand, help)) + help_text = cmd.short_help or '' + rows.append((subcommand, help_text)) if rows: with formatter.section('Commands'): @@ -65,9 +65,10 @@ class PROSCommand(PROSFormatted, click.Command): pass +# Seems to be unused? class PROSMultiCommand(PROSFormatted, click.MultiCommand): - def get_command(self, ctx, cmd_name): - super().get_command(ctx, cmd_name) + def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Command]: + pass class PROSOption(click.Option): @@ -78,13 +79,13 @@ def __init__(self, *args, hidden: bool = False, group: str = None, **kwargs): def get_help_record(self, ctx): if hasattr(self, 'hidden') and self.hidden: - return + return None return super().get_help_record(ctx) class PROSDeprecated(click.Option): def __init__(self, *args, replacement: str = None, **kwargs): kwargs['help'] = "This option has been deprecated." - if not replacement==None: + if replacement is not None: kwargs['help'] += " Its replacement is '--{}'".format(replacement) super(PROSDeprecated, self).__init__(*args, **kwargs) self.group = "Deprecated" @@ -92,7 +93,7 @@ def __init__(self, *args, replacement: str = None, **kwargs): self.to_use = replacement self.arg = args[0][len(args[0])-1] self.msg = "The '{}' {} has been deprecated. Please use '--{}' instead." - if replacement==None: + if replacement is None: self.msg = self.msg.split(".")[0]+"." def type_cast_value(self, ctx, value): @@ -103,7 +104,7 @@ def type_cast_value(self, ctx, value): class PROSGroup(PROSFormatted, click.Group): def __init__(self, *args, **kwargs): super(PROSGroup, self).__init__(*args, **kwargs) - self.cmd_dict = dict() + self.cmd_dict = {} def command(self, *args, aliases=None, **kwargs): aliases = aliases or [] @@ -118,7 +119,7 @@ def decorator(f): return decorator - def group(self, aliases=None, *args, **kwargs): + def group(self, *args, aliases=None, **kwargs): aliases = aliases or [] def decorator(f): @@ -140,7 +141,7 @@ def get_command(self, ctx, cmd_name): # fall back to guessing matches = {x for x in self.list_commands(ctx) if x.startswith(cmd_name)} - matches.union({x for x in self.cmd_dict.keys() if x.startswith(cmd_name)}) + matches.union({x for x in self.cmd_dict if x.startswith(cmd_name)}) if len(matches) == 1: return super(PROSGroup, self).get_command(ctx, matches.pop()) return None @@ -159,8 +160,8 @@ def invoke(self, *args, **kwargs): super(PROSCommandCollection, self).invoke(*args, **kwargs) except ClickException as e: click.echo("PROS-CLI Version: {}".format(get_version())) - isProject = p.find_project("") - if (isProject): #check if there is a project + is_project = p.find_project("") + if is_project: #check if there is a project curr_proj = p() click.echo("PROS-Kernel Version: {}".format(curr_proj.kernel)) raise e diff --git a/pros/cli/common.py b/pros/cli/common.py index 8d2ca312..5cdebd91 100644 --- a/pros/cli/common.py +++ b/pros/cli/common.py @@ -71,7 +71,7 @@ def callback(ctx: click.Context, param: click.core.Parameter, value: Any): def logfile_option(f: Union[click.Command, Callable]): def callback(ctx: click.Context, param: click.core.Parameter, value: Any): if value is None or value[0] is None: - return None + return ctx.ensure_object(dict) level = None if isinstance(value[1], str): @@ -138,8 +138,7 @@ def callback(ctx: click.Context, param: click.Parameter, value: bool): add_tag('no-analytics',value) if value: echo("Not sending analytics for this command.\n") - analytics.useAnalytics = False - pass + analytics.use_analytics = False decorator = click.option('--no-analytics', expose_value=False, is_flag=True, default=False, is_eager=True, help="Don't send analytics for this command.", callback=callback, cls=PROSOption, hidden=True)(f) decorator.__name__ = f.__name__ @@ -191,12 +190,11 @@ def callback(ctx: click.Context, param: click.Parameter, value: str): if project_path is None: if allow_none: return None - elif required: + if required: raise click.UsageError(f'{os.path.abspath(value or ".")} is not inside a PROS project. ' f'Execute this command from within a PROS project or specify it ' f'with --project project/path') - else: - return None + return None return c.Project(project_path) @@ -231,12 +229,12 @@ def pros_root(f): return decorator -def resolve_v5_port(port: Optional[str], type: str, quiet: bool = False) -> Tuple[Optional[str], bool]: +def resolve_v5_port(port: Optional[str], p_type: str, quiet: bool = False) -> Tuple[Optional[str], bool]: """ Detect serial ports that can be used to interact with a V5. Returns a tuple of (port?, is_joystick). port will be None if no ports are - found, and is_joystick is False unless type == 'user' and the port is + found, and is_joystick is False unless p_type == 'user' and the port is determined to be a controller. This is useful in e.g. pros.cli.terminal:terminal where the communication protocol is different for wireless interaction. @@ -246,7 +244,7 @@ def resolve_v5_port(port: Optional[str], type: str, quiet: bool = False) -> Tupl # not a joystick. is_joystick = False if not port: - ports = find_v5_ports(type) + ports = find_v5_ports(p_type) logger(__name__).debug('Ports: {}'.format(';'.join([str(p.__dict__) for p in ports]))) if len(ports) == 0: if not quiet: @@ -268,7 +266,7 @@ def resolve_v5_port(port: Optional[str], type: str, quiet: bool = False) -> Tupl return None, False else: port = ports[0].device - is_joystick = type == 'user' and 'Controller' in ports[0].description + is_joystick = p_type == 'user' and 'Controller' in ports[0].description logger(__name__).info('Automatically selected {}'.format(port)) return port, is_joystick diff --git a/pros/cli/compile_commands/intercept-cc.py b/pros/cli/compile_commands/intercept_cc.py similarity index 100% rename from pros/cli/compile_commands/intercept-cc.py rename to pros/cli/compile_commands/intercept_cc.py diff --git a/pros/cli/conductor.py b/pros/cli/conductor.py index 0db800d3..60f556bc 100644 --- a/pros/cli/conductor.py +++ b/pros/cli/conductor.py @@ -1,7 +1,7 @@ import os.path from itertools import groupby -import pros.common.ui as ui +from pros.common import ui import pros.conductor as c from pros.cli.common import * from pros.conductor.templates import ExternalTemplate @@ -22,7 +22,6 @@ def conductor(): Visit https://pros.cs.purdue.edu/v5/cli/conductor.html to learn more """ - pass @conductor.command(aliases=['download'], short_help='Fetch/Download a remote template', @@ -78,6 +77,7 @@ def fetch(query: c.BaseTemplate): # whether the arguments are for the template or for the depot, so they share them logger(__name__).debug(f'Additional depot and template args: {query.metadata}') c.Conductor().fetch_template(depot, template, **query.metadata) + return 0 @conductor.command(context_settings={'ignore_unknown_options': True}) @@ -320,12 +320,12 @@ def info_project(project: c.Project, ls_upgrades): report = ProjectReport(project) _conductor = c.Conductor() if ls_upgrades: + import semantic_version as semver for template in report.project['templates']: - import semantic_version as semver templates = _conductor.resolve_templates(c.BaseTemplate.create_query(name=template["name"], version=f'>{template["version"]}', target=project.target)) - template["upgrades"] = sorted({t.version for t in templates}, key=lambda v: semver.Version(v), reverse=True) + template["upgrades"] = sorted({t.version for t in templates}, key=semver.Version, reverse=True) ui.finalize('project-report', report) diff --git a/pros/cli/conductor_utils.py b/pros/cli/conductor_utils.py index cb22cffc..580a95aa 100644 --- a/pros/cli/conductor_utils.py +++ b/pros/cli/conductor_utils.py @@ -6,7 +6,7 @@ from typing import * import click -import pros.common.ui as ui +from pros.common import ui import pros.conductor as c from pros.common.utils import logger from pros.conductor.templates import ExternalTemplate @@ -77,7 +77,7 @@ def get_matching_files(globs: List[str]) -> Set[str]: _path = os.path.normpath(path) + os.path.sep for g in [g for g in globs if glob.has_magic(g)]: files = glob.glob(f'{path}/{g}', recursive=True) - files = filter(lambda f: os.path.isfile(f), files) + files = filter(os.path.isfile, files) files = [os.path.normpath(os.path.normpath(f).split(_path)[-1]) for f in files] matching_files.extend(files) @@ -161,11 +161,11 @@ def purge_template(query: c.BaseTemplate, force): beta_templates = cond.resolve_templates(query, allow_online=False, beta=True) if len(templates) == 0: click.echo('No matching templates were found matching the spec.') - return 0 + return t_list = [t.identifier for t in templates] + [t.identifier for t in beta_templates] click.echo(f'The following template(s) will be removed {t_list}') if len(templates) > 1 and not force: - click.confirm(f'Are you sure you want to remove multiple templates?', abort=True) + click.confirm('Are you sure you want to remove multiple templates?', abort=True) for template in templates: if isinstance(template, c.LocalTemplate): cond.purge_template(template) diff --git a/pros/cli/interactive.py b/pros/cli/interactive.py index 634f1b2f..bc852ec6 100644 --- a/pros/cli/interactive.py +++ b/pros/cli/interactive.py @@ -3,7 +3,6 @@ import click import pros.conductor as c from .common import PROSGroup, default_options, project_option, pros_root -from pros.ga.analytics import analytics @pros_root def interactive_cli(): @@ -21,7 +20,7 @@ def interactive(): @default_options def new_project(directory): from pros.common.ui.interactive.renderers import MachineOutputRenderer - from pros.conductor.interactive.NewProjectModal import NewProjectModal + from pros.conductor.interactive.new_project_modal import NewProjectModal app = NewProjectModal(directory=directory) MachineOutputRenderer(app).run() @@ -31,7 +30,7 @@ def new_project(directory): @default_options def update_project(project: Optional[c.Project]): from pros.common.ui.interactive.renderers import MachineOutputRenderer - from pros.conductor.interactive.UpdateProjectModal import UpdateProjectModal + from pros.conductor.interactive.update_project_modal import UpdateProjectModal app = UpdateProjectModal(project) MachineOutputRenderer(app).run() diff --git a/pros/cli/main.py b/pros/cli/main.py index 8e4d6725..e44a8f9d 100644 --- a/pros/cli/main.py +++ b/pros/cli/main.py @@ -10,14 +10,13 @@ import ctypes import sys -import pros.common.ui as ui +from pros.common import ui import pros.common.ui.log from pros.cli.click_classes import * from pros.cli.common import default_options, root_commands from pros.common.utils import get_version, logger from pros.ga.analytics import analytics -import jsonpickle import pros.cli.build import pros.cli.conductor import pros.cli.conductor_utils @@ -46,14 +45,14 @@ ] if getattr(sys, 'frozen', False): - exe_file = sys.executable + EXE_FILE = sys.executable else: - exe_file = __file__ + EXE_FILE = __file__ -if os.path.exists(os.path.join(os.path.dirname(exe_file), os.pardir, os.pardir, '.git')): +if os.path.exists(os.path.join(os.path.dirname(EXE_FILE), os.pardir, os.pardir, '.git')): root_sources.append('test') -if os.path.exists(os.path.join(os.path.dirname(exe_file), os.pardir, os.pardir, '.git')): +if os.path.exists(os.path.join(os.path.dirname(EXE_FILE), os.pardir, os.pardir, '.git')): import pros.cli.test for root_source in root_sources: @@ -88,9 +87,9 @@ def version(ctx: click.Context, param, value): def use_analytics(ctx: click.Context, param, value): - if value == None: + if value is None: return - touse = not analytics.useAnalytics + touse = not analytics.use_analytics if str(value).lower().startswith("t"): touse = True elif str(value).lower().startswith("f"): @@ -100,9 +99,9 @@ def use_analytics(ctx: click.Context, param, value): ctx.exit(0) ctx.ensure_object(dict) analytics.set_use(touse) - ui.echo(f'Analytics usage set to: {analytics.useAnalytics}') + ui.echo(f'Analytics usage set to: {analytics.use_analytics}') ctx.exit(0) - + def use_early_access(ctx: click.Context, param, value): if value is None: return diff --git a/pros/cli/misc_commands.py b/pros/cli/misc_commands.py index d212a2fc..115bbcc6 100644 --- a/pros/cli/misc_commands.py +++ b/pros/cli/misc_commands.py @@ -1,4 +1,4 @@ -import pros.common.ui as ui +from pros.common import ui from pros.cli.common import * from pros.ga.analytics import analytics @@ -20,7 +20,7 @@ def upgrade(force_check, no_install): with ui.Notification(): ui.echo('The "pros upgrade" command is currently non-functioning. Did you mean to run "pros c upgrade"?', color='yellow') - return # Dead code below + return None # Dead code below analytics.send("upgrade") from pros.upgrade import UpgradeManager @@ -37,6 +37,6 @@ def upgrade(force_check, no_install): ui.finalize('upgradeInfo', manifest) if not no_install: if not manager.can_perform_upgrade: - ui.logger(__name__).error(f'This manifest cannot perform the upgrade.') + ui.logger(__name__).error('This manifest cannot perform the upgrade.') return -3 ui.finalize('upgradeComplete', manager.perform_upgrade()) diff --git a/pros/cli/terminal.py b/pros/cli/terminal.py index a44b89d5..048c498d 100644 --- a/pros/cli/terminal.py +++ b/pros/cli/terminal.py @@ -6,7 +6,7 @@ import sys import pros.conductor as c -import pros.serial.devices as devices +from pros.serial import devices from pros.serial.ports import DirectPort from pros.common.utils import logger from .common import default_options, resolve_v5_port, resolve_cortex_port, pros_root @@ -44,7 +44,6 @@ def terminal(port: str, backend: str, **kwargs): Note: share backend is not yet implemented. """ analytics.send("terminal") - from pros.serial.devices.vex.v5_user_device import V5UserDevice from pros.serial.terminal import Terminal is_v5_user_joystick = False if port == 'default': @@ -73,7 +72,7 @@ def terminal(port: str, backend: str, **kwargs): if backend == 'share': raise NotImplementedError('Share backend is not yet implemented') # ser = SerialSharePort(port) - elif is_v5_user_joystick: + if is_v5_user_joystick: logger(__name__).debug("it's a v5 joystick") ser = V5WirelessPort(port) else: @@ -85,10 +84,10 @@ def terminal(port: str, backend: str, **kwargs): device = devices.vex.V5UserDevice(ser) term = Terminal(device, request_banner=kwargs.pop('request_banner', True)) - class TerminalOutput(object): + class TerminalOutput: def __init__(self, file): self.terminal = sys.stdout - self.log = open(file, 'a') + self.log = open(file, 'a') # pylint: disable=consider-using-with def write(self, data): self.terminal.write(data) self.log.write(data) @@ -118,3 +117,4 @@ def end(self): output.end() term.join() logger(__name__).info('CLI Main Thread Dying') + return 0 diff --git a/pros/cli/test.py b/pros/cli/test.py index f19ac9a8..7608c53f 100644 --- a/pros/cli/test.py +++ b/pros/cli/test.py @@ -1,5 +1,5 @@ from pros.common.ui.interactive.renderers import MachineOutputRenderer -from pros.conductor.interactive.NewProjectModal import NewProjectModal +from pros.conductor.interactive.new_project_modal import NewProjectModal from .common import default_options, pros_root diff --git a/pros/cli/upload.py b/pros/cli/upload.py index 05cc1476..12d7b457 100644 --- a/pros/cli/upload.py +++ b/pros/cli/upload.py @@ -1,7 +1,6 @@ -from sys import exit -from unicodedata import name +import sys -import pros.common.ui as ui +from pros.common import ui import pros.conductor as c from .common import * @@ -54,9 +53,9 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg automatically detected based on the target (or as supplied by the PROS project) """ analytics.send("upload") - import pros.serial.devices.vex as vex + from pros.serial.devices import vex from pros.serial.ports import DirectPort - kwargs['ide_version'] = project.kernel if not project==None else "None" + kwargs['ide_version'] = project.kernel if project is not None else "None" kwargs['ide'] = 'PROS' if path is None or os.path.isdir(path): if project is None: @@ -69,7 +68,7 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg kwargs['remote_name'] = project.name # apply upload_options as a template - options = dict(**project.upload_options) + options = {**project.upload_options} if 'port' in options and port is None: port = options.get('port', None) if 'slot' in options and kwargs.get('slot', None) is None: @@ -127,11 +126,11 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg } after_upload_default = 'screen' #Determine which FTCompleteOption to assign to run_after - if kwargs['after']==None: + if kwargs['after'] is None: kwargs['after']=after_upload_default if kwargs['run_after']: kwargs['after']='run' - elif kwargs['run_screen']==False and not kwargs['run_after']: + elif not kwargs['run_screen'] and not kwargs['run_after']: kwargs['after']='none' kwargs['run_after'] = action_to_kwarg[kwargs['after']] kwargs.pop('run_screen') @@ -155,7 +154,7 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg device.write_program(pf, **kwargs) except Exception as e: logger(__name__).exception(e, exc_info=True) - exit(1) + sys.exit(1) @upload_cli.command('lsusb', aliases=['ls-usb', 'ls-devices', 'lsdev', 'list-usb', 'list-devices']) @click.option('--target', type=click.Choice(['v5', 'cortex']), default=None, required=False) @@ -167,7 +166,7 @@ def ls_usb(target): analytics.send("ls-usb") from pros.serial.devices.vex import find_v5_ports, find_cortex_ports - class PortReport(object): + class PortReport: def __init__(self, header: str, ports: List[Any], machine_header: Optional[str] = None): self.header = header self.ports = [{'device': p.device, 'desc': p.description} for p in ports] @@ -182,9 +181,8 @@ def __getstate__(self): def __str__(self): if len(self.ports) == 0: return f'There are no connected {self.header}' - else: - port_str = "\n".join([f"{p['device']} - {p['desc']}" for p in self.ports]) - return f'{self.header}:\n{port_str}' + port_str = "\n".join([f"{p['device']} - {p['desc']}" for p in self.ports]) + return f'{self.header}:\n{port_str}' result = [] if target == 'v5' or target is None: diff --git a/pros/cli/v5_utils.py b/pros/cli/v5_utils.py index a6fe0eec..11882caf 100644 --- a/pros/cli/v5_utils.py +++ b/pros/cli/v5_utils.py @@ -36,6 +36,7 @@ def status(port: str): print('CPU0 F/W version:', device.status['cpu0_version']) print('CPU1 SDK version:', device.status['cpu1_version']) print('System ID: 0x{:x}'.format(device.status['system_id'])) + return 0 @v5.command('ls-files') @@ -59,6 +60,7 @@ def ls_files(port: str, vid: int, options: int): c = device.get_dir_count(vid=vid, options=options) for i in range(0, c): print(device.get_file_metadata_by_idx(i)) + return 0 @v5.command(hidden=True) @@ -83,6 +85,7 @@ def read_file(file_name: str, port: str, vid: int, source: str): device = V5Device(ser) device.read_file(file=click.get_binary_stream('stdout'), remote_file=file_name, vid=vid, target=source) + return 0 @v5.command(hidden=True) @@ -108,6 +111,7 @@ def write_file(file, port: str, remote_file: str, **kwargs): ser = DirectPort(port) device = V5Device(ser) device.write_file(file=file, remote_file=remote_file or os.path.basename(file.name), **kwargs) + return 0 @v5.command('rm-file') @@ -131,6 +135,7 @@ def rm_file(file_name: str, port: str, vid: int, erase_all: bool): ser = DirectPort(port) device = V5Device(ser) device.erase_file(file_name, vid=vid, erase_all=erase_all) + return 0 @v5.command('cat-metadata') @@ -152,6 +157,7 @@ def cat_metadata(file_name: str, port: str, vid: int): ser = DirectPort(port) device = V5Device(ser) print(device.get_file_metadata_by_name(file_name, vid=vid)) + return 0 @v5.command('rm-program') @click.argument('slot') @@ -173,6 +179,7 @@ def rm_program(slot: int, port: str, vid: int): device = V5Device(ser) device.erase_file(f'{base_name}.ini', vid=vid) device.erase_file(f'{base_name}.bin', vid=vid) + return 0 @v5.command('rm-all') @click.argument('port', required=False, default=None) @@ -197,6 +204,7 @@ def rm_all(port: str, vid: int): files.append(device.get_file_metadata_by_idx(i)['filename']) for file in files: device.erase_file(file, vid=vid) + return 0 @v5.command(short_help='Run a V5 Program') @@ -221,6 +229,7 @@ def run(slot: str, port: str): ser = DirectPort(port) device = V5Device(ser) device.execute_program_file(file, run=True) + return 0 @v5.command(short_help='Stop a V5 Program') @@ -240,6 +249,7 @@ def stop(port: str): ser = DirectPort(port) device = V5Device(ser) device.execute_program_file('', run=False) + return 0 @v5.command(short_help='Take a screen capture of the display') @@ -275,7 +285,7 @@ def capture(file_name: str, port: str, force: bool = False): if file_name == '-': # Send the data to stdout to allow for piping print(i_data, end='') - return + return 0 if not file_name.endswith('.png'): file_name += '.png' @@ -290,6 +300,7 @@ def capture(file_name: str, port: str, force: bool = False): w.write(file_, i_data) print(f'Saved screen capture to {file_name}') + return 0 @v5.command('set-variable', aliases=['sv', 'set', 'set_variable'], short_help='Set a kernel variable on a connected V5 device') @click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True) @@ -297,12 +308,12 @@ def capture(file_name: str, port: str, force: bool = False): @click.argument('port', type=str, default=None, required=False) @default_options def set_variable(variable, value, port): - import pros.serial.devices.vex as vex + from pros.serial.devices import vex from pros.serial.ports import DirectPort # Get the connected v5 device port = resolve_v5_port(port, 'system')[0] - if port == None: + if port is None: return device = vex.V5Device(DirectPort(port)) actual_value = device.kv_write(variable, value).decode() @@ -313,12 +324,12 @@ def set_variable(variable, value, port): @click.argument('port', type=str, default=None, required=False) @default_options def read_variable(variable, port): - import pros.serial.devices.vex as vex + from pros.serial.devices import vex from pros.serial.ports import DirectPort # Get the connected v5 device port = resolve_v5_port(port, 'system')[0] - if port == None: + if port is None: return device = vex.V5Device(DirectPort(port)) value = device.kv_read(variable).decode() diff --git a/pros/common/sentry.py b/pros/common/sentry.py index 6c0c8690..8934f51f 100644 --- a/pros/common/sentry.py +++ b/pros/common/sentry.py @@ -2,7 +2,7 @@ import click -import pros.common.ui as ui +from pros.common import ui if TYPE_CHECKING: from sentry_sdk import Client, Hub, Scope # noqa: F401, flake8 issue with "if TYPE_CHECKING" @@ -10,34 +10,33 @@ from pros.config.cli_config import CliConfig # noqa: F401, flake8 issue, flake8 issue with "if TYPE_CHECKING" cli_config: 'CliConfig' = None -force_prompt_off = False +FORCE_PROMPT_OFF = False SUPPRESSED_EXCEPTIONS = [PermissionError, click.Abort] def disable_prompt(): - global force_prompt_off - force_prompt_off = True + global FORCE_PROMPT_OFF + FORCE_PROMPT_OFF = True def prompt_to_send(event: Dict[str, Any], hint: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ Asks the user for permission to send data to Sentry """ - global cli_config with ui.Notification(): if cli_config is None or (cli_config.offer_sentry is not None and not cli_config.offer_sentry): - return - if force_prompt_off: + return None + if FORCE_PROMPT_OFF: ui.logger(__name__).debug('Sentry prompt was forced off through click option') - return + return None if 'extra' in event and not event['extra'].get('sentry', True): ui.logger(__name__).debug('Not sending candidate event because event was tagged with extra.sentry = False') - return + return None if 'exc_info' in hint and (not getattr(hint['exc_info'][1], 'sentry', True) or any(isinstance(hint['exc_info'][1], t) for t in SUPPRESSED_EXCEPTIONS)): ui.logger(__name__).debug('Not sending candidate event because exception was tagged with sentry = False') - return + return None if not event['tags']: - event['tags'] = dict() + event['tags'] = {} extra_text = '' if 'message' in event: @@ -57,8 +56,8 @@ def prompt_to_send(event: Dict[str, Any], hint: Optional[Dict[str, Any]]) -> Opt ui.echo(f'Want to get updates? Visit https://pros.cs.purdue.edu/report.html?event={event["event_id"]}') return event - else: - ui.echo('Not sending bug report.') + ui.echo('Not sending bug report.') + return None def add_context(obj: object, override_handlers: bool = True, key: str = None) -> None: @@ -113,7 +112,7 @@ def add_tag(key: str, value: str): def register(cfg: Optional['CliConfig'] = None): - global cli_config, client + global cli_config if cfg is None: from pros.config.cli_config import cli_config as get_cli_config cli_config = get_cli_config() diff --git a/pros/common/ui/__init__.py b/pros/common/ui/__init__.py index 24fcc71d..1e55c982 100644 --- a/pros/common/ui/__init__.py +++ b/pros/common/ui/__init__.py @@ -6,8 +6,8 @@ from ..utils import * -_last_notify_value = 0 -_current_notify_value = 0 +_LAST_NOTIFY_VALUE = 0 +_CURRENT_NOTIFY_VALUE = 0 _machine_pickler = jsonpickle.JSONBackend() @@ -17,8 +17,7 @@ def _machineoutput(obj: Dict[str, Any]): def _machine_notify(method: str, obj: Dict[str, Any], notify_value: Optional[int]): if notify_value is None: - global _current_notify_value - notify_value = _current_notify_value + notify_value = _CURRENT_NOTIFY_VALUE obj['type'] = f'notify/{method}' obj['notify_value'] = notify_value _machineoutput(obj) @@ -32,6 +31,7 @@ def echo(text: Any, err: bool = False, nl: bool = True, notify_value: int = None return _machine_notify('echo', {'text': str(text) + ('\n' if nl else '')}, notify_value) else: return click.echo(str(text), nl=nl, err=err, color=color) + return None def confirm(text: str, default: bool = False, abort: bool = False, prompt_suffix: bool = ': ', @@ -39,7 +39,7 @@ def confirm(text: str, default: bool = False, abort: bool = False, prompt_suffix log: str = None): add_breadcrumb(message=text, category='confirm') if ismachineoutput(): - from pros.common.ui.interactive.ConfirmModal import ConfirmModal + from pros.common.ui.interactive.confirm_modal import ConfirmModal from pros.common.ui.interactive.renderers import MachineOutputRenderer app = ConfirmModal(text, abort, title, log) @@ -52,7 +52,7 @@ def confirm(text: str, default: bool = False, abort: bool = False, prompt_suffix def prompt(text, default=None, hide_input=False, - confirmation_prompt=False, type=None, + confirmation_prompt=False, value_type=None, value_proc=None, prompt_suffix=': ', show_default=True, err=False): if ismachineoutput(): @@ -60,8 +60,9 @@ def prompt(text, default=None, hide_input=False, pass else: return click.prompt(text, default=default, hide_input=hide_input, confirmation_prompt=confirmation_prompt, - type=type, value_proc=value_proc, prompt_suffix=prompt_suffix, show_default=show_default, + type=value_type, value_proc=value_proc, prompt_suffix=prompt_suffix, show_default=show_default, err=err) + return None def progressbar(iterable: Iterable = None, length: int = None, label: str = None, show_eta: bool = True, @@ -70,8 +71,7 @@ def progressbar(iterable: Iterable = None, length: int = None, label: str = None info_sep: str = ' ', width: int = 36): if ismachineoutput(): return _MachineOutputProgressBar(**locals()) - else: - return click.progressbar(**locals()) + return click.progressbar(**locals()) def finalize(method: str, data: Union[str, Dict, object, List[Union[str, Dict, object, Tuple]]], @@ -89,7 +89,7 @@ def finalize(method: str, data: Union[str, Dict, object, List[Union[str, Dict, o human_readable = '' elif isinstance(data[0], str): human_readable = '\n'.join(data) - elif isinstance(data[0], dict) or isinstance(data[0], object): + elif isinstance(data[0], (dict, object)): if hasattr(data[0], '__str__'): human_readable = '\n'.join([str(d) for d in data]) else: @@ -120,9 +120,8 @@ def finalize(method: str, data: Union[str, Dict, object, List[Union[str, Dict, o class _MachineOutputProgressBar(_click_ProgressBar): def __init__(self, *args, **kwargs): - global _current_notify_value - kwargs['file'] = open(os.devnull, 'w', encoding='UTF-8') - self.notify_value = kwargs.pop('notify_value', _current_notify_value) + kwargs['file'] = open(os.devnull, 'w', encoding='UTF-8') # pylint: disable=consider-using-with + self.notify_value = kwargs.pop('notify_value', _CURRENT_NOTIFY_VALUE) super(_MachineOutputProgressBar, self).__init__(*args, **kwargs) def __del__(self): @@ -136,24 +135,23 @@ def render_progress(self): _machine_notify('progress', obj, self.notify_value) -class Notification(object): +class Notification: def __init__(self, notify_value: Optional[int] = None): - global _last_notify_value + global _LAST_NOTIFY_VALUE if not notify_value: - notify_value = _last_notify_value + 1 - if notify_value > _last_notify_value: - _last_notify_value = notify_value + notify_value = _LAST_NOTIFY_VALUE + 1 + _LAST_NOTIFY_VALUE = max(_LAST_NOTIFY_VALUE, notify_value) self.notify_value = notify_value self.old_notify_values = [] def __enter__(self): - global _current_notify_value - self.old_notify_values.append(_current_notify_value) - _current_notify_value = self.notify_value + global _CURRENT_NOTIFY_VALUE + self.old_notify_values.append(_CURRENT_NOTIFY_VALUE) + _CURRENT_NOTIFY_VALUE = self.notify_value def __exit__(self, exc_type, exc_val, exc_tb): - global _current_notify_value - _current_notify_value = self.old_notify_values.pop() + global _CURRENT_NOTIFY_VALUE + _CURRENT_NOTIFY_VALUE = self.old_notify_values.pop() class EchoPipe(threading.Thread): @@ -165,27 +163,27 @@ def __init__(self, err: bool = False, ctx: Optional[click.Context] = None): self.is_err = err threading.Thread.__init__(self) self.daemon = False - self.fdRead, self.fdWrite = os.pipe() - self.pipeReader = os.fdopen(self.fdRead, encoding='UTF-8') + self.fd_read, self.fd_write = os.pipe() + self.pipe_reader = os.fdopen(self.fd_read, encoding='UTF-8') self.start() def fileno(self): """Return the write file descriptor of the pipe """ - return self.fdWrite + return self.fd_write def run(self): """Run the thread, logging everything. """ - for line in iter(self.pipeReader.readline, ''): + for line in iter(self.pipe_reader.readline, ''): echo(line.strip('\n'), ctx=self.click_ctx, err=self.is_err) - self.pipeReader.close() + self.pipe_reader.close() def close(self): """Close the write end of the pipe. """ - os.close(self.fdWrite) + os.close(self.fd_write) __all__ = ['finalize', 'echo', 'confirm', 'prompt', 'progressbar', 'EchoPipe'] diff --git a/pros/common/ui/interactive/application.py b/pros/common/ui/interactive/application.py index 0db8dfaf..c2328a31 100644 --- a/pros/common/ui/interactive/application.py +++ b/pros/common/ui/interactive/application.py @@ -72,11 +72,11 @@ def __getstate__(self): """ Returns the dictionary representation of this Application """ - return dict( - etype=Application.get_hierarchy(self.__class__), - elements=[e.__getstate__() for e in self.build()], - uuid=self.uuid - ) + return { + "etype": Application.get_hierarchy(self.__class__), + "elements": [e.__getstate__() for e in self.build()], + "uuid": self.uuid + } class Modal(Application[P], Generic[P]): @@ -134,15 +134,15 @@ def __getstate__(self): extra_state = {} if self.description is not None: extra_state['description'] = self.description - return dict( + return { **super(Modal, self).__getstate__(), **extra_state, - title=self.title, - will_abort=self.will_abort, - confirm_button=self.confirm_button, - cancel_button=self.cancel_button, - can_confirm=self.can_confirm - ) + "title": self.title, + "will_abort": self.will_abort, + "confirm_button": self.confirm_button, + "cancel_button": self.cancel_button, + "can_confirm": self.can_confirm + } def _confirm(self, *args, **kwargs): """ diff --git a/pros/common/ui/interactive/components/button.py b/pros/common/ui/interactive/components/button.py index a3716158..746c6e27 100644 --- a/pros/common/ui/interactive/components/button.py +++ b/pros/common/ui/interactive/components/button.py @@ -17,8 +17,8 @@ def on_clicked(self, *handlers: Callable, **kwargs): return self.on('clicked', *handlers, **kwargs) def __getstate__(self) -> dict: - return dict( + return { **super(Button, self).__getstate__(), - text=self.text, - uuid=self.uuid - ) + "text": self.text, + "uuid": self.uuid + } diff --git a/pros/common/ui/interactive/components/component.py b/pros/common/ui/interactive/components/component.py index 158fc0bc..8ffe3759 100644 --- a/pros/common/ui/interactive/components/component.py +++ b/pros/common/ui/interactive/components/component.py @@ -4,7 +4,7 @@ from pros.common.ui.interactive.parameters.validatable_parameter import ValidatableParameter -class Component(object): +class Component: """ A Component is the basic building block of something to render to users. @@ -29,9 +29,9 @@ def get_hierarchy(cls, base: type) -> Optional[List[str]]: return None def __getstate__(self) -> Dict: - return dict( - etype=Component.get_hierarchy(self.__class__) - ) + return { + "etype": Component.get_hierarchy(self.__class__) + } P = TypeVar('P', bound=Parameter) @@ -52,12 +52,12 @@ def __getstate__(self): reason = self.parameter.is_valid_reason() if reason: extra_state['valid_reason'] = self.parameter.is_valid_reason() - return dict( + return { **super(ParameterizedComponent, self).__getstate__(), **extra_state, - value=self.parameter.value, - uuid=self.parameter.uuid, - ) + "value": self.parameter.value, + "uuid": self.parameter.uuid, + } class BasicParameterizedComponent(ParameterizedComponent[P], Generic[P]): @@ -70,7 +70,7 @@ def __init__(self, label: AnyStr, parameter: P): self.label = label def __getstate__(self): - return dict( + return { **super(BasicParameterizedComponent, self).__getstate__(), - text=self.label, - ) + "text": self.label, + } diff --git a/pros/common/ui/interactive/components/container.py b/pros/common/ui/interactive/components/container.py index 8b8615f4..23ece2f8 100644 --- a/pros/common/ui/interactive/components/container.py +++ b/pros/common/ui/interactive/components/container.py @@ -1,4 +1,4 @@ -from typing import * +from typing import AnyStr, Optional, Union from pros.common.ui.interactive.parameters import BooleanParameter from .component import Component @@ -26,8 +26,8 @@ def __getstate__(self): extra_state['title'] = self.title if self.description is not None: extra_state['description'] = self.description - return dict( + return { **super(Container, self).__getstate__(), **extra_state, - elements=[e.__getstate__() for e in self.elements] - ) + "elements": [e.__getstate__() for e in self.elements] + } diff --git a/pros/common/ui/interactive/components/input.py b/pros/common/ui/interactive/components/input.py index 8d35b5e8..c543de53 100644 --- a/pros/common/ui/interactive/components/input.py +++ b/pros/common/ui/interactive/components/input.py @@ -16,10 +16,10 @@ def __getstate__(self) -> dict: extra_state = {} if self.placeholder is not None: extra_state['placeholder'] = self.placeholder - return dict( + return { **super(InputBox, self).__getstate__(), **extra_state, - ) + } class FileSelector(InputBox[P], Generic[P]): diff --git a/pros/common/ui/interactive/components/input_groups.py b/pros/common/ui/interactive/components/input_groups.py index 93171cfd..607e59c3 100644 --- a/pros/common/ui/interactive/components/input_groups.py +++ b/pros/common/ui/interactive/components/input_groups.py @@ -4,10 +4,10 @@ class DropDownBox(BasicParameterizedComponent[OptionParameter]): def __getstate__(self): - return dict( + return { **super(DropDownBox, self).__getstate__(), - options=self.parameter.options - ) + "options": self.parameter.options + } class ButtonGroup(DropDownBox): diff --git a/pros/common/ui/interactive/components/label.py b/pros/common/ui/interactive/components/label.py index 8b060300..3559f9d5 100644 --- a/pros/common/ui/interactive/components/label.py +++ b/pros/common/ui/interactive/components/label.py @@ -8,17 +8,16 @@ def __init__(self, text: AnyStr): self.text = text def __getstate__(self): - return dict( + return { **super(Label, self).__getstate__(), - text=self.text - ) + "text": self.text + } class VerbatimLabel(Label): """ Should be displayed with a monospace font """ - pass class Spinner(Label): diff --git a/pros/common/ui/interactive/ConfirmModal.py b/pros/common/ui/interactive/confirm_modal.py similarity index 91% rename from pros/common/ui/interactive/ConfirmModal.py rename to pros/common/ui/interactive/confirm_modal.py index d4c59235..6c46f036 100644 --- a/pros/common/ui/interactive/ConfirmModal.py +++ b/pros/common/ui/interactive/confirm_modal.py @@ -14,11 +14,11 @@ def __init__(self, text: str, abort: bool = False, title: AnyStr = 'Please confi super().__init__(title, will_abort=abort, confirm_button='Yes', cancel_button='No', description=text) self.log = log - def confirm(self): + def confirm(self, *args, **kwargs): self.set_return(True) self.exit() - def cancel(self): + def cancel(self, *args, **kwargs): self.set_return(False) super(ConfirmModal, self).cancel() diff --git a/pros/common/ui/interactive/observable.py b/pros/common/ui/interactive/observable.py index ec8b0855..647bb3aa 100644 --- a/pros/common/ui/interactive/observable.py +++ b/pros/common/ui/interactive/observable.py @@ -6,7 +6,7 @@ from pros.common import logger -_uuid_table = dict() # type: Dict[str, Observable] +_uuid_table = {} # type: Dict[str, Observable] class Observable(observable.Observable): diff --git a/pros/common/ui/interactive/parameters/misc_parameters.py b/pros/common/ui/interactive/parameters/misc_parameters.py index f19edba9..83b2dc92 100644 --- a/pros/common/ui/interactive/parameters/misc_parameters.py +++ b/pros/common/ui/interactive/parameters/misc_parameters.py @@ -25,15 +25,14 @@ def update(self, new_value): class RangeParameter(ValidatableParameter[int]): - def __init__(self, initial_value: int, range: Tuple[int, int]): + def __init__(self, initial_value: int, value_range: Tuple[int, int]): super().__init__(initial_value) - self.range = range + self.value_range = value_range def validate(self, value: T): - if self.range[0] <= value <= self.range[1]: + if self.value_range[0] <= value <= self.value_range[1]: return True - else: - return f'{value} is not within [{self.range[0]}, {self.range[1]}]' + return f'{value} is not within [{self.value_range[0]}, {self.value_range[1]}]' def update(self, new_value): super(RangeParameter, self).update(int(new_value)) diff --git a/pros/common/ui/interactive/parameters/validatable_parameter.py b/pros/common/ui/interactive/parameters/validatable_parameter.py index ceafd59f..2bcf1e22 100644 --- a/pros/common/ui/interactive/parameters/validatable_parameter.py +++ b/pros/common/ui/interactive/parameters/validatable_parameter.py @@ -20,7 +20,7 @@ def __init__(self, initial_value: T, allow_invalid_input: bool = True, """ super().__init__(initial_value) self.allow_invalid_input = allow_invalid_input - self.validate_lambda = validate or (lambda v: bool(v)) + self.validate_lambda = validate or bool def validate(self, value: T) -> Union[bool, str]: return self.validate_lambda(value) @@ -29,8 +29,7 @@ def is_valid(self, value: T = None) -> bool: rv = self.validate(value if value is not None else self.value) if isinstance(rv, bool): return rv - else: - return False + return False def is_valid_reason(self, value: T = None) -> Optional[str]: rv = self.validate(value if value is not None else self.value) diff --git a/pros/common/ui/interactive/renderers/__init__.py b/pros/common/ui/interactive/renderers/__init__.py index b032cb28..e759f9fe 100644 --- a/pros/common/ui/interactive/renderers/__init__.py +++ b/pros/common/ui/interactive/renderers/__init__.py @@ -1 +1 @@ -from .MachineOutputRenderer import MachineOutputRenderer +from .machine_output_renderer import MachineOutputRenderer diff --git a/pros/common/ui/interactive/renderers/MachineOutputRenderer.py b/pros/common/ui/interactive/renderers/machine_output_renderer.py similarity index 93% rename from pros/common/ui/interactive/renderers/MachineOutputRenderer.py rename to pros/common/ui/interactive/renderers/machine_output_renderer.py index 4bb5eddb..a22cbe79 100644 --- a/pros/common/ui/interactive/renderers/MachineOutputRenderer.py +++ b/pros/common/ui/interactive/renderers/machine_output_renderer.py @@ -6,30 +6,24 @@ from pros.common import ui from pros.common.ui.interactive.observable import Observable -from .Renderer import Renderer +from .renderer import Renderer from ..application import Application current: List['MachineOutputRenderer'] = [] def _push_renderer(renderer: 'MachineOutputRenderer'): - global current - stack: List['MachineOutputRenderer'] = current stack.append(renderer) def _remove_renderer(renderer: 'MachineOutputRenderer'): - global current - stack: List['MachineOutputRenderer'] = current if renderer in stack: stack.remove(renderer) def _current_renderer() -> Optional['MachineOutputRenderer']: - global current - stack: List['MachineOutputRenderer'] = current return stack[-1] if len(stack) > 0 else None @@ -39,8 +33,6 @@ def _current_renderer() -> Optional['MachineOutputRenderer']: class MachineOutputRenderer(Renderer[P], Generic[P]): def __init__(self, app: Application[P]): - global current - super().__init__(app) self.alive = False self.thread = None @@ -50,7 +42,7 @@ def __init__(self, app: Application[P]): def on_redraw(): self.render(self.app) - app.on_exit(lambda: self.stop()) + app.on_exit(self.stop) @staticmethod def get_line(): @@ -89,7 +81,7 @@ def stop(self): if current_thread() != self.thread: ui.logger(__name__).debug(f'Interrupting render thread of {self.app}') - while not self.stop_sem.acquire(timeout=0.1): + while not self.stop_sem.acquire(timeout=0.1): # pylint: disable=consider-using-with self.wake_me() ui.logger(__name__).debug(f'Broadcasting stop {self.app}') diff --git a/pros/common/ui/interactive/renderers/Renderer.py b/pros/common/ui/interactive/renderers/renderer.py similarity index 100% rename from pros/common/ui/interactive/renderers/Renderer.py rename to pros/common/ui/interactive/renderers/renderer.py diff --git a/pros/common/ui/log.py b/pros/common/ui/log.py index 8202ef95..57c94dc6 100644 --- a/pros/common/ui/log.py +++ b/pros/common/ui/log.py @@ -48,5 +48,4 @@ class PROSLogFormatter(logging.Formatter): def formatException(self, ei): if not isdebug(): return '\n'.join(super().formatException(ei).split('\n')[-3:]) - else: - return super().formatException(ei) + return super().formatException(ei) diff --git a/pros/common/utils.py b/pros/common/utils.py index d74d9a2d..a786db13 100644 --- a/pros/common/utils.py +++ b/pros/common/utils.py @@ -13,7 +13,8 @@ @lru_cache(1) def get_version(): try: - ver = open(os.path.join(os.path.dirname(__file__), '..', '..', 'version')).read().strip() + with open(os.path.join(os.path.dirname(__file__), '..', '..', 'version')) as version_file: + ver = version_file.read().strip() if ver is not None: return ver except: @@ -33,6 +34,7 @@ def get_version(): else: import pros.cli.main module = pros.cli.main.__name__ + # pylint: disable=not-an-iterable for dist in pkg_resources.working_set: scripts = dist.get_entry_map().get('console_scripts') or {} for _, entry_point in iter(scripts.items()): @@ -51,8 +53,7 @@ def retries_wrapper(*args, n_retries: int = retry, **kwargs): except Exception as e: if n_retries > 0: return retries_wrapper(*args, n_retries=n_retries - 1, **kwargs) - else: - raise e + raise e return retries_wrapper @@ -78,8 +79,7 @@ def ismachineoutput(ctx: click.Context = None) -> bool: ctx.ensure_object(dict) assert isinstance(ctx.obj, dict) return ctx.obj.get('machine_output', False) - else: - return False + return False def get_pros_dir(): @@ -90,15 +90,16 @@ def with_click_context(func): ctx = click.get_current_context(silent=True) if not ctx or not isinstance(ctx, click.Context): return func - else: - def _wrap(*args, **kwargs): - with ctx: - try: - return func(*args, **kwargs) - except BaseException as e: - logger(__name__).exception(e) - return _wrap + def _wrap(*args, **kwargs): + with ctx: + try: + return func(*args, **kwargs) + except BaseException as e: + logger(__name__).exception(e) + return None + + return _wrap def download_file(url: str, ext: Optional[str] = None, desc: Optional[str] = None) -> Optional[str]: @@ -114,7 +115,7 @@ def download_file(url: str, ext: Optional[str] = None, desc: Optional[str] = Non # from rfc6266_parser import parse_requests_response import re - response = requests.get(url, stream=True) + response = requests.get(url, stream=True, timeout=10) if response.status_code == 200: filename: str = url.rsplit('/', 1)[-1] if 'Content-Disposition' in response.headers.keys(): diff --git a/pros/conductor/conductor.py b/pros/conductor/conductor.py index 4d82cd30..70488b47 100644 --- a/pros/conductor/conductor.py +++ b/pros/conductor/conductor.py @@ -1,16 +1,15 @@ import errno import os.path import shutil -from enum import Enum from pathlib import Path import sys from typing import * -import re import click from semantic_version import Spec, Version from pros.common import * +from pros.common import ui from pros.conductor.project import TemplateAction from pros.conductor.project.template_resolution import InvalidTemplateException from pros.config import Config @@ -26,8 +25,8 @@ """ # TBD? Currently, EarlyAccess value is stored in config file class ReleaseChannel(Enum): - Stable = 'stable' - Beta = 'beta' + STABLE = 'stable' + BETA = 'beta' """ def is_pathname_valid(pathname: str) -> bool: @@ -59,7 +58,7 @@ def is_pathname_valid(pathname: str) -> bool: # Check for emojis # https://stackoverflow.com/a/62898106/11177720 ranges = [ - (ord(u'\U0001F300'), ord(u"\U0001FAF6")), # 127744, 129782 + (ord('\U0001F300'), ord("\U0001FAF6")), # 127744, 129782 (126980, 127569), (169, 174), (8205, 12953) @@ -69,10 +68,9 @@ def is_pathname_valid(pathname: str) -> bool: for range_min, range_max in ranges: if range_min <= char_code <= range_max: return False - except TypeError as exc: + except TypeError: return False - else: - return True + return True class Conductor(Config): """ @@ -185,7 +183,7 @@ def purge_template(self, template: LocalTemplate): def resolve_templates(self, identifier: Union[str, BaseTemplate], allow_online: bool = True, allow_offline: bool = True, force_refresh: bool = False, unique: bool = True, **kwargs) -> List[BaseTemplate]: - results = list() if not unique else set() + results = [] if not unique else set() kernel_version = kwargs.get('kernel_version', None) if kwargs.get('early_access', None) is not None: use_early_access = kwargs.get('early_access', False) @@ -196,7 +194,7 @@ def resolve_templates(self, identifier: Union[str, BaseTemplate], allow_online: else: query = identifier if allow_offline: - offline_results = list() + offline_results = [] if use_early_access: offline_results.extend(filter(lambda t: t.satisfies(query, kernel_version=kernel_version), self.early_access_local_templates)) @@ -255,7 +253,7 @@ def resolve_template(self, identifier: Union[str, BaseTemplate], **kwargs) -> Op # there's a local template satisfying the query if len(local_templates) > 1: # This should never happen! Conductor state must be invalid - raise Exception(f'Multiple local templates satisfy {query.identifier}!') + raise RuntimeError(f'Multiple local templates satisfy {query.identifier}!') return local_templates[0] # prefer pros-mainline template second @@ -284,26 +282,26 @@ def apply_template(self, project: Project, identifier: Union[str, BaseTemplate], # warn and prompt user if upgrading to PROS 4 or downgrading to PROS 3 if template.name == 'kernel': - isProject = Project.find_project("") - if isProject: + is_project = Project.find_project("") + if is_project: curr_proj = Project() if curr_proj.kernel: if template.version[0] == '4' and curr_proj.kernel[0] == '3': - confirm = ui.confirm(f'Warning! Upgrading project to PROS 4 will cause breaking changes. ' - f'Do you still want to upgrade?') + confirm = ui.confirm('Warning! Upgrading project to PROS 4 will cause breaking changes. ' + 'Do you still want to upgrade?') if not confirm: raise dont_send( - InvalidTemplateException(f'Not upgrading')) + InvalidTemplateException('Not upgrading')) if template.version[0] == '3' and curr_proj.kernel[0] == '4': - confirm = ui.confirm(f'Warning! Downgrading project to PROS 3 will cause breaking changes. ' - f'Do you still want to downgrade?') + confirm = ui.confirm('Warning! Downgrading project to PROS 3 will cause breaking changes. ' + 'Do you still want to downgrade?') if not confirm: raise dont_send( - InvalidTemplateException(f'Not downgrading')) + InvalidTemplateException('Not downgrading')) elif not project.use_early_access and template.version[0] == '3' and not self.warn_early_access: - confirm = ui.confirm(f'PROS 4 is now in early access. ' - f'Please use the --early-access flag if you would like to use it.\n' - f'Do you want to use PROS 4 instead?') + confirm = ui.confirm('PROS 4 is now in early access. ' + 'Please use the --early-access flag if you would like to use it.\n' + 'Do you want to use PROS 4 instead?') self.warn_early_access = True if confirm: # use pros 4 project.use_early_access = True @@ -311,7 +309,8 @@ def apply_template(self, project: Project, identifier: Union[str, BaseTemplate], kwargs['version'] = '>=0' kwargs['early_access'] = True # Recall the function with early access enabled - return self.apply_template(project, identifier, **kwargs) + self.apply_template(project, identifier, **kwargs) + return self.save() if not isinstance(template, LocalTemplate): @@ -321,19 +320,21 @@ def apply_template(self, project: Project, identifier: Union[str, BaseTemplate], logger(__name__).info(str(project)) valid_action = project.get_template_actions(template) - if valid_action == TemplateAction.NotApplicable: + if valid_action == TemplateAction.NOT_APPLICABLE: raise dont_send( InvalidTemplateException(f'{template.identifier} is not applicable to {project}', reason=valid_action) ) - if force \ - or (valid_action == TemplateAction.Upgradable and upgrade_ok) \ - or (valid_action == TemplateAction.Installable and install_ok) \ - or (valid_action == TemplateAction.Downgradable and downgrade_ok): + should_apply = force \ + or (valid_action == TemplateAction.UPGRADABLE and upgrade_ok) \ + or (valid_action == TemplateAction.INSTALLABLE and install_ok) \ + or (valid_action == TemplateAction.DOWNGRADABLE and downgrade_ok) + + if should_apply: project.apply_template(template, force_system=kwargs.pop('force_system', False), force_user=kwargs.pop('force_user', False), remove_empty_directories=kwargs.pop('remove_empty_directories', False)) ui.finalize('apply', f'Finished applying {template.identifier} to {project.location}') - elif valid_action != TemplateAction.AlreadyInstalled: + elif valid_action != TemplateAction.ALREADY_INSTALLED: raise dont_send( InvalidTemplateException(f'Could not install {template.identifier} because it is {valid_action.name},' f' and that is not allowed.', reason=valid_action) @@ -360,20 +361,20 @@ def new_project(self, path: str, no_default_libs: bool = False, **kwargs) -> Pro kwargs["early_access"] = use_early_access if kwargs["version_source"]: # If true, then the user has not specified a version if not use_early_access and self.warn_early_access: - ui.echo(f"PROS 4 is now in early access. " - f"If you would like to use it, use the --early-access flag.") + ui.echo("PROS 4 is now in early access. " + "If you would like to use it, use the --early-access flag.") elif not use_early_access and not self.warn_early_access: - confirm = ui.confirm(f'PROS 4 is now in early access. ' - f'Please use the --early-access flag if you would like to use it.\n' - f'Do you want to use PROS 4 instead?') + confirm = ui.confirm('PROS 4 is now in early access. ' + 'Please use the --early-access flag if you would like to use it.\n' + 'Do you want to use PROS 4 instead?') self.warn_early_access = True if confirm: use_early_access = True kwargs['early_access'] = True elif use_early_access: - ui.echo(f'Early access is enabled. Using PROS 4.') + ui.echo('Early access is enabled. Using PROS 4.') elif use_early_access: - ui.echo(f'Early access is enabled.') + ui.echo('Early access is enabled.') if not is_pathname_valid(str(Path(path).absolute())): raise dont_send(ValueError('Project path contains invalid characters.')) diff --git a/pros/conductor/depots/depot.py b/pros/conductor/depots/depot.py index 364d312f..8487b10d 100644 --- a/pros/conductor/depots/depot.py +++ b/pros/conductor/depots/depot.py @@ -1,13 +1,13 @@ from datetime import datetime, timedelta from typing import * -import pros.common.ui as ui +from pros.common import ui from pros.common import logger from pros.config.cli_config import cli_config from ..templates import BaseTemplate, Template -class Depot(object): +class Depot: def __init__(self, name: str, location: str, config: Dict[str, Any] = None, update_frequency: timedelta = timedelta(minutes=1), config_schema: Dict[str, Dict[str, Any]] = None): diff --git a/pros/conductor/depots/http_depot.py b/pros/conductor/depots/http_depot.py index dc7e3a25..3ba4358b 100644 --- a/pros/conductor/depots/http_depot.py +++ b/pros/conductor/depots/http_depot.py @@ -4,7 +4,7 @@ import jsonpickle -import pros.common.ui as ui +from pros.common import ui from pros.common import logger from pros.common.utils import download_file from .depot import Depot @@ -35,7 +35,7 @@ def fetch_template(self, template: BaseTemplate, destination: str, **kwargs): def update_remote_templates(self, **_): import requests - response = requests.get(self.location) + response = requests.get(self.location, timeout=10) if response.status_code == 200: self.remote_templates = jsonpickle.decode(response.text) else: diff --git a/pros/conductor/interactive/__init__.py b/pros/conductor/interactive/__init__.py index 89f1e51c..c70e4700 100644 --- a/pros/conductor/interactive/__init__.py +++ b/pros/conductor/interactive/__init__.py @@ -1,4 +1,4 @@ -from .NewProjectModal import NewProjectModal -from .UpdateProjectModal import UpdateProjectModal +from .new_project_modal import NewProjectModal +from .update_project_modal import UpdateProjectModal from .parameters import ExistingProjectParameter, NonExistentProjectParameter diff --git a/pros/conductor/interactive/NewProjectModal.py b/pros/conductor/interactive/new_project_modal.py similarity index 100% rename from pros/conductor/interactive/NewProjectModal.py rename to pros/conductor/interactive/new_project_modal.py diff --git a/pros/conductor/interactive/parameters.py b/pros/conductor/interactive/parameters.py index 7b0da738..d3c4e02f 100644 --- a/pros/conductor/interactive/parameters.py +++ b/pros/conductor/interactive/parameters.py @@ -64,7 +64,7 @@ def _update_versions(self): if self.name.value in self.options: self.version = p.OptionParameter( self.version.value if self.version else None, - list(sorted(self.options[self.name.value].keys(), reverse=True, key=lambda v: Version(v))) + list(sorted(self.options[self.name.value].keys(), reverse=True, key=Version)) ) if self.version.value not in self.version.options: diff --git a/pros/conductor/interactive/UpdateProjectModal.py b/pros/conductor/interactive/update_project_modal.py similarity index 89% rename from pros/conductor/interactive/UpdateProjectModal.py rename to pros/conductor/interactive/update_project_modal.py index 9cb5124e..0feb2633 100644 --- a/pros/conductor/interactive/UpdateProjectModal.py +++ b/pros/conductor/interactive/update_project_modal.py @@ -7,7 +7,7 @@ from pros.common import ui from pros.common.ui.interactive import application, components, parameters from pros.conductor import BaseTemplate, Conductor, Project -from pros.conductor.project.ProjectTransaction import ProjectTransaction +from pros.conductor.project.project_transaction import ProjectTransaction from .components import TemplateListingComponent from .parameters import ExistingProjectParameter, TemplateParameter @@ -25,9 +25,9 @@ def is_processing(self, value: bool): def _generate_transaction(self) -> ProjectTransaction: transaction = ProjectTransaction(self.project, self.conductor) - apply_kwargs = dict( - force_apply=self.force_apply_parameter.value - ) + apply_kwargs = { + "force_apply": self.force_apply_parameter.value + } if self.name.value != self.project.name: transaction.change_name(self.name.value) if self.project.template_is_applicable(self.current_kernel.value, **apply_kwargs): @@ -92,17 +92,18 @@ def project_changed(self, new_project: ExistingProjectParameter): self.current_kernel = TemplateParameter( None, options=sorted( - {t for t in self.conductor.resolve_templates(self.project.templates['kernel'].as_query())}, + set(self.conductor.resolve_templates(self.project.templates['kernel'].as_query())), key=lambda v: Version(v.version), reverse=True ) ) self.current_templates = [ TemplateParameter( None, - options=sorted({ - t - for t in self.conductor.resolve_templates(t.as_query()) - }, key=lambda v: Version(v.version), reverse=True) + options=sorted( + set(self.conductor.resolve_templates(t.as_query())), + key=lambda v: Version(v.version), + reverse=True + ) ) for t in self.project.templates.values() if t.name != 'kernel' @@ -129,9 +130,9 @@ def build(self) -> Generator[components.Component, None, None]: assert self.project is not None yield components.Label(f'Modify your {self.project.target} project.') yield components.InputBox('Project Name', self.name) - yield TemplateListingComponent(self.current_kernel, editable=dict(version=True), removable=False) + yield TemplateListingComponent(self.current_kernel, editable={"version": True}, removable=False) yield components.Container( - *(TemplateListingComponent(t, editable=dict(version=True), removable=True) for t in + *(TemplateListingComponent(t, editable={"version": True}, removable=True) for t in self.current_templates), *(TemplateListingComponent(t, editable=True, removable=True) for t in self.new_templates), self.add_template_button, diff --git a/pros/conductor/project/__init__.py b/pros/conductor/project/__init__.py index a0acbc61..c6da6b94 100644 --- a/pros/conductor/project/__init__.py +++ b/pros/conductor/project/__init__.py @@ -7,10 +7,11 @@ from typing import * from pros.common import * +from pros.common import ui from pros.common.ui import EchoPipe from pros.conductor.project.template_resolution import TemplateAction from pros.config.config import Config, ConfigNotFoundException -from .ProjectReport import ProjectReport +from .project_report import ProjectReport from ..templates import BaseTemplate, LocalTemplate, Template from ..transaction import Transaction @@ -68,37 +69,36 @@ def all_files(self) -> Set[str]: def get_template_actions(self, template: BaseTemplate) -> TemplateAction: ui.logger(__name__).debug(template) if template.target != self.target: - return TemplateAction.NotApplicable + return TemplateAction.NOT_APPLICABLE from semantic_version import Spec, Version if template.name != 'kernel' and Version(self.kernel) not in Spec(template.supported_kernels or '>0'): if template.name in self.templates.keys(): - return TemplateAction.AlreadyInstalled - return TemplateAction.NotApplicable + return TemplateAction.ALREADY_INSTALLED + return TemplateAction.NOT_APPLICABLE for current in self.templates.values(): if template.name != current.name: continue if template > current: - return TemplateAction.Upgradable + return TemplateAction.UPGRADABLE if template == current: - return TemplateAction.AlreadyInstalled + return TemplateAction.ALREADY_INSTALLED if current > template: - return TemplateAction.Downgradable + return TemplateAction.DOWNGRADABLE - if any([template > current for current in self.templates.values()]): - return TemplateAction.Upgradable - else: - return TemplateAction.Installable + if any(template > current for current in self.templates.values()): + return TemplateAction.UPGRADABLE + return TemplateAction.INSTALLABLE def template_is_installed(self, query: BaseTemplate) -> bool: - return self.get_template_actions(query) == TemplateAction.AlreadyInstalled + return self.get_template_actions(query) == TemplateAction.ALREADY_INSTALLED def template_is_upgradeable(self, query: BaseTemplate) -> bool: - return self.get_template_actions(query) == TemplateAction.Upgradable + return self.get_template_actions(query) == TemplateAction.UPGRADABLE def template_is_applicable(self, query: BaseTemplate, force_apply: bool = False) -> bool: ui.logger(__name__).debug(query.target) return self.get_template_actions(query) in ( - TemplateAction.ForcedApplicable if force_apply else TemplateAction.UnforcedApplicable) + TemplateAction.FORCED_APPLICABLE if force_apply else TemplateAction.UNFORCED_APPLICABLE) def apply_template(self, template: LocalTemplate, force_system: bool = False, force_user: bool = False, remove_empty_directories: bool = False): @@ -114,7 +114,7 @@ def apply_template(self, template: LocalTemplate, force_system: bool = False, fo transaction = Transaction(self.location, set(self.all_files)) installed_user_files = set() for lib_name, lib in self.templates.items(): - if lib_name == template.name or lib.name == template.name: + if template.name in (lib_name, lib.name): logger(__name__).debug(f'{lib} is already installed') logger(__name__).debug(lib.system_files) logger(__name__).debug(lib.user_files) @@ -144,8 +144,8 @@ def new_user_filter(new_file: str) -> bool: src/opcontrol.c and src/opcontrol.cpp are friends because they have the same stem src/opcontrol.c and include/opcontrol.h are not because they are in different directories """ - return not any([(os.path.normpath(file) in transaction.effective_state) for file in template.user_files if - os.path.splitext(file)[0] == os.path.splitext(new_file)[0]]) + return not any((os.path.normpath(file) in transaction.effective_state) for file in template.user_files if + os.path.splitext(file)[0] == os.path.splitext(new_file)[0]) if force_user: new_user_files = template.real_user_files @@ -153,7 +153,7 @@ def new_user_filter(new_file: str) -> bool: new_user_files = filter(new_user_filter, template.real_user_files) transaction.extend_add(new_user_files, template.location) - if any([file in transaction.effective_state for file in template.system_files]) and not force_system: + if any(file in transaction.effective_state for file in template.system_files) and not force_system: confirm(f'Some required files for {template.identifier} already exist in the project. ' f'Overwrite the existing files?', abort=True) transaction.extend_add(template.system_files, template.location) @@ -167,7 +167,7 @@ def remove_template(self, template: Template, remove_user: bool = False, remove_ if not self.template_is_installed(template): raise ValueError(f'{template.identifier} is not installed on this project.') if template.name == 'kernel': - raise ValueError(f'Cannot remove the kernel template. Maybe create a new project?') + raise ValueError('Cannot remove the kernel template. Maybe create a new project?') real_template = LocalTemplate(orig=template, location=self.location) transaction = Transaction(self.location, set(self.all_files)) @@ -203,7 +203,7 @@ def __str__(self): def kernel(self): if 'kernel' in self.templates: return self.templates['kernel'].version - elif hasattr(self.__dict__, 'kernel'): + if hasattr(self.__dict__, 'kernel'): return self.__dict__['kernel'] return '' @@ -211,7 +211,7 @@ def kernel(self): def output(self): if 'kernel' in self.templates: return self.templates['kernel'].metadata['output'] - elif hasattr(self.__dict__, 'output'): + if hasattr(self.__dict__, 'output'): return self.__dict__['output'] return 'bin/output.bin' @@ -227,6 +227,7 @@ def make(self, build_args: List[str]): make_cmd = os.path.join(os.environ.get('PROS_TOOLCHAIN'), 'bin', 'make.exe') else: make_cmd = 'make' + # pylint: disable=consider-using-with stdout_pipe = EchoPipe() stderr_pipe = EchoPipe(err=True) process=None @@ -235,7 +236,7 @@ def make(self, build_args: List[str]): stdout=stdout_pipe, stderr=stderr_pipe) except Exception as e: if not os.environ.get('PROS_TOOLCHAIN'): - ui.logger(__name__).warn("PROS toolchain not found! Please ensure the toolchain is installed correctly and your environment variables are set properly.\n") + ui.logger(__name__).warning("PROS toolchain not found! Please ensure the toolchain is installed correctly and your environment variables are set properly.\n") ui.logger(__name__).error(f"ERROR WHILE CALLING '{make_cmd}' WITH EXCEPTION: {str(e)}\n",extra={'sentry':False}) stdout_pipe.close() stderr_pipe.close() @@ -256,7 +257,7 @@ def make_scan_build(self, build_args: Tuple[str], cdb_file: Optional[Union[str, if sandbox: import tempfile - td = tempfile.TemporaryDirectory() + td = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with td_path = td.name.replace("\\", "/") build_args = [*build_args, f'BINDIR={td_path}'] @@ -292,7 +293,7 @@ def libscanbuild_capture(args: argparse.Namespace) -> Tuple[int, Iterable[Compil exit_code = run_build(args.build, env=environment, stdout=pipe, stderr=pipe, cwd=self.directory) except Exception as e: if not os.environ.get('PROS_TOOLCHAIN'): - ui.logger(__name__).warn("PROS toolchain not found! Please ensure the toolchain is installed correctly and your environment variables are set properly.\n") + ui.logger(__name__).warning("PROS toolchain not found! Please ensure the toolchain is installed correctly and your environment variables are set properly.\n") ui.logger(__name__).error(f"ERROR WHILE CALLING '{make_cmd}' WITH EXCEPTION: {str(e)}\n",extra={'sentry':False}) if not suppress_output: pipe.close() @@ -329,7 +330,7 @@ def libscanbuild_capture(args: argparse.Namespace) -> Tuple[int, Iterable[Compil if os.environ.get('PROS_TOOLCHAIN'): env['PATH'] = os.path.join(os.environ.get('PROS_TOOLCHAIN'), 'bin') + os.pathsep + env['PATH'] cc_sysroot = subprocess.run([make_cmd, 'cc-sysroot'], env=env, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, cwd=self.directory) + stderr=subprocess.PIPE, cwd=self.directory, check=False) lines = str(cc_sysroot.stderr.decode()).splitlines() + str(cc_sysroot.stdout.decode()).splitlines() lines = [l.strip() for l in lines] cc_sysroot_includes = [] @@ -344,7 +345,7 @@ def libscanbuild_capture(args: argparse.Namespace) -> Tuple[int, Iterable[Compil if copy: cc_sysroot_includes.append(f'-isystem{line}') cxx_sysroot = subprocess.run([make_cmd, 'cxx-sysroot'], env=env, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, cwd=self.directory) + stderr=subprocess.PIPE, cwd=self.directory, check=False) lines = str(cxx_sysroot.stderr.decode()).splitlines() + str(cxx_sysroot.stdout.decode()).splitlines() lines = [l.strip() for l in lines] cxx_sysroot_includes = [] @@ -359,7 +360,7 @@ def libscanbuild_capture(args: argparse.Namespace) -> Tuple[int, Iterable[Compil if copy: cxx_sysroot_includes.append(f'-isystem{line}') new_entries, entries = itertools.tee(entries, 2) - new_sources = set([e.source for e in entries]) + new_sources = {e.source for e in entries} if not cdb_file: cdb_file = os.path.join(self.directory, 'compile_commands.json') if isinstance(cdb_file, str) and os.path.isfile(cdb_file): @@ -394,7 +395,7 @@ def entry_map(entry: Compilation): entries = itertools.chain(old_entries, new_entries) json_entries = list(map(entry_map, entries)) if isinstance(cdb_file, str): - cdb_file = open(cdb_file, 'w') + cdb_file = open(cdb_file, 'w') # pylint: disable=consider-using-with import json json.dump(json_entries, cdb_file, sort_keys=True, indent=4) diff --git a/pros/conductor/project/ProjectReport.py b/pros/conductor/project/project_report.py similarity index 96% rename from pros/conductor/project/ProjectReport.py rename to pros/conductor/project/project_report.py index 75d2ff3a..f33d4792 100644 --- a/pros/conductor/project/ProjectReport.py +++ b/pros/conductor/project/project_report.py @@ -1,7 +1,7 @@ import os.path -class ProjectReport(object): +class ProjectReport: def __init__(self, project: 'Project'): self.project = { "target": project.target, diff --git a/pros/conductor/project/ProjectTransaction.py b/pros/conductor/project/project_transaction.py similarity index 85% rename from pros/conductor/project/ProjectTransaction.py rename to pros/conductor/project/project_transaction.py index edae1330..04bdb6f8 100644 --- a/pros/conductor/project/ProjectTransaction.py +++ b/pros/conductor/project/project_transaction.py @@ -4,12 +4,12 @@ import zipfile from typing import * -import pros.common.ui as ui +from pros.common import ui import pros.conductor as c from pros.conductor.project.template_resolution import InvalidTemplateException, TemplateAction -class Action(object): +class Action: def execute(self, conductor: c.Conductor, project: c.Project) -> None: raise NotImplementedError() @@ -32,36 +32,35 @@ def execute(self, conductor: c.Conductor, project: c.Project): try: conductor.apply_template(project, self.template, **self.apply_kwargs) except InvalidTemplateException as e: - if e.reason != TemplateAction.AlreadyInstalled or not self.suppress_already_installed: + if e.reason != TemplateAction.ALREADY_INSTALLED or not self.suppress_already_installed: raise e - else: - ui.logger(__name__).warning(str(e)) + ui.logger(__name__).warning(str(e)) def describe(self, conductor: c.Conductor, project: c.Project): action = project.get_template_actions(conductor.resolve_template(self.template)) - if action == TemplateAction.NotApplicable: + if action == TemplateAction.NOT_APPLICABLE: return f'{self.template.identifier} cannot be applied to project!' - if action == TemplateAction.Installable: + if action == TemplateAction.INSTALLABLE: return f'{self.template.identifier} will installed to project.' - if action == TemplateAction.Downgradable: + if action == TemplateAction.DOWNGRADABLE: return f'Project will be downgraded to {self.template.identifier} from' \ f' {project.templates[self.template.name].version}.' - if action == TemplateAction.Upgradable: + if action == TemplateAction.UPGRADABLE: return f'Project will be upgraded to {self.template.identifier} from' \ f' {project.templates[self.template.name].version}.' - if action == TemplateAction.AlreadyInstalled: + if action == TemplateAction.ALREADY_INSTALLED: if self.apply_kwargs.get('force_apply'): return f'{self.template.identifier} will be re-applied.' - elif self.suppress_already_installed: + if self.suppress_already_installed: return f'{self.template.identifier} will not be re-applied.' - else: - return f'{self.template.identifier} cannot be applied to project because it is already installed.' + return f'{self.template.identifier} cannot be applied to project because it is already installed.' + return None def can_execute(self, conductor: c.Conductor, project: c.Project) -> bool: action = project.get_template_actions(conductor.resolve_template(self.template)) - if action == TemplateAction.AlreadyInstalled: + if action == TemplateAction.ALREADY_INSTALLED: return self.apply_kwargs.get('force_apply') or self.suppress_already_installed - return action in [TemplateAction.Installable, TemplateAction.Downgradable, TemplateAction.Upgradable] + return action in [TemplateAction.INSTALLABLE, TemplateAction.DOWNGRADABLE, TemplateAction.UPGRADABLE] class RemoveTemplateAction(Action): @@ -77,8 +76,7 @@ def execute(self, conductor: c.Conductor, project: c.Project): except ValueError as e: if not self.suppress_not_removable: raise e - else: - ui.logger(__name__).warning(str(e)) + ui.logger(__name__).warning(str(e)) def describe(self, conductor: c.Conductor, project: c.Project) -> str: return f'{self.template.identifier} will be removed' @@ -102,7 +100,7 @@ def can_execute(self, conductor: c.Conductor, project: c.Project): return True -class ProjectTransaction(object): +class ProjectTransaction: def __init__(self, project: c.Project, conductor: Optional[c.Conductor] = None): self.project = project self.conductor = conductor or c.Conductor() @@ -135,7 +133,7 @@ def execute(self): raise ValueError('Action did not complete successfully') ui.echo('All actions performed successfully') except Exception as e: - ui.logger(__name__).warning(f'Failed to perform transaction, restoring project to previous state') + ui.logger(__name__).warning('Failed to perform transaction, restoring project to previous state') with zipfile.ZipFile(tfn) as zf: with ui.progressbar(zf.namelist(), label=f'Restoring {self.project.name} from {tfn}') as pb: @@ -166,8 +164,7 @@ def describe(self) -> str: f'- {a.describe(self.conductor, self.project)}' for a in self.actions ) - else: - return 'No actions necessary.' + return 'No actions necessary.' def can_execute(self) -> bool: return all(a.can_execute(self.conductor, self.project) for a in self.actions) diff --git a/pros/conductor/project/template_resolution.py b/pros/conductor/project/template_resolution.py index f9b9aeb0..62b14a50 100644 --- a/pros/conductor/project/template_resolution.py +++ b/pros/conductor/project/template_resolution.py @@ -2,14 +2,14 @@ class TemplateAction(Flag): - NotApplicable = auto() - Installable = auto() - Upgradable = auto() - AlreadyInstalled = auto() - Downgradable = auto() + NOT_APPLICABLE = auto() + INSTALLABLE = auto() + UPGRADABLE = auto() + ALREADY_INSTALLED = auto() + DOWNGRADABLE = auto() - UnforcedApplicable = Installable | Upgradable | Downgradable - ForcedApplicable = UnforcedApplicable | AlreadyInstalled + UNFORCED_APPLICABLE = INSTALLABLE | UPGRADABLE | DOWNGRADABLE + FORCED_APPLICABLE = UNFORCED_APPLICABLE | ALREADY_INSTALLED class InvalidTemplateException(Exception): diff --git a/pros/conductor/templates/base_template.py b/pros/conductor/templates/base_template.py index 95a19064..2b993cf2 100644 --- a/pros/conductor/templates/base_template.py +++ b/pros/conductor/templates/base_template.py @@ -5,7 +5,7 @@ from pros.common import ui -class BaseTemplate(object): +class BaseTemplate: def __init__(self, **kwargs): self.name: str = None self.version: str = None @@ -40,7 +40,7 @@ def satisfies(self, query: 'BaseTemplate', kernel_version: Union[str, Version] = # Find the intersection of the keys in the template's metadata with the keys in the query metadata # This is what allows us to throw all arguments into the query metadata (from the CLI, e.g. those intended # for the depot or template application hints) - if any([self.metadata[k] != query.metadata[k] for k in keys_intersection]): + if any(self.metadata[k] != query.metadata[k] for k in keys_intersection): return False return True @@ -53,21 +53,19 @@ def __gt__(self, other): if isinstance(other, BaseTemplate): # TODO: metadata comparison return self.name == other.name and Version(self.version) > Version(other.version) - else: - return False + return False def __eq__(self, other): if isinstance(other, BaseTemplate): return self.identifier == other.identifier - else: - return super().__eq__(other) + return super().__eq__(other) def __hash__(self): return self.identifier.__hash__() def as_query(self, version='>0', metadata=False, **kwargs): if isinstance(metadata, bool) and not metadata: - metadata = dict() + metadata = {} return BaseTemplate(orig=self, version=version, metadata=metadata, **kwargs) @property diff --git a/pros/conductor/templates/external_template.py b/pros/conductor/templates/external_template.py index ce08662e..77017169 100644 --- a/pros/conductor/templates/external_template.py +++ b/pros/conductor/templates/external_template.py @@ -12,7 +12,7 @@ def __init__(self, file: str, **kwargs): if os.path.isdir(file): file = os.path.join(file, 'template.pros') elif zipfile.is_zipfile(file): - self.tf = tempfile.NamedTemporaryFile(delete=False) + self.tf = tempfile.NamedTemporaryFile(delete=False) # pylint: disable=consider-using-with with zipfile.ZipFile(file) as zf: with zf.open('template.pros') as zt: self.tf.write(zt.read()) diff --git a/pros/conductor/transaction.py b/pros/conductor/transaction.py index 0fcb05d7..a79d8c4e 100644 --- a/pros/conductor/transaction.py +++ b/pros/conductor/transaction.py @@ -2,11 +2,11 @@ import shutil from typing import * -import pros.common.ui as ui +from pros.common import ui from pros.common import logger -class Transaction(object): +class Transaction: def __init__(self, location: str, current_state: Set[str]): self._add_files: Set[str] = set() self._rm_files: Set[str] = set() diff --git a/pros/config/cli_config.py b/pros/config/cli_config.py index 8c962047..0e4fe101 100644 --- a/pros/config/cli_config.py +++ b/pros/config/cli_config.py @@ -21,6 +21,7 @@ def __init__(self, file=None): self.override_use_build_compile_commands: Optional[bool] = None self.offer_sentry: Optional[bool] = None self.ga: Optional[dict] = None + self.cached_upgrade: Optional[Tuple[datetime, 'UpgradeManifestV1']] = None super(CliConfig, self).__init__(file) def needs_online_fetch(self, last_fetch: datetime) -> bool: @@ -31,17 +32,17 @@ def use_build_compile_commands(self): if self.override_use_build_compile_commands is not None: return self.override_use_build_compile_commands paths = [os.path.join('~', '.pros-atom'), os.path.join('~', '.pros-editor')] - return any([os.path.exists(os.path.expanduser(p)) for p in paths]) + return any(os.path.exists(os.path.expanduser(p)) for p in paths) def get_upgrade_manifest(self, force: bool = False) -> Optional['UpgradeManifestV1']: from pros.upgrade.manifests.upgrade_manifest_v1 import UpgradeManifestV1 # noqa: F811 - if not force and not self.needs_online_fetch(self.cached_upgrade[0]): + if not force and self.cached_upgrade is not None and not self.needs_online_fetch(self.cached_upgrade[0]): return self.cached_upgrade[1] pros.common.logger(__name__).info('Fetching upgrade manifest...') import requests import jsonpickle - r = requests.get('https://purduesigbots.github.io/pros-mainline/cli-updates.json') + r = requests.get('https://purduesigbots.github.io/pros-mainline/cli-updates.json', timeout=10) pros.common.logger(__name__).debug(r) if r.status_code == 200: try: @@ -52,10 +53,9 @@ def get_upgrade_manifest(self, force: bool = False) -> Optional['UpgradeManifest pros.common.logger(__name__).debug(self.cached_upgrade[1]) self.save() return self.cached_upgrade[1] - else: - pros.common.logger(__name__).warning(f'Failed to fetch CLI updates because status code: {r.status_code}') - pros.common.logger(__name__).debug(r) - return None + pros.common.logger(__name__).warning(f'Failed to fetch CLI updates because status code: {r.status_code}') + pros.common.logger(__name__).debug(r) + return None def cli_config() -> CliConfig: diff --git a/pros/config/config.py b/pros/config/config.py index 984b668a..1b66cad1 100644 --- a/pros/config/config.py +++ b/pros/config/config.py @@ -10,7 +10,7 @@ def __init__(self, message, *args, **kwargs): self.message = message -class Config(object): +class Config: """ A configuration object that's capable of being saved as a JSON object """ @@ -51,9 +51,7 @@ def __init__(self, file, error_on_decode=False): logger(__name__).error(f'Error parsing {file}') logger(__name__).exception(e) raise e - else: - logger(__name__).debug(e) - pass + logger(__name__).debug(e) # obvious elif os.path.isdir(file): raise ValueError('{} must be a file, not a directory'.format(file)) @@ -65,8 +63,7 @@ def __init__(self, file, error_on_decode=False): if error_on_decode: logger(__name__).exception(e) raise e - else: - logger(__name__).debug('Failed to save {} ({})'.format(file, e)) + logger(__name__).debug('Failed to save {} ({})'.format(file, e)) from pros.common.sentry import add_context add_context(self) diff --git a/pros/ga/analytics.py b/pros/ga/analytics.py index 247e6b31..88b57902 100644 --- a/pros/ga/analytics.py +++ b/pros/ga/analytics.py @@ -1,13 +1,10 @@ -import json -from os import path import uuid -import requests from requests_futures.sessions import FuturesSession import random from concurrent.futures import as_completed -url = 'https://www.google-analytics.com/collect' -agent = 'pros-cli' +URL = 'https://www.google-analytics.com/collect' +AGENT = 'pros-cli' """ PROS ANALYTICS CLASS @@ -28,23 +25,23 @@ def __init__(self): self.cli_config.save() self.sent = False #Variables that the class will use - self.gaID = self.cli_config.ga['ga_id'] - self.useAnalytics = self.cli_config.ga['enabled'] - self.uID = self.cli_config.ga['u_id'] - self.pendingRequests = [] + self.ga_id = self.cli_config.ga['ga_id'] + self.use_analytics = self.cli_config.ga['enabled'] + self.u_id = self.cli_config.ga['u_id'] + self.pending_requests = [] def send(self,action): - if not self.useAnalytics or self.sent: + if not self.use_analytics or self.sent: return self.sent=True # Prevent Send from being called multiple times try: #Payload to be sent to GA, idk what some of them are but it works payload = { 'v': 1, - 'tid': self.gaID, + 'tid': self.ga_id, 'aip': 1, 'z': random.random(), - 'cid': self.uID, + 'cid': self.u_id, 't': 'event', 'ec': 'action', 'ea': action, @@ -56,11 +53,11 @@ def send(self,action): session = FuturesSession() #Send payload to GA servers - future = session.post(url=url, + future = session.post(url=URL, data=payload, - headers={'User-Agent': agent}, + headers={'User-Agent': AGENT}, timeout=5.0) - self.pendingRequests.append(future) + self.pending_requests.append(future) except Exception: from pros.cli.common import logger @@ -68,13 +65,13 @@ def send(self,action): def set_use(self, value: bool): #Sets if GA is being used or not - self.useAnalytics = value - self.cli_config.ga['enabled'] = self.useAnalytics + self.use_analytics = value + self.cli_config.ga['enabled'] = self.use_analytics self.cli_config.save() def process_requests(self): responses = [] - for future in as_completed(self.pendingRequests): + for future in as_completed(self.pending_requests): try: response = future.result() @@ -88,7 +85,7 @@ def process_requests(self): print("Something went wrong while sending analytics!") - self.pendingRequests.clear() + self.pending_requests.clear() return responses diff --git a/pros/jinx/server.py b/pros/jinx/server.py index 31f848cf..ac87968a 100644 --- a/pros/jinx/server.py +++ b/pros/jinx/server.py @@ -1,6 +1,6 @@ from pros.serial.devices import StreamDevice -class JinxServer(object): +class JinxServer: def __init__(self, device: StreamDevice): self.device = device diff --git a/pros/serial/__init__.py b/pros/serial/__init__.py index 0177d021..2ad4861b 100644 --- a/pros/serial/__init__.py +++ b/pros/serial/__init__.py @@ -6,8 +6,8 @@ def bytes_to_str(arr): arr = bytes(arr) if hasattr(arr, '__iter__'): return ''.join('{:02X} '.format(x) for x in arr).strip() - else: # actually just a single byte - return '0x{:02X}'.format(arr) + # actually just a single byte + return '0x{:02X}'.format(arr) def decode_bytes_to_str(data: Union[bytes, bytearray], encoding: str = 'utf-8', errors: str = 'strict') -> str: diff --git a/pros/serial/devices/generic_device.py b/pros/serial/devices/generic_device.py index 0e139fc8..ac500e2d 100644 --- a/pros/serial/devices/generic_device.py +++ b/pros/serial/devices/generic_device.py @@ -1,7 +1,7 @@ from ..ports import BasePort -class GenericDevice(object): +class GenericDevice: def __init__(self, port: BasePort): self.port = port diff --git a/pros/serial/devices/system_device.py b/pros/serial/devices/system_device.py index 6511c4cd..aa2351ae 100644 --- a/pros/serial/devices/system_device.py +++ b/pros/serial/devices/system_device.py @@ -3,7 +3,7 @@ from pros.conductor import Project -class SystemDevice(object): +class SystemDevice: def upload_project(self, project: Project, **kwargs): raise NotImplementedError diff --git a/pros/serial/devices/vex/cortex_device.py b/pros/serial/devices/vex/cortex_device.py index 02dbfe0f..7e57d767 100644 --- a/pros/serial/devices/vex/cortex_device.py +++ b/pros/serial/devices/vex/cortex_device.py @@ -22,7 +22,7 @@ def find_cortex_ports(): class CortexDevice(VEXDevice, SystemDevice): - class SystemStatus(object): + class SystemStatus: def __init__(self, data: Tuple[bytes, ...]): self.joystick_firmware = data[0:2] self.robot_firmware = data[2:4] @@ -39,13 +39,13 @@ def __str__(self): f'{self.joystick_battery:1.2f} V' class SystemStatusFlags(IntFlag): - DL_MODE = (1 << 0) - TETH_VN2 = (1 << 2) - FCS_CONNECT = (1 << 3) - TETH_USB = (1 << 4) - DIRECT_USB = (1 << 5) - FCS_AUTON = (1 << 6) - FCS_DISABLE = (1 << 7) + DL_MODE = 1 << 0 + TETH_VN2 = 1 << 2 + FCS_CONNECT = 1 << 3 + TETH_USB = 1 << 4 + DIRECT_USB = 1 << 5 + FCS_AUTON = 1 << 6 + FCS_DISABLE = 1 << 7 TETH_BITS = DL_MODE | TETH_VN2 | TETH_USB @@ -89,7 +89,7 @@ def upload_project(self, project: Project, **kwargs): with output_path.open(mode='rb') as pf: return self.write_program(pf, **kwargs) - def write_program(self, file: typing.BinaryIO, **kwargs): + def write_program(self, file: typing.BinaryIO, quirk: int = 0, **kwargs): action_string = '' if hasattr(file, 'name'): action_string += f' {Path(file.name).name}' @@ -103,10 +103,9 @@ def write_program(self, file: typing.BinaryIO, **kwargs): self.send_to_download_channel() bootloader = self.expose_bootloader() - rv = bootloader.write_program(file, **kwargs) + bootloader.write_program(file, **kwargs) ui.finalize('upload', f'Finished uploading {action_string}') - return rv @retries def query_system(self) -> SystemStatus: diff --git a/pros/serial/devices/vex/crc.py b/pros/serial/devices/vex/crc.py index 2e4270d7..abc7fa6e 100644 --- a/pros/serial/devices/vex/crc.py +++ b/pros/serial/devices/vex/crc.py @@ -13,7 +13,7 @@ def __init__(self, size: int, polynomial: int): if crc_accumulator & (1 << (self._size - 1)): crc_accumulator = (crc_accumulator << 1) ^ self._polynomial else: - crc_accumulator = (crc_accumulator << 1) + crc_accumulator = crc_accumulator << 1 self._table.append(crc_accumulator) def compute(self, data: Iterable[int], accumulator: int = 0): diff --git a/pros/serial/devices/vex/stm32_device.py b/pros/serial/devices/vex/stm32_device.py index eecfdc47..64f0b764 100644 --- a/pros/serial/devices/vex/stm32_device.py +++ b/pros/serial/devices/vex/stm32_device.py @@ -6,8 +6,9 @@ from functools import reduce from typing import * -import pros.common.ui as ui +from pros.common import ui from pros.common import logger, retries +from pros.conductor import Project from pros.serial import bytes_to_str from pros.serial.devices.vex import VEXCommError from pros.serial.ports import BasePort @@ -147,7 +148,7 @@ def erase_memory(self, page_numbers: List[int]): if not self.commands[6] == 0x43: raise VEXCommError('Standard erase not supported on this device (only extended erase)') assert 0 < len(page_numbers) <= 255 - assert all([0 <= p <= 255 for p in page_numbers]) + assert all(0 <= p <= 255 for p in page_numbers) self._txrx_command(0x43) self._txrx_command(bytes([len(page_numbers) - 1, *page_numbers])) @@ -157,7 +158,7 @@ def extended_erase(self, page_numbers: List[int]): if not self.commands[6] == 0x44: raise IOError('Extended erase not supported on this device (only standard erase)') assert 0 < len(page_numbers) < 0xfff0 - assert all([0 <= p <= 0xffff for p in page_numbers]) + assert all(0 <= p <= 0xffff for p in page_numbers) self._txrx_command(0x44) self._txrx_command(bytes([len(page_numbers) - 1, *struct.pack(f'>{len(page_numbers)}H', *page_numbers)])) @@ -189,3 +190,6 @@ def _txrx_command(self, command: Union[int, bytes], timeout: float = 0.01, check if data[0] == self.ACK_BYTE: return raise VEXCommError(f"Device never ACK'd to {command}", command) + + def upload_project(self, project: Project, **kwargs): + pass diff --git a/pros/serial/devices/vex/v5_device.py b/pros/serial/devices/vex/v5_device.py index 24483eb0..800c0d3e 100644 --- a/pros/serial/devices/vex/v5_device.py +++ b/pros/serial/devices/vex/v5_device.py @@ -28,7 +28,7 @@ from .vex_device import VEXDevice from ..system_device import SystemDevice -int_str = Union[int, str] +IntStr = Union[int, str] def find_v5_ports(p_type: str): @@ -37,9 +37,9 @@ def filter_vex_ports(p): p.name is not None and ('VEX' in p.name or 'V5' in p.name) def filter_v5_ports(p, locations, names): - return (p.location is not None and any([p.location.endswith(l) for l in locations])) or \ - (p.name is not None and any([n in p.name for n in names])) or \ - (p.description is not None and any([n in p.description for n in names])) + return (p.location is not None and any(p.location.endswith(l) for l in locations)) or \ + (p.name is not None and any(n in p.name for n in names)) or \ + (p.description is not None and any(n in p.description for n in names)) def filter_v5_ports_mac(p, device): return (p.device is not None and p.device.endswith(device)) @@ -67,10 +67,9 @@ def filter_v5_ports_mac(p, device): if len(user_ports) == len(system_ports) and len(user_ports) > 0: if p_type.lower() == 'user': return user_ports - elif p_type.lower() == 'system': + if p_type.lower() == 'system': return system_ports + joystick_ports - else: - raise ValueError(f'Invalid port type specified: {p_type}') + raise ValueError(f'Invalid port type specified: {p_type}') # None of the typical filters worked, so if there are only two ports, then the lower one is always* # the USER? port (*always = I haven't found a guarantee) @@ -82,14 +81,13 @@ def natural_key(chunk: str): ports = sorted(ports, key=lambda p: natural_key(p.device)) if p_type.lower() == 'user': return [ports[1]] - elif p_type.lower() == 'system': + if p_type.lower() == 'system': # check if ports contain the word Brain in the description and return that port for port in ports: if "Brain" in port.description: return [port] return [ports[0], *joystick_ports] - else: - raise ValueError(f'Invalid port type specified: {p_type}') + raise ValueError(f'Invalid port type specified: {p_type}') # these can now also be used as user ports if len(joystick_ports) > 0: # and p_type.lower() == 'system': return joystick_ports @@ -137,7 +135,7 @@ class FTCompleteOptions(IntEnum): VEX_CRC16 = CRC(16, 0x1021) # CRC-16-CCIT VEX_CRC32 = CRC(32, 0x04C11DB7) # CRC-32 (the one used everywhere but has no name) - class SystemVersion(object): + class SystemVersion: class Product(IntEnum): CONTROLLER = 0x11 BRAIN = 0x10 @@ -161,7 +159,7 @@ def __str__(self): f' Product: {self.product.name}\n' \ f' Product Flags: {self.product_flags.value:x}' - class SystemStatus(object): + class SystemStatus: def __init__(self, data: tuple): from semantic_version import Version self.system_version = Version('{}.{}.{}-{}'.format(*data[0:4])) @@ -178,7 +176,7 @@ def __init__(self, port: BasePort): self._serial_cache = b'' super().__init__(port) - class DownloadChannel(object): + class DownloadChannel: def __init__(self, device: 'V5Device', timeout: float = 5.): self.device = device self.timeout = timeout @@ -205,9 +203,7 @@ def __enter__(self): if V5Device.SystemVersion.ControllerFlags.CONNECTED not in version.product_flags: raise VEXCommError('Could not transfer V5 Controller to download channel', version) logger(__name__).info('V5 should been transferred to higher bandwidth download channel') - return self - else: - return self + return self def __exit__(self, *exc): if self.did_switch: @@ -357,7 +353,7 @@ def write_program(self, file: typing.BinaryIO, remote_name: str = None, ini: Con raise ValueError(f'Unknown quirk option: {quirk}') ui.finalize('upload', f'{finish_string} to V5') - def ensure_library_space(self, name: Optional[str] = None, vid: int_str = None, + def ensure_library_space(self, name: Optional[str] = None, vid: IntStr = None, target_name: Optional[str] = None): """ Uses algorithms, for loops, and if statements to determine what files should be removed @@ -471,7 +467,7 @@ def ensure_library_space(self, name: Optional[str] = None, vid: int_str = None, else: self.erase_file(file_name=file, erase_all=True, vid='user') - def upload_library(self, file: typing.BinaryIO, remote_name: str = None, file_len: int = -1, vid: int_str = 'pros', + def upload_library(self, file: typing.BinaryIO, remote_name: str = None, file_len: int = -1, vid: IntStr = 'pros', force_upload: bool = False, compress: bool = True, **kwargs): """ Upload a file used for linking. Contains the logic to check if the file is already present in the filesystem @@ -503,10 +499,9 @@ def upload_library(self, file: typing.BinaryIO, remote_name: str = None, file_le if response['size'] == file_len and response['crc'] == crc32: ui.echo('Library is already onboard V5') return - else: - logger(__name__).warning(f'Library onboard doesn\'t match! ' - f'Length was {response["size"]} but expected {file_len} ' - f'CRC: was {response["crc"]:x} but expected {crc32:x}') + logger(__name__).warning(f'Library onboard doesn\'t match! ' + f'Length was {response["size"]} but expected {file_len} ' + f'CRC: was {response["crc"]:x} but expected {crc32:x}') except VEXCommError as e: logger(__name__).debug(e) else: @@ -516,7 +511,7 @@ def upload_library(self, file: typing.BinaryIO, remote_name: str = None, file_le self.ensure_library_space(remote_name, vid, ) self.write_file(file, remote_name, file_len, vid=vid, **kwargs) - def read_file(self, file: typing.IO[bytes], remote_file: str, vid: int_str = 'user', target: int_str = 'flash', + def read_file(self, file: typing.IO[bytes], remote_file: str, vid: IntStr = 'user', target: IntStr = 'flash', addr: Optional[int] = None, file_len: Optional[int] = None): if isinstance(vid, str): vid = self.vid_map[vid.lower()] @@ -545,7 +540,7 @@ def read_file(self, file: typing.IO[bytes], remote_file: str, vid: int_str = 'us def write_file(self, file: typing.BinaryIO, remote_file: str, file_len: int = -1, run_after: FTCompleteOptions = FTCompleteOptions.DONT_RUN, linked_filename: Optional[str] = None, - linked_vid: int_str = 'pros', compress: bool = False, **kwargs): + linked_vid: IntStr = 'pros', compress: bool = False, **kwargs): if file_len < 0: file_len = file.seek(0, 2) file.seek(0, 0) @@ -635,7 +630,7 @@ def query_system_version(self) -> SystemVersion: return V5Device.SystemVersion(ret) @retries - def ft_transfer_channel(self, channel: int_str): + def ft_transfer_channel(self, channel: IntStr): logger(__name__).debug(f'Transferring to {channel} channel') logger(__name__).debug('Sending ext 0x10 command') if isinstance(channel, str): @@ -715,7 +710,7 @@ def ft_write(self, addr: int, payload: Union[Iterable, bytes, bytearray, str]): def ft_read(self, addr: int, n_bytes: int) -> bytearray: logger(__name__).debug('Sending ext 0x14 command') actual_n_bytes = n_bytes + (0 if n_bytes % 4 == 0 else 4 - n_bytes % 4) - ui.logger(__name__).debug(dict(actual_n_bytes=actual_n_bytes, addr=addr)) + ui.logger(__name__).debug({"actual_n_bytes": actual_n_bytes, "addr": addr}) tx_payload = struct.pack(" bytearray: return ret @retries - def ft_set_link(self, link_name: str, vid: int_str = 'user', options: int = 0): + def ft_set_link(self, link_name: str, vid: IntStr = 'user', options: int = 0): logger(__name__).debug('Sending ext 0x15 command') if isinstance(vid, str): vid = self.vid_map[vid.lower()] @@ -736,7 +731,7 @@ def ft_set_link(self, link_name: str, vid: int_str = 'user', options: int = 0): return ret @retries - def get_dir_count(self, vid: int_str = 1, options: int = 0) \ + def get_dir_count(self, vid: IntStr = 1, options: int = 0) \ -> int: logger(__name__).debug('Sending ext 0x16 command') if isinstance(vid, str): @@ -760,7 +755,7 @@ def get_file_metadata_by_idx(self, file_idx: int, options: int = 0) \ return rx @retries - def execute_program_file(self, file_name: str, vid: int_str = 'user', run: bool = True): + def execute_program_file(self, file_name: str, vid: IntStr = 'user', run: bool = True): logger(__name__).debug('Sending ext 0x18 command') if isinstance(vid, str): vid = self.vid_map[vid.lower()] @@ -773,12 +768,12 @@ def execute_program_file(self, file_name: str, vid: int_str = 'user', run: bool return ret @retries - def get_file_metadata_by_name(self, file_name: str, vid: int_str = 1, options: int = 0) \ + def get_file_metadata_by_name(self, file_name: str, vid: IntStr = 1, options: int = 0) \ -> Dict[str, Any]: logger(__name__).debug('Sending ext 0x19 command') if isinstance(vid, str): vid = self.vid_map[vid.lower()] - ui.logger(__name__).debug(f'Options: {dict(vid=vid, file_name=file_name)}') + ui.logger(__name__).debug(f"Options: {{'vid': {vid}, 'file_name': {file_name}}}") tx_payload = struct.pack("<2B24s", vid, options, file_name.encode(encoding='ascii')) rx = self._txrx_ext_struct(0x19, tx_payload, " SystemStatus: - from semantic_version import Version logger(__name__).debug('Sending ext 0x22 command') version = self.query_system_version() if (version.product == V5Device.SystemVersion.Product.BRAIN and version.system_version in Spec('<1.0.13')) or \ @@ -891,7 +885,7 @@ def user_fifo_write(self, payload: Union[Iterable, bytes, bytearray, str]): if i + max_packet_size > pl_len: packet_size = pl_len - i logger(__name__).debug(f'Writing {packet_size} bytes to user FIFO') - self._txrx_ext_packet(0x27, b'\x01\x00' + payload[i:packet_size], 0, check_length=False)[1:] + # self._txrx_ext_packet(0x27, b'\x01\x00' + payload[i:packet_size], 0, check_length=False)[1:] logger(__name__).debug('Completed ext 0x27 command (write)') @retries @@ -965,10 +959,10 @@ def _rx_ext_packet(cls, msg: Message, command: int, rx_length: int, check_ack: b :param tx_payload: what was sent, used if an exception needs to be thrown :return: The payload of the extended message """ - assert (msg['command'] == 0x56) - if not cls.VEX_CRC16.compute(msg.rx) == 0: + assert msg['command'] == 0x56 + if cls.VEX_CRC16.compute(msg.rx) != 0: raise VEXCommError("CRC of message didn't match 0: {}".format(cls.VEX_CRC16.compute(msg.rx)), msg) - assert (msg['payload'][0] == command) + assert msg['payload'][0] == command msg = msg['payload'][1:-2] if check_ack: nacks = { @@ -987,16 +981,16 @@ def _rx_ext_packet(cls, msg: Message, command: int, rx_length: int, check_ack: b 0xDA: "Max user files, no more room for another user program", 0xDB: "User file exists" } - if msg[0] in nacks.keys(): + if msg[0] in nacks: raise VEXCommError("Device NACK'd with reason: {}".format(nacks[msg[0]]), msg) - elif msg[0] != cls.ACK_BYTE: + if msg[0] != cls.ACK_BYTE: raise VEXCommError("Device didn't ACK", msg) msg = msg[1:] if len(msg) > 0: logger(cls).debug('Set msg window to {}'.format(bytes_to_str(msg))) if len(msg) < rx_length and check_length: raise VEXCommError(f'Received length is less than {rx_length} (got {len(msg)}).', msg) - elif len(msg) > rx_length and check_length: + if len(msg) > rx_length and check_length: ui.echo( f'WARNING: Recieved length is more than {rx_length} (got {len(msg)}). Consider upgrading the PROS (CLI Version: {get_version()}).') return msg diff --git a/pros/serial/devices/vex/vex_device.py b/pros/serial/devices/vex/vex_device.py index ff9862d4..80dcace5 100644 --- a/pros/serial/devices/vex/vex_device.py +++ b/pros/serial/devices/vex/vex_device.py @@ -71,7 +71,7 @@ def _rx_packet(self, timeout: Optional[float] = None) -> Dict[str, Union[Union[i logger(__name__).debug("Tossing rx ({}) because {} didn't match".format(bytes_to_str(rx), b)) response_header_stack = bytearray(response_header) rx = bytearray() - if not rx == bytearray(response_header): + if rx != bytearray(response_header): raise IOError(f"Couldn't find the response header in the device response after {timeout} s. " f"Got {rx.hex()} but was expecting {response_header.hex()}") rx.extend(self.port.read(1)) diff --git a/pros/serial/interactive/__init__.py b/pros/serial/interactive/__init__.py index aa7f4062..18e769f4 100644 --- a/pros/serial/interactive/__init__.py +++ b/pros/serial/interactive/__init__.py @@ -1,3 +1,3 @@ -from .UploadProjectModal import UploadProjectModal +from .upload_project_modal import UploadProjectModal __all__ = ['UploadProjectModal'] diff --git a/pros/serial/interactive/UploadProjectModal.py b/pros/serial/interactive/upload_project_modal.py similarity index 99% rename from pros/serial/interactive/UploadProjectModal.py rename to pros/serial/interactive/upload_project_modal.py index f14dde7e..164c71ef 100644 --- a/pros/serial/interactive/UploadProjectModal.py +++ b/pros/serial/interactive/upload_project_modal.py @@ -3,7 +3,7 @@ from threading import Thread from typing import * -import pros.common.ui as ui +from pros.common import ui from pros.common.ui.interactive import application, components, parameters from pros.common.utils import with_click_context from pros.conductor import Project diff --git a/pros/serial/ports/base_port.py b/pros/serial/ports/base_port.py index 6bfc03fc..8c71d34e 100644 --- a/pros/serial/ports/base_port.py +++ b/pros/serial/ports/base_port.py @@ -1,7 +1,7 @@ from typing import * -class BasePort(object): +class BasePort: def write(self, data: bytes): raise NotImplementedError() diff --git a/pros/serial/ports/direct_port.py b/pros/serial/ports/direct_port.py index fa225f54..f7189f75 100644 --- a/pros/serial/ports/direct_port.py +++ b/pros/serial/ports/direct_port.py @@ -21,9 +21,8 @@ def create_serial_port(port_name: str, timeout: Optional[float] = 1.0) -> serial 'Access is denied', 'Errno 16', 'Errno 13' ]): tb = sys.exc_info()[2] - raise dont_send(ConnectionRefusedException(port_name, e).with_traceback(tb)) - else: - raise dont_send(PortNotFoundException(port_name, e)) + raise dont_send(ConnectionRefusedException(port_name, e).with_traceback(tb)) from e + raise dont_send(PortNotFoundException(port_name, e)) from e @@ -40,18 +39,17 @@ def read(self, n_bytes: int = 0) -> bytes: msg = bytes(self.buffer) self.buffer = bytearray() return msg + if len(self.buffer) < n_bytes: + self.buffer.extend(self.serial.read(n_bytes - len(self.buffer))) + if len(self.buffer) < n_bytes: + msg = bytes(self.buffer) + self.buffer = bytearray() else: - if len(self.buffer) < n_bytes: - self.buffer.extend(self.serial.read(n_bytes - len(self.buffer))) - if len(self.buffer) < n_bytes: - msg = bytes(self.buffer) - self.buffer = bytearray() - else: - msg, self.buffer = bytes(self.buffer[:n_bytes]), self.buffer[n_bytes:] - return msg + msg, self.buffer = bytes(self.buffer[:n_bytes]), self.buffer[n_bytes:] + return msg except serial.SerialException as e: logger(__name__).debug(e) - raise PortConnectionException(e) + raise PortConnectionException(e) from e def write(self, data: Union[str, bytes]): if isinstance(data, str): diff --git a/pros/serial/ports/serial_share_bridge.py b/pros/serial/ports/serial_share_bridge.py index b632a5dc..27b57b1c 100644 --- a/pros/serial/ports/serial_share_bridge.py +++ b/pros/serial/ports/serial_share_bridge.py @@ -11,8 +11,8 @@ from .. import bytes_to_str -def get_port_num(serial_port_name: str, hash: str) -> int: - return sum("Powered by PROS: {}-{}".format(serial_port_name, hash).encode(encoding='ascii')) +def get_port_num(serial_port_name: str, hash_value: str) -> int: + return sum("Powered by PROS: {}-{}".format(serial_port_name, hash_value).encode(encoding='ascii')) def get_from_device_port_num(serial_port_name: str) -> int: @@ -23,7 +23,7 @@ def get_to_device_port_num(serial_port_name: str) -> int: return get_port_num(serial_port_name, 'to') -class SerialShareBridge(object): +class SerialShareBridge: def __init__(self, serial_port_name: str, base_addr: str = '127.0.0.1', to_device_port_num: int = None, from_device_port_num: int = None): self._serial_port_name = serial_port_name @@ -156,6 +156,7 @@ def _to_device_loop(self, initialization_barrier: multiprocessing.Barrier): initialization_barrier.wait() watchdog.start() while not self.dying.is_set(): + # pylint: disable=unsubscriptable-object msg = to_ser_sock.recv_multipart() if not msg or self.dying.is_set(): continue diff --git a/pros/serial/ports/serial_share_port.py b/pros/serial/ports/serial_share_port.py index f329ac7e..14c18910 100644 --- a/pros/serial/ports/serial_share_port.py +++ b/pros/serial/ports/serial_share_port.py @@ -42,6 +42,7 @@ def read(self, n_bytes: int = -1): n_bytes = 1 data = bytearray() for _ in range(n_bytes): + # pylint: disable=unsubscriptable-object data.extend(self.from_device_sock.recv_multipart()[1]) return bytes(data) @@ -81,3 +82,7 @@ def _kick_watchdog(self): self.to_device_sock.send_multipart([b'kick']) self.alive.wait(2.5) logger(__name__).info('Watchdog kicker is dying') + + @property + def name(self): + pass diff --git a/pros/serial/terminal/terminal.py b/pros/serial/terminal/terminal.py index a0c78264..80dbae72 100644 --- a/pros/serial/terminal/terminal.py +++ b/pros/serial/terminal/terminal.py @@ -15,7 +15,7 @@ # This file is a modification of the miniterm implementation on pyserial -class ConsoleBase(object): +class ConsoleBase: """OS abstraction for console (input/output codec, no echo)""" def __init__(self): @@ -65,7 +65,7 @@ def __exit__(self, *args, **kwargs): import ctypes - class Out(object): + class Out: """file-like wrapper that uses os.write""" def __init__(self, fd): @@ -104,7 +104,7 @@ def getkey(self): z = msvcrt.getwch() if z == chr(13): return chr(10) - elif z in (chr(0), chr(0x0e)): # functions keys, ignore + if z in (chr(0), chr(0x0e)): # functions keys, ignore msvcrt.getwch() else: return z @@ -148,7 +148,7 @@ def getkey(self): [], None) if self.pipe_r in ready: os.read(self.pipe_r, 1) - return + return None c = self.enc_stdin.read(1) if c == chr(0x7f): c = chr(8) # map the BS key (which yields DEL) to backspace @@ -166,7 +166,7 @@ def cleanup(self): ' available.'.format(sys.platform)) -class Terminal(object): +class Terminal: """This class is loosely based off of the pyserial miniterm""" def __init__(self, port_instance: StreamDevice, transformations=(), @@ -260,9 +260,8 @@ def transmitter(self): if c == '\x03' or not self.no_sigint: self.stop() break - else: - self.device.write(c.encode(encoding='utf-8')) - self.console.write(c) + self.device.write(c.encode(encoding='utf-8')) + self.console.write(c) except Exception as e: if not self.alive.is_set(): logger(__name__).exception(e) diff --git a/pros/upgrade/instructions/base_instructions.py b/pros/upgrade/instructions/base_instructions.py index 81f543fd..8ccdd209 100644 --- a/pros/upgrade/instructions/base_instructions.py +++ b/pros/upgrade/instructions/base_instructions.py @@ -1,4 +1,4 @@ -class UpgradeResult(object): +class UpgradeResult: def __init__(self, successful: bool, **kwargs): self.successful = successful self.__dict__.update(**kwargs) @@ -7,7 +7,7 @@ def __str__(self): return f'The upgrade was {"" if self.successful else "not "}successful.\n{getattr(self, "explanation", "")}' -class UpgradeInstruction(object): +class UpgradeInstruction: """ Base class for all upgrade instructions, not useful to instantiate """ diff --git a/pros/upgrade/manifests/upgrade_manifest_v1.py b/pros/upgrade/manifests/upgrade_manifest_v1.py index 51ba9346..0db43075 100644 --- a/pros/upgrade/manifests/upgrade_manifest_v1.py +++ b/pros/upgrade/manifests/upgrade_manifest_v1.py @@ -4,7 +4,7 @@ from ..instructions import UpgradeResult -class UpgradeManifestV1(object): +class UpgradeManifestV1: """ An Upgrade Manifest only capable of determine if there is an update - not how to update """ @@ -28,8 +28,7 @@ def describe_update(self) -> str: if self.needs_upgrade: return f'There is an update available! {self.version} is the latest version.\n' \ f'Go to {self.info_url} to learn more.' - else: - return f'You are up to date. ({self.version})' + return f'You are up to date. ({self.version})' def __str__(self): return self.describe_update() diff --git a/pros/upgrade/manifests/upgrade_manifest_v2.py b/pros/upgrade/manifests/upgrade_manifest_v2.py index b024aa3d..34b81a2f 100644 --- a/pros/upgrade/manifests/upgrade_manifest_v2.py +++ b/pros/upgrade/manifests/upgrade_manifest_v2.py @@ -8,12 +8,12 @@ class PlatformsV2(Enum): - Unknown = 0 - Windows86 = 1 - Windows64 = 2 - MacOS = 3 - Linux = 4 - Pip = 5 + UNKNOWN = 0 + WINDOWS86 = 1 + WINDOWS64 = 2 + MACOS = 3 + LINUX = 4 + PIP = 5 class UpgradeManifestV2(UpgradeManifestV1): @@ -34,7 +34,7 @@ def __init__(self): def platform(self) -> 'PlatformsV2': """ Attempts to detect the current platform type - :return: The detected platform type, or Unknown + :return: The detected platform type, or UNKNOWN """ if self._platform is not None: return self._platform @@ -43,21 +43,22 @@ def platform(self) -> 'PlatformsV2': frozen_platform = getattr(_constants, 'FROZEN_PLATFORM_V1', None) if isinstance(frozen_platform, str): if frozen_platform.startswith('Windows86'): - self._platform = PlatformsV2.Windows86 + self._platform = PlatformsV2.WINDOWS86 elif frozen_platform.startswith('Windows64'): - self._platform = PlatformsV2.Windows64 + self._platform = PlatformsV2.WINDOWS64 elif frozen_platform.startswith('MacOS'): - self._platform = PlatformsV2.MacOS + self._platform = PlatformsV2.MACOS else: try: from pip._vendor import pkg_resources + # pylint: disable=not-an-iterable results = [p for p in pkg_resources.working_set if p.project_name.startswith('pros-cli')] if any(results): - self._platform = PlatformsV2.Pip + self._platform = PlatformsV2.PIP except ImportError: pass if not self._platform: - self._platform = PlatformsV2.Unknown + self._platform = PlatformsV2.UNKNOWN return self._platform @property diff --git a/pros/upgrade/upgrade_manager.py b/pros/upgrade/upgrade_manager.py index 3ddcf8eb..167b4a53 100644 --- a/pros/upgrade/upgrade_manager.py +++ b/pros/upgrade/upgrade_manager.py @@ -4,7 +4,7 @@ from typing import * from pros.common import logger -import pros.common.ui as ui +from pros.common import ui from pros.config import Config from pros.config.cli_config import cli_config from .manifests import * @@ -12,8 +12,8 @@ class ReleaseChannel(Enum): - Stable = 'stable' - Beta = 'beta' + STABLE = 'stable' + BETA = 'beta' class UpgradeManager(Config): @@ -22,7 +22,7 @@ def __init__(self, file=None): file = os.path.join(cli_config().directory, 'upgrade.pros.json') self._last_check: datetime = datetime.min self._manifest: Optional[UpgradeManifestV1] = None - self.release_channel: ReleaseChannel = ReleaseChannel.Stable + self.release_channel: ReleaseChannel = ReleaseChannel.STABLE super().__init__(file) @@ -49,7 +49,7 @@ def get_manifest(self, force: bool = False) -> UpgradeManifestV1: manifest_urls = [f"{channel_url}/{manifest.__name__}.json" for manifest in manifests] for manifest_url in manifest_urls: - resp = requests.get(manifest_url) + resp = requests.get(manifest_url, timeout=10) if resp.status_code == 200: try: self._manifest = jsonpickle.decode(resp.text, keys=True) diff --git a/setup.py b/setup.py index f26a9741..2a0b8083 100644 --- a/setup.py +++ b/setup.py @@ -3,9 +3,12 @@ from setuptools import setup, find_packages from install_requires import install_requires as install_reqs +with open('pip_version') as version_file: + version = version_file.read().strip() + setup( name='pros-cli', - version=open('pip_version').read().strip(), + version=version, packages=find_packages(), url='https://github.com/purduesigbots/pros-cli', license='MPL-2.0', diff --git a/version.py b/version.py index 39542079..9b9a6f0b 100644 --- a/version.py +++ b/version.py @@ -4,32 +4,32 @@ try: with open(os.devnull, 'w') as devnull: - v = subprocess.check_output(['git', 'describe', '--tags', '--dirty', '--abbrev'], stderr=stdout).decode().strip() - if '-' in v: - bv = v[:v.index('-')] - bv = bv[:bv.rindex('.') + 1] + str(int(bv[bv.rindex('.') + 1:]) + 1) - sempre = 'dirty' if v.endswith('-dirty') else 'commit' - pippre = 'alpha' if v.endswith('-dirty') else 'pre' - build = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).decode().strip() - number_since = subprocess.check_output( - ['git', 'rev-list', v[:v.index('-')] + '..HEAD', '--count']).decode().strip() - semver = bv + '-' + sempre + '+' + build - pipver = bv + pippre + number_since - winver = v[:v.index('-')] + '.' + number_since + V = subprocess.check_output(['git', 'describe', '--tags', '--dirty', '--abbrev'], stderr=stdout).decode().strip() + if '-' in V: + BV = V[:V.index('-')] + BV = BV[:BV.rindex('.') + 1] + str(int(BV[BV.rindex('.') + 1:]) + 1) + SEMPRE = 'dirty' if V.endswith('-dirty') else 'commit' + PIPPRE = 'alpha' if V.endswith('-dirty') else 'pre' + BUILD = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).decode().strip() + NUMBER_SINCE = subprocess.check_output( + ['git', 'rev-list', V[:V.index('-')] + '..HEAD', '--count']).decode().strip() + SEMVER = BV + '-' + SEMPRE + '+' + BUILD + PIPVER = BV + PIPPRE + NUMBER_SINCE + WINVER = V[:V.index('-')] + '.' + NUMBER_SINCE else: - semver = v - pipver = v - winver = v + '.0' + SEMVER = V + PIPVER = V + WINVER = V + '.0' with open('version', 'w') as f: - print('Semantic version is ' + semver) - f.write(semver) + print('Semantic version is ' + SEMVER) + f.write(SEMVER) with open('pip_version', 'w') as f: - print('PIP version is ' + pipver) - f.write(pipver) + print('PIP version is ' + PIPVER) + f.write(PIPVER) with open('win_version', 'w') as f: - print('Windows version is ' + winver) - f.write(winver) + print('Windows version is ' + WINVER) + f.write(WINVER) except Exception as e: print('Error calling git') print(e)