Skip to content

Commit f21f67c

Browse files
franciscojavierarceontkathole
authored andcommitted
attempting fix for dynamo test
Signed-off-by: Francisco Javier Arceo <[email protected]>
1 parent f16b5e0 commit f21f67c

File tree

1 file changed

+66
-64
lines changed

1 file changed

+66
-64
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 66 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,24 @@ class DynamoDBOnlineStore(OnlineStore):
108108
Attributes:
109109
_dynamodb_client: Boto3 DynamoDB client.
110110
_dynamodb_resource: Boto3 DynamoDB resource.
111+
_aioboto_session: Async boto session.
112+
_aioboto_client: Async boto client.
113+
_aioboto_context_stack: Async context stack.
111114
"""
112115

113116
_dynamodb_client = None
114117
_dynamodb_resource = None
115118

119+
def __init__(self):
120+
super().__init__()
121+
self._aioboto_session = None
122+
self._aioboto_client = None
123+
self._aioboto_context_stack = None
124+
116125
async def initialize(self, config: RepoConfig):
117126
online_config = config.online_store
118127

119-
await _get_aiodynamodb_client(
128+
await self._get_aiodynamodb_client(
120129
online_config.region,
121130
online_config.max_pool_connections,
122131
online_config.keepalive_timeout,
@@ -127,7 +136,59 @@ async def initialize(self, config: RepoConfig):
127136
)
128137

129138
async def close(self):
130-
await _aiodynamodb_close()
139+
await self._aiodynamodb_close()
140+
141+
def _get_aioboto_session(self):
142+
if self._aioboto_session is None:
143+
logger.debug("initializing the aiobotocore session")
144+
self._aioboto_session = session.get_session()
145+
return self._aioboto_session
146+
147+
async def _get_aiodynamodb_client(
148+
self,
149+
region: str,
150+
max_pool_connections: int,
151+
keepalive_timeout: float,
152+
connect_timeout: Union[int, float],
153+
read_timeout: Union[int, float],
154+
total_max_retry_attempts: Union[int, None],
155+
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
156+
):
157+
if self._aioboto_client is None:
158+
logger.debug("initializing the aiobotocore dynamodb client")
159+
160+
retries: Dict[str, Any] = {}
161+
if total_max_retry_attempts is not None:
162+
retries["total_max_attempts"] = total_max_retry_attempts
163+
if retry_mode is not None:
164+
retries["mode"] = retry_mode
165+
166+
client_context = self._get_aioboto_session().create_client(
167+
"dynamodb",
168+
region_name=region,
169+
config=AioConfig(
170+
max_pool_connections=max_pool_connections,
171+
connect_timeout=connect_timeout,
172+
read_timeout=read_timeout,
173+
retries=retries if retries else None,
174+
connector_args={"keepalive_timeout": keepalive_timeout},
175+
),
176+
)
177+
self._aioboto_context_stack = contextlib.AsyncExitStack()
178+
self._aioboto_client = await self._aioboto_context_stack.enter_async_context(
179+
client_context
180+
)
181+
return self._aioboto_client
182+
183+
async def _aiodynamodb_close(self):
184+
if self._aioboto_client:
185+
await self._aioboto_client.close()
186+
self._aioboto_client = None
187+
if self._aioboto_context_stack:
188+
await self._aioboto_context_stack.aclose()
189+
self._aioboto_context_stack = None
190+
if self._aioboto_session:
191+
self._aioboto_session = None
131192

132193
@property
133194
def async_supported(self) -> SupportedAsyncMethods:
@@ -362,7 +423,7 @@ async def online_write_batch_async(
362423
_to_client_write_item(config, entity_key, features, timestamp)
363424
for entity_key, features, timestamp, _ in _latest_data_to_write(data)
364425
]
365-
client = await _get_aiodynamodb_client(
426+
client = await self._get_aiodynamodb_client(
366427
online_config.region,
367428
online_config.max_pool_connections,
368429
online_config.keepalive_timeout,
@@ -473,7 +534,7 @@ def to_tbl_resp(raw_client_response):
473534
batches.append(batch)
474535
entity_id_batches.append(entity_id_batch)
475536

476-
client = await _get_aiodynamodb_client(
537+
client = await self._get_aiodynamodb_client(
477538
online_config.region,
478539
online_config.max_pool_connections,
479540
online_config.keepalive_timeout,
@@ -627,66 +688,7 @@ def _to_client_batch_get_payload(online_config, table_name, batch):
627688
}
628689

629690

630-
_aioboto_session = None
631-
_aioboto_client = None
632-
_aioboto_context_stack = None
633-
634-
635-
def _get_aioboto_session():
636-
global _aioboto_session
637-
if _aioboto_session is None:
638-
logger.debug("initializing the aiobotocore session")
639-
_aioboto_session = session.get_session()
640-
return _aioboto_session
641-
642-
643-
async def _get_aiodynamodb_client(
644-
region: str,
645-
max_pool_connections: int,
646-
keepalive_timeout: float,
647-
connect_timeout: Union[int, float],
648-
read_timeout: Union[int, float],
649-
total_max_retry_attempts: Union[int, None],
650-
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
651-
):
652-
global _aioboto_client, _aioboto_context_stack
653-
if _aioboto_client is None:
654-
logger.debug("initializing the aiobotocore dynamodb client")
655-
656-
retries: Dict[str, Any] = {}
657-
if total_max_retry_attempts is not None:
658-
retries["total_max_attempts"] = total_max_retry_attempts
659-
if retry_mode is not None:
660-
retries["mode"] = retry_mode
661-
662-
client_context = _get_aioboto_session().create_client(
663-
"dynamodb",
664-
region_name=region,
665-
config=AioConfig(
666-
max_pool_connections=max_pool_connections,
667-
connect_timeout=connect_timeout,
668-
read_timeout=read_timeout,
669-
retries=retries if retries else None,
670-
connector_args={"keepalive_timeout": keepalive_timeout},
671-
),
672-
)
673-
_aioboto_context_stack = contextlib.AsyncExitStack()
674-
_aioboto_client = await _aioboto_context_stack.enter_async_context(
675-
client_context
676-
)
677-
return _aioboto_client
678-
679-
680-
async def _aiodynamodb_close():
681-
global _aioboto_client, _aioboto_session, _aioboto_context_stack
682-
if _aioboto_client:
683-
await _aioboto_client.close()
684-
_aioboto_client = None
685-
if _aioboto_context_stack:
686-
await _aioboto_context_stack.aclose()
687-
_aioboto_context_stack = None
688-
if _aioboto_session:
689-
_aioboto_session = None
691+
# Global async client functions removed - now using instance methods
690692

691693

692694
def _initialize_dynamodb_client(

0 commit comments

Comments
 (0)