Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
MEDICOVER_USER=login
MEDICOVER_PASS=pass

LOG_LEVEL=INFO

NOTIFIERS_PUSHBULLET_TOKEN=mytoken
GOTIFY_TOKEN=mytoken
GOTIFY_HOST=https://mygotify_server.com
Expand Down
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
rev: v6.0.0
hooks:
- id: check-yaml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.5
rev: v0.15.2
hooks:
- id: ruff-format

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.5
rev: v0.15.2
hooks:
- id: ruff-check
args: [--fix]
59 changes: 36 additions & 23 deletions medichaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
from requests.adapters import HTTPAdapter
from rich.console import Console
from rich.logging import RichHandler
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.webdriver import WebDriver as ChromeDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium_stealth import stealth
Expand Down Expand Up @@ -79,14 +79,17 @@

DEFAULT_SLOT_SEARCH_TYPE: int = 0

# Load environment variables
load_dotenv()

token_lock = FileLock(TOKEN_LOCK_PATH, timeout=60)
login_lock = FileLock(LOGIN_LOCK_PATH, timeout=60)

# Setup logging
console = Console()

logging.basicConfig(
level="INFO",
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
RotatingFileHandler(
Expand All @@ -98,9 +101,6 @@

log = logging.getLogger("medichaser")

# Load environment variables
load_dotenv()

retry_strategy = Retry(
total=10,
backoff_factor=1,
Expand Down Expand Up @@ -152,7 +152,7 @@ def __init__(self, username: str, password: str) -> None:
self.tokenA: str | None = None
self.tokenR: str | None = None
self.expires_at: int | None = None
self.driver: WebDriver | None = None
self.driver: ChromeDriver | None = None

def _get_or_create_device_id(self) -> str:
"""Gets the device ID from storage or creates a new one."""
Expand Down Expand Up @@ -261,7 +261,9 @@ def login_requests(self) -> None: # pragma: no cover
allow_redirects=False,
)
response.raise_for_status()
next_url = response.headers["Location"]
next_url = response.headers.get("Location")
if not next_url:
raise ValueError("Missing Location header in authorize response.")
time.sleep(2)

response = self.session.get(
Expand All @@ -279,7 +281,10 @@ def login_requests(self) -> None: # pragma: no cover
csrf_token = match.group(1)
parsed_url = urlparse(response.url)
query_params = parse_qs(parsed_url.query)
return_url = query_params["ReturnUrl"][0]
return_url_values = query_params.get("ReturnUrl")
if not return_url_values:
raise MFAError("ReturnUrl not found in login page query string.")
return_url = return_url_values[0]
# Step 2: POST credentials
login_data = {
"Input.ReturnUrl": return_url,
Expand Down Expand Up @@ -308,7 +313,9 @@ def login_requests(self) -> None: # pragma: no cover

# Step 3: Handle redirects and potential MFA
while response.status_code == 302:
redirect_url = response.headers["Location"]
redirect_url = response.headers.get("Location")
if not redirect_url:
raise ValueError("Missing Location header in login redirect response.")
if not redirect_url.startswith("https://"):
redirect_url = MEDICOVER_LOGIN_URL + redirect_url

Expand Down Expand Up @@ -399,7 +406,10 @@ def _handle_mfa(self, mfa_url: str) -> requests.Response: # pragma: no cover

parsed_url = urlparse(mfa_url)
query_params = parse_qs(parsed_url.query)
return_url = query_params["returnUrl"][0]
return_url_values = query_params.get("ReturnUrl")
if not return_url_values:
raise MFAError("ReturnUrl not found in MFA URL query string.")
return_url = return_url_values[0]

mfa_data = {
"Input.MfaCodeId": mfa_code_id,
Expand Down Expand Up @@ -456,7 +466,8 @@ def _exchange_code_for_token(

data = response.json()
if "error" in data:
raise ValueError(f"Failed to get token: {data['error_description']}")
error_description = data.get("error_description", data.get("error"))
raise ValueError(f"Failed to get token: {error_description}")

expires_in = data.get("expires_in")
expires_at = int(time.time()) + expires_in if expires_in else None
Expand All @@ -475,7 +486,7 @@ def _exchange_code_for_token(
@tenacity.retry(
stop=tenacity.stop_after_attempt(2),
wait=tenacity.wait_fixed(10),
retry=tenacity.retry_if_not_exception_type(MFAError),
retry=tenacity.retry_if_not_exception_type((MFAError, ValueError)),
reraise=True,
)
def login(self) -> None:
Expand All @@ -500,16 +511,16 @@ def login(self) -> None:
else:
self.login_requests()

def _init_driver(self) -> WebDriver:
"""Initializes the Selenium WebDriver if it's not already running."""
def _init_driver(self) -> ChromeDriver:
"""Initializes the Selenium ChromeDriver if it's not already running."""
if self.driver is None:
options = webdriver.ChromeOptions()
options = ChromeOptions()
options.add_argument("--headless") # Run in headless mode
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument(f"user-data-dir={DATA_PATH / 'chrome_profile'}")
self.driver = webdriver.Chrome(options=options)
self.driver = ChromeDriver(options=options)
stealth(
self.driver,
languages=["pl-PL", "pl"],
Expand All @@ -533,7 +544,7 @@ def _init_driver(self) -> WebDriver:
return self.driver

def _quit_driver(self) -> None:
"""Quits the Selenium WebDriver if it's running."""
"""Quits the Selenium ChromeDriver if it's running."""
if self.driver:
self.driver.quit()
self.driver = None
Expand Down Expand Up @@ -574,7 +585,8 @@ def refresh_token(self) -> None:

data = response.json()
if "error" in data:
if data["error"] == "invalid_grant":
error = data.get("error")
if error == "invalid_grant":
log.error(
"Refresh token is invalid or expired. Deleting token file and re-authenticating."
)
Expand All @@ -584,7 +596,7 @@ def refresh_token(self) -> None:
"Invalid grant: refresh token is likely expired or revoked."
)
raise ValueError(
f"Failed to refresh token: {response.status_code} {response.text}"
f"Failed to refresh token: {response.status_code} {data.get('error_description', response.text)}"
)

# manually set expires_at
Expand Down Expand Up @@ -833,7 +845,8 @@ def find_appointments(
items = [
x
for x in items
if datetime.datetime.fromisoformat(x["appointmentDate"]).date()
if x.get("appointmentDate")
and datetime.datetime.fromisoformat(x["appointmentDate"]).date()
<= end_date
]

Expand Down Expand Up @@ -1222,8 +1235,8 @@ def main() -> None:
else:
filters = finder.find_filters()

for r in filters[args.filter_type]:
log.info(f"{r['id']} - {r['value']}")
for r in filters.get(args.filter_type, []):
log.info(f"{r.get('id', 'N/A')} - {r.get('value', 'N/A')}")


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ medichaser = "medichaser:medichaser"
[dependency-groups]
dev = [
"mypy>=1.16.1,<2",
"pre-commit>=4.5.1",
"pytest-cov>=6.2.1,<7",
"pytest-xdist>=3.8.0,<4",
"pytest>=8.4.1,<9",
Expand Down
44 changes: 40 additions & 4 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def test_init_driver(self, monkeypatch: pytest.MonkeyPatch) -> None:
"userAgent": "Chrome User-Agent"
}
mock_chrome = MagicMock(return_value=mock_chrome_instance)
monkeypatch.setattr("selenium.webdriver.Chrome", mock_chrome)
monkeypatch.setattr("medichaser.ChromeDriver", mock_chrome)
mock_stealth = MagicMock()
monkeypatch.setattr("medichaser.stealth", mock_stealth)

Expand Down Expand Up @@ -322,15 +322,17 @@ def test_login_with_valid_stored_token(
def test_login_with_refresh_token(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Test the main login orchestrator with a successful token refresh."""
auth = Authenticator("user", "pass")
# First call to load is False, second is True after refresh
mock_load_token = Mock(side_effect=[False, True])
mock_load_token = Mock(return_value=True)
mock_refresh = Mock()
mock_login_requests = Mock()
monkeypatch.setattr(auth, "_load_token_from_storage", mock_load_token)
monkeypatch.setattr(auth, "refresh_token", mock_refresh)
monkeypatch.setattr(auth, "login_requests", mock_login_requests)

auth.login()
assert mock_load_token.call_count == 2
mock_load_token.assert_called_once()
mock_refresh.assert_called_once()
mock_login_requests.assert_not_called()

def test_login_fallback_to_selenium_no_load(
self, monkeypatch: pytest.MonkeyPatch
Expand Down Expand Up @@ -395,6 +397,40 @@ def test_login_method_selenium(self, monkeypatch: pytest.MonkeyPatch) -> None:

mock_login_selenium.assert_called_once()

def test_login_no_retry_on_value_error(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test that login does not retry when login_requests raises ValueError."""
auth = Authenticator("user", "pass")
mock_load_token = Mock(return_value=False)
mock_login_requests = Mock(side_effect=ValueError("bad parse"))

monkeypatch.setattr(auth, "_load_token_from_storage", mock_load_token)
monkeypatch.setattr(auth, "login_requests", mock_login_requests)
monkeypatch.delenv("SELENIUM_LOGIN", raising=False)

with pytest.raises(ValueError, match="bad parse"):
auth.login()

mock_login_requests.assert_called_once()

def test_login_requests_missing_location_header_in_authorize(self) -> None:
"""Test login_requests fails fast when authorize response misses Location."""
auth = Authenticator("user", "pass")
mock_session = Mock()
mock_response = Mock()
mock_response.raise_for_status = Mock()
mock_response.headers = {}
mock_session.get.return_value = mock_response
auth.session = mock_session

with pytest.raises(
ValueError, match="Missing Location header in authorize response."
):
auth.login_requests()

assert mock_session.get.call_count == 1


class TestAppointmentFinder:
"""Test cases for the AppointmentFinder class."""
Expand Down
Loading