Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/test_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ devices=$2

# Run only for Merlin Tensorflow Container
if [ "$container" == "merlin-tensorflow" ]; then
pip install 'feast<0.20'
pip install feast==0.31
pip install dask==2023.1.1 distributed==2023.1.1 protobuf==3.20.3
CUDA_VISIBLE_DEVICES="$devices" pytest -rxs tests/integration
fi
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,8 @@
}
],
"source": [
"!rm -rf $BASE_DIR/feature_repo\n",
"!cd $BASE_DIR && feast init feature_repo"
"!rm -rf $BASE_DIR/feast\n",
"!cd $BASE_DIR && feast init feast"
]
},
{
Expand All @@ -758,9 +758,9 @@
"metadata": {},
"outputs": [],
"source": [
"feature_repo_path = os.path.join(BASE_DIR, \"feature_repo\")\n",
"if os.path.exists(f\"{feature_repo_path}/example.py\"):\n",
" os.remove(f\"{feature_repo_path}/example.py\")\n",
"feature_repo_path = os.path.join(BASE_DIR, \"feast/feature_repo\")\n",
"if os.path.exists(f\"{feature_repo_path}/example_repo.py\"):\n",
" os.remove(f\"{feature_repo_path}/example_repo.py\")\n",
"if os.path.exists(f\"{feature_repo_path}/data/driver_stats.parquet\"):\n",
" os.remove(f\"{feature_repo_path}/data/driver_stats.parquet\")"
]
Expand Down Expand Up @@ -1157,7 +1157,7 @@
"outputs": [],
"source": [
"user_features.to_parquet(\n",
" os.path.join(BASE_DIR, \"feature_repo/data\", \"user_features.parquet\")\n",
" os.path.join(feature_repo_path, \"data\", \"user_features.parquet\")\n",
")"
]
},
Expand Down Expand Up @@ -1313,7 +1313,7 @@
"source": [
"# save to disk\n",
"item_features.to_parquet(\n",
" os.path.join(BASE_DIR, \"feature_repo/data\", \"item_features.parquet\")\n",
" os.path.join(feature_repo_path, \"data\", \"item_features.parquet\")\n",
")"
]
},
Expand Down Expand Up @@ -1604,46 +1604,46 @@
"metadata": {},
"outputs": [],
"source": [
"file = open(os.path.join(BASE_DIR, \"feature_repo/\", \"user_features.py\"), \"w\")\n",
"file = open(os.path.join(feature_repo_path, \"user_features.py\"), \"w\")\n",
"file.write(\n",
" \"\"\"\n",
"from google.protobuf.duration_pb2 import Duration\n",
"import datetime\n",
"from feast import Entity, Feature, FeatureView, ValueType\n",
"from datetime import timedelta\n",
"from feast import Entity, Field, FeatureView, ValueType\n",
"from feast.types import Int32\n",
"from feast.infra.offline_stores.file_source import FileSource\n",
"\n",
"user_features = FileSource(\n",
" path=\"{}\",\n",
" event_timestamp_column=\"datetime\",\n",
" timestamp_field=\"datetime\",\n",
" created_timestamp_column=\"created\",\n",
")\n",
"\n",
"user_raw = Entity(name=\"user_id_raw\", value_type=ValueType.INT32, description=\"user id raw\",)\n",
"user_raw = Entity(name=\"user_id_raw\", value_type=ValueType.INT32, join_keys=[\"user_id_raw\"],)\n",
"\n",
"user_features_view = FeatureView(\n",
" name=\"user_features\",\n",
" entities=[\"user_id_raw\"],\n",
" ttl=Duration(seconds=86400 * 7),\n",
" features=[\n",
" Feature(name=\"user_shops\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_profile\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_group\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_gender\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_age\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_consumption_2\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_is_occupied\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_geography\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_intentions\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_brands\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_categories\", dtype=ValueType.INT32),\n",
" Feature(name=\"user_id\", dtype=ValueType.INT32),\n",
" entities=[user_raw],\n",
" ttl=timedelta(0),\n",
" schema=[\n",
" Field(name=\"user_shops\", dtype=Int32),\n",
" Field(name=\"user_profile\", dtype=Int32),\n",
" Field(name=\"user_group\", dtype=Int32),\n",
" Field(name=\"user_gender\", dtype=Int32),\n",
" Field(name=\"user_age\", dtype=Int32),\n",
" Field(name=\"user_consumption_2\", dtype=Int32),\n",
" Field(name=\"user_is_occupied\", dtype=Int32),\n",
" Field(name=\"user_geography\", dtype=Int32),\n",
" Field(name=\"user_intentions\", dtype=Int32),\n",
" Field(name=\"user_brands\", dtype=Int32),\n",
" Field(name=\"user_categories\", dtype=Int32),\n",
" Field(name=\"user_id\", dtype=Int32),\n",
" ],\n",
" online=True,\n",
" input=user_features,\n",
" source=user_features,\n",
" tags=dict(),\n",
")\n",
"\"\"\".format(\n",
" os.path.join(BASE_DIR, \"feature_repo/data/\", \"user_features.parquet\")\n",
" os.path.join(feature_repo_path, \"data/\", \"user_features.parquet\")\n",
" )\n",
")\n",
"file.close()"
Expand All @@ -1656,38 +1656,38 @@
"metadata": {},
"outputs": [],
"source": [
"with open(os.path.join(BASE_DIR, \"feature_repo/\", \"item_features.py\"), \"w\") as f:\n",
"with open(os.path.join(feature_repo_path, \"item_features.py\"), \"w\") as f:\n",
" f.write(\n",
" \"\"\"\n",
"from google.protobuf.duration_pb2 import Duration\n",
"import datetime\n",
"from feast import Entity, Feature, FeatureView, ValueType\n",
"from datetime import timedelta\n",
"from feast import Entity, Field, FeatureView, ValueType\n",
"from feast.types import Int32\n",
"from feast.infra.offline_stores.file_source import FileSource\n",
"\n",
"item_features = FileSource(\n",
" path=\"{}\",\n",
" event_timestamp_column=\"datetime\",\n",
" timestamp_field=\"datetime\",\n",
" created_timestamp_column=\"created\",\n",
")\n",
"\n",
"item = Entity(name=\"item_id\", value_type=ValueType.INT32, description=\"item id\",)\n",
"item = Entity(name=\"item_id\", value_type=ValueType.INT32, join_keys=[\"item_id\"],)\n",
"\n",
"item_features_view = FeatureView(\n",
" name=\"item_features\",\n",
" entities=[\"item_id\"],\n",
" ttl=Duration(seconds=86400 * 7),\n",
" features=[\n",
" Feature(name=\"item_category\", dtype=ValueType.INT32),\n",
" Feature(name=\"item_shop\", dtype=ValueType.INT32),\n",
" Feature(name=\"item_brand\", dtype=ValueType.INT32),\n",
" Feature(name=\"item_id_raw\", dtype=ValueType.INT32),\n",
" entities=[item],\n",
" ttl=timedelta(0),\n",
" schema=[\n",
" Field(name=\"item_category\", dtype=Int32),\n",
" Field(name=\"item_shop\", dtype=Int32),\n",
" Field(name=\"item_brand\", dtype=Int32),\n",
" Field(name=\"item_id_raw\", dtype=Int32),\n",
" ],\n",
" online=True,\n",
" input=item_features,\n",
" source=item_features,\n",
" tags=dict(),\n",
")\n",
"\"\"\".format(\n",
" os.path.join(BASE_DIR, \"feature_repo/data/\", \"item_features.parquet\")\n",
" os.path.join(feature_repo_path, \"data/\", \"item_features.parquet\")\n",
" )\n",
" )\n",
"file.close()"
Expand Down Expand Up @@ -1746,7 +1746,7 @@
"source": [
"import seedir as sd\n",
"\n",
"feature_repo_path = os.path.join(BASE_DIR, \"feature_repo\")\n",
"feature_repo_path = os.path.join(BASE_DIR, \"feast\")\n",
"sd.seedir(\n",
" feature_repo_path,\n",
" style=\"lines\",\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,10 @@
"import numpy as np\n",
"import pandas as pd\n",
"import feast\n",
"import faiss\n",
"import seedir as sd\n",
"from nvtabular import ColumnSchema, Schema\n",
"\n",
"from merlin.systems.dag.ensemble import Ensemble\n",
"from merlin.systems.dag.ops.session_filter import FilterCandidates\n",
"from merlin.systems.dag.ops.softmax_sampling import SoftmaxSampling\n",
"from merlin.systems.dag.ops.tensorflow import PredictTensorflow\n",
"from merlin.systems.dag.ops.unroll_features import UnrollFeatures\n",
Expand Down Expand Up @@ -134,7 +132,7 @@
"BASE_DIR = os.environ.get(\"BASE_DIR\", \"/Merlin/examples/Building-and-deploying-multi-stage-RecSys/\")\n",
"\n",
"# define feature repo path\n",
"feast_repo_path = os.path.join(BASE_DIR, \"feature_repo/\")"
"feast_repo_path = os.path.join(BASE_DIR, \"feast/feature_repo/\")"
]
},
{
Expand Down Expand Up @@ -236,8 +234,7 @@
],
"source": [
"# set up the base dir to for feature store\n",
"feature_repo_path = os.path.join(BASE_DIR, 'feature_repo')\n",
"sd.seedir(feature_repo_path, style='lines', itemlimit=10, depthlimit=5, exclude_folders=['.ipynb_checkpoints', '__pycache__'], sort=True)"
"sd.seedir(os.path.join(BASE_DIR, 'feast'), style='lines', itemlimit=10, depthlimit=5, exclude_folders=['.ipynb_checkpoints', '__pycache__'], sort=True)"
]
},
{
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ nvtabular>=23.4.0
merlin-models>=23.4.0
merlin-systems>=23.4.0
transformers4rec>=23.4.0
feast==0.31
43 changes: 25 additions & 18 deletions tests/unit/examples/test_building_deploying_multi_stage_RecSys.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

# flake8: noqa

def test_func():

def test_func(tmpdir):
with testbook(
REPO_ROOT
/ "examples"
Expand All @@ -21,21 +22,27 @@ def test_func():
execute=False,
) as tb1:
tb1.inject(
"""
f"""
import os
os.environ["DATA_FOLDER"] = "/tmp/data/"
os.system("mkdir -p {tmpdir / 'examples/'}")
os.system("mkdir -p {tmpdir / 'data/'}")
os.system("mkdir -p {tmpdir / 'feast/feature_repo/data/'}")
os.environ["DATA_FOLDER"] = "{tmpdir / 'data/'}"
os.environ["NUM_ROWS"] = "100000"
os.system("mkdir -p /tmp/examples")
os.environ["BASE_DIR"] = "/tmp/examples/"
os.environ["BASE_DIR"] = "{tmpdir / 'examples/'}"
"""
)
tb1.execute()
assert os.path.isdir("/tmp/examples/dlrm")
assert os.path.isdir("/tmp/examples/feature_repo")
assert os.path.isdir("/tmp/examples/query_tower")
assert os.path.isfile("/tmp/examples/item_embeddings.parquet")
assert os.path.isfile("/tmp/examples/feature_repo/user_features.py")
assert os.path.isfile("/tmp/examples/feature_repo/item_features.py")
assert os.path.isdir(f"{tmpdir / 'examples/dlrm'}")
assert os.path.isdir(f"{tmpdir / 'examples/feast/feature_repo'}")
assert os.path.isdir(f"{tmpdir / 'examples/query_tower'}")
assert os.path.isfile(f"{tmpdir / 'examples/item_embeddings.parquet'}")
assert os.path.isfile(
f"{tmpdir / 'examples/feast/feature_repo/user_features.py'}"
)
assert os.path.isfile(
f"{tmpdir / 'examples/feast/feature_repo/item_features.py'}"
)

with testbook(
REPO_ROOT
Expand All @@ -46,10 +53,10 @@ def test_func():
timeout=180,
) as tb2:
tb2.inject(
"""
f"""
import os
os.environ["DATA_FOLDER"] = "/tmp/data/"
os.environ["BASE_DIR"] = "/tmp/examples/"
os.environ["DATA_FOLDER"] = "{tmpdir / "data"}"
os.environ["BASE_DIR"] = "{tmpdir / "examples"}"
os.environ["topk_retrieval"] = "20"
"""
)
Expand All @@ -59,23 +66,23 @@ def test_func():
outputs = tb2.ref("outputs")
assert outputs[0] == "ordered_ids"
tb2.inject(
"""
f"""
import shutil
from merlin.core.dispatch import get_lib
from merlin.dataloader.tf_utils import configure_tensorflow
configure_tensorflow()
df_lib = get_lib()
train = df_lib.read_parquet(
os.path.join("/tmp/data/processed_nvt/", "train", "part_0.parquet"),
os.path.join("{tmpdir / "data"}/processed_nvt/", "train", "part_0.parquet"),
columns=["user_id_raw"],
)
batch = train[:1]
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "executor_model"
"{tmpdir / "examples"}/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "executor_model"
)
ordered_ids = [x.tolist() for x in response["ordered_ids"]]
shutil.rmtree("/tmp/examples/", ignore_errors=True)
shutil.rmtree("{tmpdir / "examples"}", ignore_errors=True)
"""
)
tb2.execute_cell(NUM_OF_CELLS - 2)
Expand Down