Skip to content

Commit b486f29

Browse files
nanohanno and hkuepers authored
feat: Improvements to Lambda materialization engine (feast-dev#5379)
* Set read_timeout for lambda client
  Signed-off-by: hkuepers <[email protected]>
* Handle empty return from offline store
  Signed-off-by: hkuepers <[email protected]>
* Add lambda read timeout retries
  Signed-off-by: hkuepers <[email protected]>
* Add error handling in Lambda materialization engine
  Signed-off-by: hkuepers <[email protected]>
* Fix error handling
  Signed-off-by: hkuepers <[email protected]>
---------
Signed-off-by: hkuepers <[email protected]>
Co-authored-by: hkuepers <[email protected]>
1 parent c3188e4 commit b486f29

File tree

1 file changed

+88
-43
lines changed

1 file changed

+88
-43
lines changed

sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py

Lines changed: 88 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
from typing import Callable, List, Literal, Optional, Sequence, Union
88

99
import boto3
10+
from botocore.config import Config
1011
from pydantic import StrictStr
1112
from tqdm import tqdm
1213

@@ -33,6 +34,8 @@
3334
from feast.version import get_version
3435

3536
DEFAULT_BATCH_SIZE = 10_000
37+
DEFAULT_TIMEOUT = 600
38+
LAMBDA_TIMEOUT_RETRIES = 5
3639

3740
logger = logging.getLogger(__name__)
3841

@@ -52,11 +55,16 @@ class LambdaMaterializationEngineConfig(FeastConfigBaseModel):
5255

5356
@dataclass
5457
class LambdaMaterializationJob(MaterializationJob):
55-
def __init__(self, job_id: str, status: MaterializationJobStatus) -> None:
58+
def __init__(
59+
self,
60+
job_id: str,
61+
status: MaterializationJobStatus,
62+
error: Optional[BaseException] = None,
63+
) -> None:
5664
super().__init__()
5765
self._job_id: str = job_id
5866
self._status = status
59-
self._error = None
67+
self._error = error
6068

6169
def status(self) -> MaterializationJobStatus:
6270
return self._status
@@ -97,7 +105,7 @@ def update(
97105
PackageType="Image",
98106
Role=self.repo_config.batch_engine.lambda_role,
99107
Code={"ImageUri": self.repo_config.batch_engine.materialization_image},
100-
Timeout=600,
108+
Timeout=DEFAULT_TIMEOUT,
101109
Tags={
102110
"feast-owned": "True",
103111
"project": project,
@@ -149,7 +157,8 @@ def __init__(
149157
self.lambda_name = f"feast-materialize-{self.repo_config.project}"
150158
if len(self.lambda_name) > 64:
151159
self.lambda_name = self.lambda_name[:64]
152-
self.lambda_client = boto3.client("lambda")
160+
config = Config(read_timeout=DEFAULT_TIMEOUT + 10)
161+
self.lambda_client = boto3.client("lambda", config=config)
153162

154163
def materialize(
155164
self, registry, tasks: List[MaterializationTask]
@@ -200,47 +209,83 @@ def _materialize_one(
200209
)
201210

202211
paths = offline_job.to_remote_storage()
203-
max_workers = len(paths) if len(paths) <= 20 else 20
204-
executor = ThreadPoolExecutor(max_workers=max_workers)
205-
futures = []
206-
207-
for path in paths:
208-
payload = {
209-
FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64,
210-
"view_name": feature_view.name,
211-
"view_type": "batch",
212-
"path": path,
213-
}
214-
# Invoke a lambda to materialize this file.
215-
216-
logger.info("Invoking materialization for %s", path)
217-
futures.append(
218-
executor.submit(
219-
self.lambda_client.invoke,
220-
FunctionName=self.lambda_name,
221-
InvocationType="RequestResponse",
222-
Payload=json.dumps(payload),
223-
)
212+
if (num_files := len(paths)) == 0:
213+
logger.warning("No values to update for the given time range.")
214+
return LambdaMaterializationJob(
215+
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
224216
)
217+
else:
218+
max_workers = num_files if num_files <= 20 else 20
219+
executor = ThreadPoolExecutor(max_workers=max_workers)
220+
futures = []
221+
222+
for path in paths:
223+
payload = {
224+
FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64,
225+
"view_name": feature_view.name,
226+
"view_type": "batch",
227+
"path": path,
228+
}
229+
# Invoke a lambda to materialize this file.
230+
231+
logger.info("Invoking materialization for %s", path)
232+
futures.append(
233+
executor.submit(
234+
self.invoke_with_retries,
235+
FunctionName=self.lambda_name,
236+
InvocationType="RequestResponse",
237+
Payload=json.dumps(payload),
238+
)
239+
)
225240

226-
done, not_done = wait(futures)
227-
logger.info("Done: %s Not Done: %s", done, not_done)
228-
for f in done:
229-
response = f.result()
230-
output = json.loads(response["Payload"].read())
241+
done, not_done = wait(futures)
242+
logger.info("Done: %s Not Done: %s", done, not_done)
243+
errors = []
244+
for f in done:
245+
response, payload = f.result()
231246

232-
logger.info(
233-
f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, "
234-
f"Output: {output}"
235-
)
247+
logger.info(
248+
f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, "
249+
f"Output: {payload}"
250+
)
251+
if "errorMessage" in payload.keys():
252+
errors.append(payload["errorMessage"])
236253

237-
for f in not_done:
238-
response = f.result()
239-
logger.error(f"Ingestion failed: {response}")
254+
for f in not_done:
255+
response, payload = f.result()
256+
logger.error(f"Ingestion failed: {response=}, {payload=}")
240257

241-
return LambdaMaterializationJob(
242-
job_id=job_id,
243-
status=MaterializationJobStatus.SUCCEEDED
244-
if not not_done
245-
else MaterializationJobStatus.ERROR,
246-
)
258+
if len(not_done) == 0 and len(errors) == 0:
259+
return LambdaMaterializationJob(
260+
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
261+
)
262+
else:
263+
return LambdaMaterializationJob(
264+
job_id=job_id,
265+
status=MaterializationJobStatus.ERROR,
266+
error=RuntimeError(
267+
f"Lambda functions did not finish successfully: {errors}"
268+
),
269+
)
270+
271+
def invoke_with_retries(self, **kwargs):
    """Invoke the Lambda function and retry if it times out.

    The Lambda function may time out initially if many values are updated
    and DynamoDB throttles requests. As soon as the DynamoDB tables
    are scaled up, the Lambda function can succeed upon retry with higher
    throughput.

    Args:
        **kwargs: Keyword arguments forwarded unchanged to
            ``self.lambda_client.invoke``.

    Returns:
        A ``(response, payload)`` tuple for the last attempt made:
        ``response`` is the raw value returned by ``invoke`` and
        ``payload`` is its JSON-decoded ``"Payload"`` stream, or ``{}``
        when that decodes to a falsy value such as ``null``.
    """
    # Always invoke at least once so ``response``/``payload`` are bound
    # even if the retry constant is misconfigured to zero or less.
    attempts = max(1, LAMBDA_TIMEOUT_RETRIES)
    for attempt in range(1, attempts + 1):
        response = self.lambda_client.invoke(**kwargs)
        # The payload stream can only be read once, so decode it here and
        # hand the decoded value back to the caller.
        payload = json.loads(response["Payload"].read()) or {}
        if "Task timed out after" not in payload.get("errorMessage", ""):
            break
        request_id = response["ResponseMetadata"]["RequestId"]
        if attempt < attempts:
            logger.warning(
                "Retrying lambda function after lambda timeout in request %s",
                request_id,
            )
        else:
            # Do not claim a retry that will never happen.
            logger.warning(
                "Lambda function timed out in request %s; "
                "giving up after %d attempts",
                request_id,
                attempts,
            )
    return response, payload

0 commit comments

Comments
 (0)