Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions src/browser_automation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import logging
from random import SystemRandom

from patchright.async_api import Error as PlaywrightError
from patchright.async_api import Page

logger = logging.getLogger(__name__)  # module-scoped logger (stdlib logging convention)
cryptogen = SystemRandom()  # OS-backed RNG; randomizes scroll distances and delays


async def _scroll_and_load_listings(page: Page, max_entries: int = 100, max_no_change: int = 3, max_scroll_attempts: int = 50) -> None:
    """Scroll through search results to trigger lazy loading.

    Args:
        page: Playwright page already displaying the search results.
        max_entries: Stop once this many property cards have been loaded.
        max_no_change: Stop after this many consecutive scrolls that load
            no new cards (content exhausted or loading stalled).
        max_scroll_attempts: Hard cap on scroll iterations as a safety net.
    """
    await page.wait_for_selector('[class*="search-page-list-container"]', timeout=10000)

    previous_count = 0
    no_change_iterations = 0

    for iteration in range(max_scroll_attempts):
        current_cards = await page.query_selector_all('article[data-test="property-card"]')
        current_count = len(current_cards)

        msg = f"Iteration {iteration + 1}: Found {current_count} property cards"
        logger.info(msg)

        if current_count >= max_entries:
            msg = f"Reached target of {max_entries} entries"
            logger.info(msg)
            break

        # Check if we've reached the bottom of the page (element is visible on screen)
        bottom_element = await page.query_selector("div.search-list-save-search-parent")
        if bottom_element:
            # Check if the element is actually visible in the viewport
            is_visible = await page.evaluate(
                """
                (element) => {
                    const rect = element.getBoundingClientRect();
                    return rect.top < window.innerHeight && rect.bottom > 0;
                }
                """,
                bottom_element,
            )

            if is_visible:
                msg = "Reached bottom of page (search-list-save-search-parent element is visible)"
                logger.info(msg)
                break

        # Track stalls: if the card count stopped growing, give up after a few tries.
        if current_count == previous_count:
            no_change_iterations += 1
            if no_change_iterations >= max_no_change:
                logger.info("No new content loaded after several attempts, stopping")
                break
        else:
            no_change_iterations = 0

        previous_count = current_count

        # Scroll down by a random amount (simulate human-like scrolling)
        await _scroll_results_by(page, cryptogen.randint(300, 800))

        # Random wait time between scrolls (1-4 seconds)
        wait_time = cryptogen.randint(1000, 4000)
        await page.wait_for_timeout(wait_time)

        # Occasionally scroll back up a bit to simulate more natural browsing
        scroll_up_chance: float = 0.15
        if iteration > 0 and cryptogen.random() < scroll_up_chance:
            # BUG FIX: the container branch previously did `scrollTop += back_scroll`
            # (scrolling further DOWN) even though both this comment and the
            # window-scroll fallback scroll UP; passing a negative delta makes
            # both code paths agree with the stated intent.
            await _scroll_results_by(page, -cryptogen.randint(100, 300))
            await page.wait_for_timeout(cryptogen.randint(500, 1500))

    final_cards = await page.query_selector_all('article[data-test="property-card"]')
    final_count = len(final_cards)
    msg = f"Lazy loading complete. Total property cards loaded: {final_count}"
    logger.info(msg)

    # Scroll back to top to ensure all content is properly rendered
    await page.evaluate("""
        const searchContainer = document.querySelector('[class*="search-page-list-container"]');
        if (searchContainer) {
            searchContainer.scrollTop = 0;
        } else {
            window.scrollTo(0, 0);
        }
    """)
    await page.wait_for_timeout(1500)


async def _scroll_results_by(page: Page, delta: int) -> None:
    """Scroll the search-results container by *delta* px (negative scrolls up).

    Falls back to scrolling the window when the container lookup fails
    (e.g. the selector matches nothing, making `scrollTop` access throw).
    """
    try:
        await page.evaluate(f"""
            const searchContainer = document.querySelector('[class*="search-page-list-container"]');
            searchContainer.scrollTop += {delta};
        """)
    except PlaywrightError as e:
        wrn = f"Scroll attempt failed: {e}, trying window scroll"
        logger.warning(wrn)
        await page.evaluate(f"window.scrollBy(0, {delta})")
18 changes: 18 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import ClassVar


class ZillowURLs:
    """Constants for Zillow request handling."""

    # Live Zillow rental search results for Brunswick, ME 04011.
    ZILLOW_URL: ClassVar[str] = "https://www.zillow.com/brunswick-me-04011/rentals/"
    # Static practice clone of the Zillow results page (appbrewery.github.io);
    # presumably used to avoid bot detection during development — confirm usage.
    CLONE_URL: ClassVar[str] = "https://appbrewery.github.io/Zillow-Clone/"


class GoogleFormConstants:
    """Constants for Google Form submission."""

    # Target form URL (public "viewform" link).
    FORM_URL: ClassVar[str] = "https://docs.google.com/forms/d/e/1FAIpQLSfYrPaEL7FXI_wGYiQLUYLuqTijKaE4ZPQTL2LLTGNy6m_cYg/viewform"
    # NOTE(review): these absolute XPaths are tied to the current generated DOM
    # of the Google Form and will break if the form structure changes; consider
    # label-based locators if submissions start failing.
    ADDRESS_INPUT_XPATH: ClassVar[str] = 'xpath=//*[@id="mG61Hd"]/div[2]/div/div[2]/div[1]/div/div/div[2]/div/div[1]/div/div[1]/input'
    PRICE_INPUT_XPATH: ClassVar[str] = 'xpath=//*[@id="mG61Hd"]/div[2]/div/div[2]/div[2]/div/div/div[2]/div/div[1]/div/div[1]/input'
    LINK_INPUT_XPATH: ClassVar[str] = 'xpath=//*[@id="mG61Hd"]/div[2]/div/div[2]/div[3]/div/div/div[2]/div/div[1]/div/div[1]/input'
    SUBMIT_BUTTON_XPATH: ClassVar[str] = 'xpath=//*[@id="mG61Hd"]/div[2]/div/div[3]/div/div[1]/div'
2 changes: 2 additions & 0 deletions src/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class ZillowParseError(Exception):
    """Raised when Zillow listing data cannot be scraped or parsed as expected."""
19 changes: 19 additions & 0 deletions src/form_submission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging
from random import SystemRandom

from patchright.async_api import Page

from src.constants import GoogleFormConstants

logger = logging.getLogger(__name__)  # module-scoped logger (stdlib logging convention)
cryptogen = SystemRandom()  # OS-backed RNG; randomizes pauses to mimic human pacing


async def _submit_form(page: Page, url: str, address: str, price: str, link: str) -> None:
    """Open the Google Form at *url*, fill in the listing fields, and submit it.

    Randomized pauses after navigation and after submission mimic human pacing.
    """
    await page.goto(url)
    await page.wait_for_timeout(cryptogen.randint(1000, 3000))

    # Fill each field via its XPath locator, in the form's visual order.
    field_values = (
        (GoogleFormConstants.ADDRESS_INPUT_XPATH, address),
        (GoogleFormConstants.PRICE_INPUT_XPATH, price),
        (GoogleFormConstants.LINK_INPUT_XPATH, link),
    )
    for selector, value in field_values:
        await page.fill(selector, value)

    await page.click(GoogleFormConstants.SUBMIT_BUTTON_XPATH)
    await page.wait_for_timeout(cryptogen.randint(1000, 3000))
Loading
Loading