diff --git a/rsconnect/api.py b/rsconnect/api.py index 8753012c..cf1fb93e 100644 --- a/rsconnect/api.py +++ b/rsconnect/api.py @@ -265,7 +265,7 @@ def token_endpoint(self) -> str: if params is None: raise RSConnectException("No Snowflake connection found.") - return "https://{}.snowflakecomputing.com/".format(params["account"]) + return f"https://{params['account']}.snowflakecomputing.com/" def fmt_payload(self): params = get_parameters(self.snowflake_connection_name) @@ -276,9 +276,7 @@ def fmt_payload(self): authenticator = params.get("authenticator") if authenticator == "SNOWFLAKE_JWT": spcs_url = urlparse(self.url) - scope = ( - "session:role:{} {}".format(params["role"], spcs_url.netloc) if params.get("role") else spcs_url.netloc - ) + scope = f"session:role:{params['role']} {spcs_url.netloc}" if params.get("role") else spcs_url.netloc jwt = generate_jwt(self.snowflake_connection_name) grant_type = "urn:ietf:params:oauth:grant-type:jwt-bearer" @@ -300,13 +298,13 @@ def fmt_payload(self): "body": payload, "headers": { "Content-Type": "application/json", - "Authorization": "Bearer %s" % params["token"], + "Authorization": f"Bearer {params['token']}", "X-Snowflake-Authorization-Token-Type": "OAUTH", }, "path": "/session/v1/login-request", } else: - raise NotImplementedError("Unsupported authenticator for SPCS Connect: %s" % authenticator) + raise NotImplementedError(f"Unsupported authenticator for SPCS Connect: {authenticator}") def exchange_token(self) -> str: try: @@ -451,13 +449,13 @@ def python_settings(self) -> PyInfo: return response def app_get(self, app_id: str) -> ContentItemV0: - response = cast(Union[ContentItemV0, HTTPResponse], self.get("applications/%s" % app_id)) + response = cast(Union[ContentItemV0, HTTPResponse], self.get(f"applications/{app_id}")) response = self._server.handle_bad_response(response) return response def add_environment_vars(self, content_guid: str, env_vars: list[tuple[str, str]]): env_body = [dict(name=kv[0], value=kv[1]) for kv in env_vars] - return self.patch("v1/content/%s/environment" % content_guid, body=env_body) + return self.patch(f"v1/content/{content_guid}/environment", body=env_body) def is_failed_response(self, response: HTTPResponse | JsonData) -> bool: return isinstance(response, HTTPResponse) and response.status >= 500 @@ -481,7 +479,7 @@ def access_content(self, content_guid: str) -> None: def bundle_download(self, content_guid: str, bundle_id: str) -> HTTPResponse: response = cast( HTTPResponse, - self.get("v1/content/%s/bundles/%s/download" % (content_guid, bundle_id), decode_response=False), + self.get(f"v1/content/{content_guid}/bundles/{bundle_id}/download", decode_response=False), ) response = self._server.handle_bad_response(response, is_httpresponse=True) return response @@ -489,7 +487,7 @@ def bundle_download(self, content_guid: str, bundle_id: str) -> HTTPResponse: def content_lockfile(self, content_guid: str) -> HTTPResponse: response = cast( HTTPResponse, - self.get("v1/content/%s/lockfile" % content_guid, decode_response=False), + self.get(f"v1/content/{content_guid}/lockfile", decode_response=False), ) response = self._server.handle_bad_response(response, is_httpresponse=True) return response @@ -500,7 +498,7 @@ def content_list(self, filters: Optional[Mapping[str, JsonData]] = None) -> list return response def content_get(self, content_guid: str) -> ContentItemV1: - response = cast(Union[ContentItemV1, HTTPResponse], self.get("v1/content/%s" % content_guid)) + response = cast(Union[ContentItemV1, HTTPResponse], self.get(f"v1/content/{content_guid}")) response = self._server.handle_bad_response(response) return response @@ -546,17 +544,17 @@ def upload_bundle( body, content_type = create_multipart_form_data(fields) response = cast( Union[BundleMetadata, HTTPResponse], - self.post("v1/content/%s/bundles" % content_guid, body=body, headers={"Content-Type": content_type}), + self.post(f"v1/content/{content_guid}/bundles", body=body, headers={"Content-Type": content_type}), ) else: response = cast( - Union[BundleMetadata, HTTPResponse], self.post("v1/content/%s/bundles" % content_guid, body=tarball) + Union[BundleMetadata, HTTPResponse], self.post(f"v1/content/{content_guid}/bundles", body=tarball) ) response = self._server.handle_bad_response(response) return response def content_update(self, content_guid: str, updates: Mapping[str, str | None]) -> ContentItemV1: - response = cast(Union[ContentItemV1, HTTPResponse], self.patch("v1/content/%s" % content_guid, body=updates)) + response = cast(Union[ContentItemV1, HTTPResponse], self.patch(f"v1/content/{content_guid}", body=updates)) response = self._server.handle_bad_response(response) return response @@ -571,7 +569,7 @@ def content_build( body["activate"] = False response = cast( Union[BuildOutputDTO, HTTPResponse], - self.post("v1/content/%s/build" % content_guid, body=body), + self.post(f"v1/content/{content_guid}/build", body=body), ) response = self._server.handle_bad_response(response) return response @@ -587,7 +585,7 @@ def content_deploy( body["activate"] = False response = cast( Union[BuildOutputDTO, HTTPResponse], - self.post("v1/content/%s/deploy" % content_guid, body=body), + self.post(f"v1/content/{content_guid}/deploy", body=body), ) response = self._server.handle_bad_response(response) return response @@ -615,7 +613,7 @@ def task_get( params["first"] = first if wait is not None: params["wait"] = wait - response = cast(Union[TaskStatusV1, HTTPResponse], self.get("v1/tasks/%s" % task_id, query_params=params)) + response = cast(Union[TaskStatusV1, HTTPResponse], self.get(f"v1/tasks/{task_id}", query_params=params)) response = self._server.handle_bad_response(response) # compatibility with rsconnect-jupyter @@ -1102,11 +1100,11 @@ def make_bundle( def upload_posit_bundle(self, prepare_deploy_result: PrepareDeployResult, bundle_size: int, contents: bytes): upload_url = prepare_deploy_result.presigned_url parsed_upload_url = urlparse(upload_url) - with S3Client("{}://{}".format(parsed_upload_url.scheme, parsed_upload_url.netloc)) as s3_client: + with S3Client(f"{parsed_upload_url.scheme}://{parsed_upload_url.netloc}") as s3_client: upload_result = cast( HTTPResponse, s3_client.upload( - "{}?{}".format(parsed_upload_url.path, parsed_upload_url.query), + f"{parsed_upload_url.path}?{parsed_upload_url.query}", prepare_deploy_result.presigned_checksum, bundle_size, contents, @@ -1156,7 +1154,7 @@ def deploy_bundle(self, activate: bool = True): # type: ignore[arg-type] - PrepareDeployResult uses int, but format() accepts it shinyapps_service.do_deploy(prepare_deploy_result.bundle_id, prepare_deploy_result.app_id) - print("Application successfully deployed to {}".format(prepare_deploy_result.app_url)) + print(f"Application successfully deployed to {prepare_deploy_result.app_url}") webbrowser.open_new(prepare_deploy_result.app_url) self.deployed_info = RSConnectClientDeployResult( @@ -1577,21 +1575,21 @@ def get_extra_headers(self, url: str, method: str, body: str | bytes): signature = self._get_canonical_request_signature(canonical_request) return { - "X-Auth-Token": "{0}".format(self._token), - "X-Auth-Signature": "{0}; version=1".format(signature), + "X-Auth-Token": self._token, + "X-Auth-Signature": f"{signature}; version=1", "Date": canonical_request_date, "X-Content-Checksum": canonical_request_checksum, } def get_application(self, application_id: str): - response = cast(Union[PositClientApp, HTTPResponse], self.get("/v1/applications/{}".format(application_id))) + response = cast(Union[PositClientApp, HTTPResponse], self.get(f"/v1/applications/{application_id}")) response = self._server.handle_bad_response(response) return response def update_application_property(self, application_id: int, property: str, value: str) -> HTTPResponse: response = cast( HTTPResponse, - self.put("/v1/applications/{}/properties/{}".format(application_id, property), body={"value": value}), + self.put(f"/v1/applications/{application_id}/properties/{property}", body={"value": value}), ) response = self._server.handle_bad_response(response, is_httpresponse=True) return response @@ -1614,11 +1612,7 @@ def get_accounts(self) -> PositClientAccountSearchResults: def _get_applications_like_name_page(self, name: str, offset: int) -> PositClientAppSearchResults: response = cast( Union[PositClientAppSearchResults, HTTPResponse], - self.get( - "/v1/applications?filter=name:like:{}&offset={}&count=100&use_advanced_filters=true".format( - name, offset - ) - ), + self.get(f"/v1/applications?filter=name:like:{name}&offset={offset}&count=100&use_advanced_filters=true"), ) response = self._server.handle_bad_response(response) return response @@ -1637,14 +1631,14 @@ def create_bundle( return response def set_bundle_status(self, bundle_id: str, bundle_status: str): - response = self.post("/v1/bundles/{}/status".format(bundle_id), body={"status": bundle_status}) + response = self.post(f"/v1/bundles/{bundle_id}/status", body={"status": bundle_status}) response = self._server.handle_bad_response(response) return response def deploy_application(self, bundle_id: str, app_id: str) -> PositClientDeployTask: response = cast( Union[PositClientDeployTask, HTTPResponse], - self.post("/v1/applications/{}/deploy".format(app_id), body={"bundle": bundle_id, "rebuild": False}), + self.post(f"/v1/applications/{app_id}/deploy", body={"bundle": bundle_id, "rebuild": False}), ) response = self._server.handle_bad_response(response) return response @@ -1652,7 +1646,7 @@ def deploy_application(self, bundle_id: str, app_id: str) -> PositClientDeployTa def get_task(self, task_id: str) -> PositClientDeployTask: response = cast( Union[PositClientDeployTask, HTTPResponse], - self.get("/v1/tasks/{}".format(task_id), query_params={"legacy": "true"}), + self.get(f"/v1/tasks/{task_id}", query_params={"legacy": "true"}), ) response = self._server.handle_bad_response(response) return response @@ -1664,7 +1658,7 @@ def get_shinyapps_build_task(self, parent_task_id: str) -> PositClientShinyappsB "/v1/tasks", query_params={ "filter": [ - "parent_id:eq:{}".format(parent_task_id), + f"parent_id:eq:{parent_task_id}", "action:eq:image-build", ] }, @@ -1674,7 +1668,7 @@ def get_shinyapps_build_task(self, parent_task_id: str) -> PositClientShinyappsB return response def get_task_logs(self, task_id: str) -> HTTPResponse: - response = cast(HTTPResponse, self.get("/v1/tasks/{}/logs".format(task_id))) + response = cast(HTTPResponse, self.get(f"/v1/tasks/{task_id}/logs")) response = self._server.handle_bad_response(response, is_httpresponse=True) return response @@ -1685,7 +1679,7 @@ def get_current_user(self): def wait_until_task_is_successful(self, task_id: str, timeout: int = get_task_timeout()) -> None: print() - print("Waiting for task: {}".format(task_id)) + print(f"Waiting for task: {task_id}") start_time = time.time() finished: bool | None = None @@ -1703,16 +1697,16 @@ def wait_until_task_is_successful(self, task_id: str, timeout: int = get_task_ti if finished: break - print(" {} - {}".format(status, description)) + print(f" {status} - {description}") time.sleep(2) if not finished: raise RSConnectException(get_task_timeout_help_message(timeout)) if status != "success": - raise DeploymentFailedException("Application deployment failed with error: {}".format(error)) + raise DeploymentFailedException(f"Application deployment failed with error: {error}") - print("Task done: {}".format(description)) + print(f"Task done: {description}") def get_applications_like_name(self, name: str) -> list[str]: applications: list[PositClientApp] = [] @@ -1794,7 +1788,7 @@ def do_deploy(self, bundle_id: str, app_id: str): build_task_result = self._posit_client.get_shinyapps_build_task(deploy_task["id"]) build_task = build_task_result["tasks"][0] logs = self._posit_client.get_task_logs(build_task["id"]) - logger.error("Build logs:\n{}".format(logs.response_body)) + logger.error(f"Build logs:\n{logs.response_body}") raise e diff --git a/rsconnect/http_support.py b/rsconnect/http_support.py index b0a4b06a..707eb0ea 100644 --- a/rsconnect/http_support.py +++ b/rsconnect/http_support.py @@ -31,7 +31,7 @@ Dict[str, "JsonData"], ] -_user_agent = "RSConnectPython/%s" % VERSION +_user_agent = f"RSConnectPython/{VERSION}" # noinspection PyUnusedLocal,PyUnresolvedReferences @@ -64,11 +64,11 @@ def _get_proxy(): parsed = urlparse(proxyURL) if parsed.scheme not in ["https"]: warn("HTTPS_PROXY scheme is not using https") - redacted_url = "{}://".format(parsed.scheme) + redacted_url = f"{parsed.scheme}://" if parsed.username: - redacted_url += "{}:{}@".format(parsed.username, "REDACTED") - redacted_url += "{}:{}".format(parsed.hostname, parsed.port or 8080) - logger.info("Using custom proxy server {}".format(redacted_url)) + redacted_url += f"{parsed.username}:REDACTED@" + redacted_url += f"{parsed.hostname}:{parsed.port or 8080}" + logger.info(f"Using custom proxy server {redacted_url}") return parsed.username, parsed.password, parsed.hostname, parsed.port or 8080 @@ -76,9 +76,9 @@ def _get_proxy_headers(*args: object, **kwargs: object): proxyHeaders = None proxyUsername, proxyPassword, _, _ = _get_proxy() if proxyUsername and proxyPassword: - credentials = "{}:{}".format(proxyUsername, proxyPassword) + credentials = f"{proxyUsername}:{proxyPassword}" credentials = base64.b64encode(credentials.encode("utf-8")).decode("utf-8") - proxyHeaders = {"Proxy-Authorization": "Basic {}".format(credentials)} + proxyHeaders = {"Proxy-Authorization": f"Basic {credentials}"} return proxyHeaders @@ -293,7 +293,7 @@ def __init__( self._url = urlparse(url) if self._url.scheme not in _connection_factory: - raise ValueError('The "%s" URL scheme is not supported.' % self._url.scheme) + raise ValueError(f'The "{self._url.scheme}" URL scheme is not supported.') self._disable_tls_check = disable_tls_check self._ca_data = ca_data @@ -314,13 +314,13 @@ def get_authorization(self): return self._headers["Authorization"] def key_authorization(self, key: str): - self.authorization("Key %s" % key) + self.authorization(f"Key {key}") def bootstrap_authorization(self, key: str): - self.authorization("Connect-Bootstrap %s" % key) + self.authorization(f"Connect-Bootstrap {key}") def snowflake_authorization(self, token: str): - self.authorization('Snowflake Token="%s"' % token) + self.authorization(f'Snowflake Token="{token}"') def _get_full_path(self, path: str): return append_to_path(self._url.path, path) @@ -426,7 +426,7 @@ def _do_request( ) -> JsonData | HTTPResponse: full_uri = path if query_params is not None: - full_uri = "%s?%s" % (path, urlencode(query_params, doseq=True)) + full_uri = f"{path}?{urlencode(query_params, doseq=True)}" headers = self._headers.copy() if self._proxy_headers: headers.update(self._proxy_headers) @@ -436,12 +436,12 @@ def _do_request( try: if logger.is_debugging(): - logger.debug("Request: %s %s" % (method, full_uri)) + logger.debug(f"Request: {method} {full_uri}") logger.debug("Headers:") for key, value in headers.items(): - logger.debug("--> %s: %s" % (key, value)) + logger.debug(f"--> {key}: {value}") logger.debug("Body:") - logger.debug("--> %s" % (body if body is not None else "")) + logger.debug(f"--> {body if body is not None else ''}") # if we weren't called under a `with` statement, we'll need to manage the # connection here. @@ -461,16 +461,16 @@ def _do_request( response_body = response_body.decode("utf-8").strip() if logger.is_debugging(): - logger.debug("Response: %s %s" % (response.status, response.reason)) + logger.debug(f"Response: {response.status} {response.reason}") logger.debug("Headers:") for key, value in response.getheaders(): - logger.debug("--> %s: %s" % (key, value)) + logger.debug(f"--> {key}: {value}") logger.debug("Body:") if response.getheader("Content-Type", "").startswith("application/json"): # Only print JSON responses. # Otherwise we end up dumping entire web pages to the log. try: - logger.debug("--> %s" % response_body) + logger.debug(f"--> {response_body}") except json.JSONDecodeError: logger.debug("--> ") else: @@ -493,13 +493,13 @@ def _do_request( if location.startswith("http"): parsed_location = urlparse(location) if parsed_location.query: - next_url = "{}?{}".format(parsed_location.path, parsed_location.query) + next_url = f"{parsed_location.path}?{parsed_location.query}" else: next_url = parsed_location.path else: next_url = location - logger.debug("--> Redirected to: %s" % urljoin(self._url.geturl(), location)) + logger.debug(f"--> Redirected to: {urljoin(self._url.geturl(), location)}") redirect_extra_headers = self.get_extra_headers(next_url, "GET", body) return self._do_request( @@ -573,13 +573,13 @@ def store_cookies(self, response: http.HTTPResponse): if morsel.key not in self._keys: self._keys.append(morsel.key) self._content[morsel.key] = morsel.value - logger.debug("--> Set cookie %s: %s" % (morsel.key, morsel.value)) + logger.debug(f"--> Set cookie {morsel.key}: {morsel.value}") - logger.debug("CookieJar contents: %s\n%s" % (self._keys, self._content)) + logger.debug(f"CookieJar contents: {self._keys}\n{self._content}") def get_cookie_header_value(self): - result = "; ".join(["%s=%s" % (key, self._reference.value_encode(self._content[key])[1]) for key in self._keys]) - logger.debug("Cookie: %s" % result) + result = "; ".join([f"{key}={self._reference.value_encode(self._content[key])[1]}" for key in self._keys]) + logger.debug(f"Cookie: {result}") return result def as_dict(self):