Skip to content

Commit 6203922

Browse files
authored
Merge pull request #8 from terrafloww/feat/speed_up_indexer
Migrate codebase from httpx to aiohttp
2 parents 395d17e + 46ef288 commit 6203922

File tree

5 files changed

+69
-51
lines changed

5 files changed

+69
-51
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
setup(
1010
name="rasteret",
11-
version="0.1.17",
11+
version="0.1.20",
1212
author="Sidharth Subramaniam",
1313
author_email="sid@terrafloww.com",
1414
description="Fast and efficient access to Cloud-Optimized GeoTIFFs (COGs)",

src/rasteret/fetch/cog.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from __future__ import annotations
2020
import asyncio
21-
import httpx
21+
import aiohttp
2222
from dataclasses import dataclass
2323
from typing import Dict, List, Optional, Tuple
2424

@@ -50,32 +50,37 @@ class COGTileRequest:
5050
class COGReader:
5151
"""Manages connection pooling and COG reading operations."""
5252

53-
def __init__(self, max_concurrent: int = 50):
53+
def __init__(self, max_concurrent: int = 150): # Increased from 50
5454
self.max_concurrent = max_concurrent
55-
self.limits = httpx.Limits(
56-
max_keepalive_connections=max_concurrent,
57-
max_connections=max_concurrent,
58-
keepalive_expiry=60.0, # Shorter keepalive for HTTP/2
59-
)
60-
self.timeout = httpx.Timeout(30.0, connect=10.0)
55+
# aiohttp connector settings
56+
self.connector_limit = max_concurrent
57+
self.connector_limit_per_host = max_concurrent
58+
self.keepalive_timeout = 60.0
6159
self.client = None
6260
self.sem = None
63-
self.batch_size = 12 # Reduced for better HTTP/2 multiplexing
61+
self.batch_size = 20 # Increased for better throughput
6462

6563
async def __aenter__(self):
66-
self.client = httpx.AsyncClient(
67-
timeout=self.timeout,
68-
limits=self.limits,
69-
http2=True,
70-
verify=True,
71-
trust_env=True,
64+
# Create aiohttp connector with optimized settings
65+
connector = aiohttp.TCPConnector(
66+
limit=self.connector_limit,
67+
limit_per_host=self.connector_limit_per_host,
68+
keepalive_timeout=self.keepalive_timeout,
69+
enable_cleanup_closed=True,
70+
)
71+
72+
# Create aiohttp client session
73+
timeout = aiohttp.ClientTimeout(total=30.0, connect=10.0)
74+
self.client = aiohttp.ClientSession(
75+
connector=connector,
76+
timeout=timeout,
7277
)
7378
self.sem = asyncio.Semaphore(self.max_concurrent)
7479
return self
7580

7681
async def __aexit__(self, exc_type, exc_val, exc_tb):
7782
if self.client:
78-
await self.client.aclose()
83+
await self.client.close()
7984

8085
def merge_ranges(
8186
self, requests: List[COGTileRequest], gap_threshold: int = 1024
@@ -155,9 +160,9 @@ async def _read_range(self, url: str, start: int, end: int) -> bytes:
155160
for attempt in range(3):
156161
try:
157162
async with self.sem:
158-
response = await self.client.get(url, headers=headers)
159-
response.raise_for_status()
160-
return response.content
163+
async with self.client.get(url, headers=headers) as response:
164+
response.raise_for_status()
165+
return await response.read()
161166
except Exception:
162167
if attempt == 2:
163168
raise
@@ -363,7 +368,7 @@ async def read_cog_tile_data(
363368
url: str,
364369
metadata: CogMetadata,
365370
geometry: Optional[Polygon] = None,
366-
max_concurrent: int = 50,
371+
max_concurrent: int = 150, # Increased from 50
367372
debug: bool = False,
368373
) -> Tuple[np.ndarray, Optional[Affine]]:
369374
"""Read COG data, optionally masked by geometry."""

src/rasteret/stac/indexer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def __init__(
5151
name: Optional[str] = None,
5252
cloud_provider: Optional[CloudProvider] = None,
5353
cloud_config: Optional[CloudConfig] = None,
54-
max_concurrent: int = 100,
54+
max_concurrent: int = 300, # Increased from 100
5555
):
5656
self.data_source = data_source
5757
self.stac_api = stac_api
@@ -60,7 +60,7 @@ def __init__(
6060
self.cloud_config = cloud_config
6161
self.name = name
6262
self.max_concurrent = max_concurrent
63-
self.batch_size = 50
63+
self.batch_size = 100 # Increased from 50
6464

6565
@property
6666
def band_map(self) -> Dict[str, str]:

src/rasteret/stac/parser.py

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import time
2323
from typing import Dict, List, Optional, Set, Any
2424

25-
import httpx
25+
import aiohttp
2626
from cachetools import TTLCache, LRUCache
2727

2828
from rasteret.types import CogMetadata
@@ -80,8 +80,8 @@ class AsyncCOGHeaderParser:
8080

8181
def __init__(
8282
self,
83-
max_concurrent: int = 100,
84-
batch_size: int = 50,
83+
max_concurrent: int = 300, # Increased from 100
84+
batch_size: int = 100, # Increased from 50
8585
cache_ttl: int = 3600, # 1 hour
8686
retry_attempts: int = 3,
8787
cloud_provider: Optional[CloudProvider] = None,
@@ -93,12 +93,10 @@ def __init__(
9393
self.cloud_config = cloud_config
9494
self.batch_size = batch_size
9595

96-
# Connection optimization
97-
self.connector = httpx.Limits(
98-
max_keepalive_connections=max_concurrent,
99-
max_connections=max_concurrent,
100-
keepalive_expiry=120,
101-
)
96+
# Connection optimization for aiohttp
97+
self.connector_limit = max_concurrent
98+
self.connector_limit_per_host = max_concurrent
99+
self.keepalive_timeout = 120
102100

103101
# Rate limiting
104102
self.semaphore = asyncio.Semaphore(max_concurrent)
@@ -118,17 +116,26 @@ def __init__(
118116
}
119117

120118
async def __aenter__(self):
121-
self.client = httpx.AsyncClient(
122-
limits=self.connector,
123-
timeout=30.0,
124-
http2=True,
125-
headers={"Connection": "keep-alive", "Keep-Alive": "timeout=120"},
119+
# Create aiohttp connector with optimized settings
120+
connector = aiohttp.TCPConnector(
121+
limit=self.connector_limit,
122+
limit_per_host=self.connector_limit_per_host,
123+
keepalive_timeout=self.keepalive_timeout,
124+
enable_cleanup_closed=True,
125+
)
126+
127+
# Create aiohttp client session
128+
timeout = aiohttp.ClientTimeout(total=30.0, connect=10.0)
129+
self.client = aiohttp.ClientSession(
130+
connector=connector,
131+
timeout=timeout,
132+
headers={"Connection": "keep-alive"},
126133
)
127134
return self
128135

129136
async def __aexit__(self, exc_type, exc_val, exc_tb):
130137
if self.client:
131-
await self.client.aclose()
138+
await self.client.close()
132139

133140
async def process_cog_headers_batch(
134141
self,
@@ -187,15 +194,15 @@ async def _fetch_byte_range(self, url: str, start: int, size: int) -> bytes:
187194
for attempt in range(self.retry_attempts):
188195
try:
189196
async with self.semaphore:
190-
response = await self.client.get(url, headers=headers)
191-
if response.status_code != 206:
192-
raise IOError(
193-
f"Range request failed: {response.status_code}"
194-
)
195-
196-
data = response.content
197-
self.header_cache[cache_key] = data
198-
return data
197+
async with self.client.get(url, headers=headers) as response:
198+
if response.status != 206:
199+
raise IOError(
200+
f"Range request failed: {response.status}"
201+
)
202+
203+
data = await response.read()
204+
self.header_cache[cache_key] = data
205+
return data
199206

200207
except Exception as e:
201208
if attempt == self.retry_attempts - 1:

src/rasteret/tests/test_stac_indexer.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ async def test_stac_search(self, mock_pystac):
7979
# Configure mock chain
8080
mock_pystac.Client.open.return_value = mock_client
8181
mock_client.search.return_value = mock_search
82-
mock_search.items.return_value = iter(self.mock_stac_items)
82+
mock_search.items_as_dicts.return_value = [
83+
item.to_dict() for item in self.mock_stac_items
84+
]
8385

8486
# Create indexer
8587
indexer = StacToGeoParquetIndexer(
@@ -88,7 +90,9 @@ async def test_stac_search(self, mock_pystac):
8890

8991
# Test search
9092
items = await indexer._search_stac(
91-
bbox=[-180, -90, 180, 90], date_range=["2023-01-01", "2023-12-31"]
93+
bbox=[-180, -90, 180, 90],
94+
date_range=["2023-01-01", "2023-12-31"],
95+
query=None,
9296
)
9397

9498
# Verify results
@@ -98,7 +102,7 @@ async def test_stac_search(self, mock_pystac):
98102
# Verify mock calls
99103
mock_pystac.Client.open.assert_called_once()
100104
mock_client.search.assert_called_once()
101-
mock_search.items.assert_called_once()
105+
mock_search.items_as_dicts.assert_called_once()
102106

103107
@patch("rasteret.stac.indexer.AsyncCOGHeaderParser")
104108
@patch("rasteret.stac.indexer.pystac_client")
@@ -109,7 +113,9 @@ async def test_index_creation(self, mock_pystac, mock_parser):
109113

110114
mock_pystac.Client.open.return_value = mock_client
111115
mock_client.search.return_value = mock_search
112-
mock_search.items.return_value = iter(self.mock_stac_items)
116+
mock_search.items_as_dicts.return_value = [
117+
item.to_dict() for item in self.mock_stac_items
118+
]
113119

114120
# Setup COG parser mock
115121
mock_parser_instance = AsyncMock()

0 commit comments

Comments
 (0)