Skip to content

Commit 3aa13a6

Browse files
authored
Update feast to run on the newest release, version 0.31 (#975)
* Use tmpdir for the example test
* Update the version of feast to the latest release
* Remove the unused faiss import
* Update the integration-test feast install
* Add feast to requirements to ensure the correct version is available in the test environment
* Remove pip requirements
1 parent ba7ecde commit 3aa13a6

File tree

5 files changed

+75
-70
lines changed

5 files changed

+75
-70
lines changed

ci/test_integration.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ devices=$2
2424

2525
# Run only for Merlin Tensorflow Container
2626
if [ "$container" == "merlin-tensorflow" ]; then
27-
pip install 'feast<0.20'
27+
pip install feast==0.31
2828
pip install dask==2023.1.1 distributed==2023.1.1 protobuf==3.20.3
2929
CUDA_VISIBLE_DEVICES="$devices" pytest -rxs tests/integration
3030
fi

examples/Building-and-deploying-multi-stage-RecSys/01-Building-Recommender-Systems-with-Merlin.ipynb

Lines changed: 46 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -739,8 +739,8 @@
739739
}
740740
],
741741
"source": [
742-
"!rm -rf $BASE_DIR/feature_repo\n",
743-
"!cd $BASE_DIR && feast init feature_repo"
742+
"!rm -rf $BASE_DIR/feast\n",
743+
"!cd $BASE_DIR && feast init feast"
744744
]
745745
},
746746
{
@@ -758,9 +758,9 @@
758758
"metadata": {},
759759
"outputs": [],
760760
"source": [
761-
"feature_repo_path = os.path.join(BASE_DIR, \"feature_repo\")\n",
762-
"if os.path.exists(f\"{feature_repo_path}/example.py\"):\n",
763-
" os.remove(f\"{feature_repo_path}/example.py\")\n",
761+
"feature_repo_path = os.path.join(BASE_DIR, \"feast/feature_repo\")\n",
762+
"if os.path.exists(f\"{feature_repo_path}/example_repo.py\"):\n",
763+
" os.remove(f\"{feature_repo_path}/example_repo.py\")\n",
764764
"if os.path.exists(f\"{feature_repo_path}/data/driver_stats.parquet\"):\n",
765765
" os.remove(f\"{feature_repo_path}/data/driver_stats.parquet\")"
766766
]
@@ -1157,7 +1157,7 @@
11571157
"outputs": [],
11581158
"source": [
11591159
"user_features.to_parquet(\n",
1160-
" os.path.join(BASE_DIR, \"feature_repo/data\", \"user_features.parquet\")\n",
1160+
" os.path.join(feature_repo_path, \"data\", \"user_features.parquet\")\n",
11611161
")"
11621162
]
11631163
},
@@ -1313,7 +1313,7 @@
13131313
"source": [
13141314
"# save to disk\n",
13151315
"item_features.to_parquet(\n",
1316-
" os.path.join(BASE_DIR, \"feature_repo/data\", \"item_features.parquet\")\n",
1316+
" os.path.join(feature_repo_path, \"data\", \"item_features.parquet\")\n",
13171317
")"
13181318
]
13191319
},
@@ -1604,46 +1604,46 @@
16041604
"metadata": {},
16051605
"outputs": [],
16061606
"source": [
1607-
"file = open(os.path.join(BASE_DIR, \"feature_repo/\", \"user_features.py\"), \"w\")\n",
1607+
"file = open(os.path.join(feature_repo_path, \"user_features.py\"), \"w\")\n",
16081608
"file.write(\n",
16091609
" \"\"\"\n",
1610-
"from google.protobuf.duration_pb2 import Duration\n",
1611-
"import datetime\n",
1612-
"from feast import Entity, Feature, FeatureView, ValueType\n",
1610+
"from datetime import timedelta\n",
1611+
"from feast import Entity, Field, FeatureView, ValueType\n",
1612+
"from feast.types import Int32\n",
16131613
"from feast.infra.offline_stores.file_source import FileSource\n",
16141614
"\n",
16151615
"user_features = FileSource(\n",
16161616
" path=\"{}\",\n",
1617-
" event_timestamp_column=\"datetime\",\n",
1617+
" timestamp_field=\"datetime\",\n",
16181618
" created_timestamp_column=\"created\",\n",
16191619
")\n",
16201620
"\n",
1621-
"user_raw = Entity(name=\"user_id_raw\", value_type=ValueType.INT32, description=\"user id raw\",)\n",
1621+
"user_raw = Entity(name=\"user_id_raw\", value_type=ValueType.INT32, join_keys=[\"user_id_raw\"],)\n",
16221622
"\n",
16231623
"user_features_view = FeatureView(\n",
16241624
" name=\"user_features\",\n",
1625-
" entities=[\"user_id_raw\"],\n",
1626-
" ttl=Duration(seconds=86400 * 7),\n",
1627-
" features=[\n",
1628-
" Feature(name=\"user_shops\", dtype=ValueType.INT32),\n",
1629-
" Feature(name=\"user_profile\", dtype=ValueType.INT32),\n",
1630-
" Feature(name=\"user_group\", dtype=ValueType.INT32),\n",
1631-
" Feature(name=\"user_gender\", dtype=ValueType.INT32),\n",
1632-
" Feature(name=\"user_age\", dtype=ValueType.INT32),\n",
1633-
" Feature(name=\"user_consumption_2\", dtype=ValueType.INT32),\n",
1634-
" Feature(name=\"user_is_occupied\", dtype=ValueType.INT32),\n",
1635-
" Feature(name=\"user_geography\", dtype=ValueType.INT32),\n",
1636-
" Feature(name=\"user_intentions\", dtype=ValueType.INT32),\n",
1637-
" Feature(name=\"user_brands\", dtype=ValueType.INT32),\n",
1638-
" Feature(name=\"user_categories\", dtype=ValueType.INT32),\n",
1639-
" Feature(name=\"user_id\", dtype=ValueType.INT32),\n",
1625+
" entities=[user_raw],\n",
1626+
" ttl=timedelta(0),\n",
1627+
" schema=[\n",
1628+
" Field(name=\"user_shops\", dtype=Int32),\n",
1629+
" Field(name=\"user_profile\", dtype=Int32),\n",
1630+
" Field(name=\"user_group\", dtype=Int32),\n",
1631+
" Field(name=\"user_gender\", dtype=Int32),\n",
1632+
" Field(name=\"user_age\", dtype=Int32),\n",
1633+
" Field(name=\"user_consumption_2\", dtype=Int32),\n",
1634+
" Field(name=\"user_is_occupied\", dtype=Int32),\n",
1635+
" Field(name=\"user_geography\", dtype=Int32),\n",
1636+
" Field(name=\"user_intentions\", dtype=Int32),\n",
1637+
" Field(name=\"user_brands\", dtype=Int32),\n",
1638+
" Field(name=\"user_categories\", dtype=Int32),\n",
1639+
" Field(name=\"user_id\", dtype=Int32),\n",
16401640
" ],\n",
16411641
" online=True,\n",
1642-
" input=user_features,\n",
1642+
" source=user_features,\n",
16431643
" tags=dict(),\n",
16441644
")\n",
16451645
"\"\"\".format(\n",
1646-
" os.path.join(BASE_DIR, \"feature_repo/data/\", \"user_features.parquet\")\n",
1646+
" os.path.join(feature_repo_path, \"data/\", \"user_features.parquet\")\n",
16471647
" )\n",
16481648
")\n",
16491649
"file.close()"
@@ -1656,38 +1656,38 @@
16561656
"metadata": {},
16571657
"outputs": [],
16581658
"source": [
1659-
"with open(os.path.join(BASE_DIR, \"feature_repo/\", \"item_features.py\"), \"w\") as f:\n",
1659+
"with open(os.path.join(feature_repo_path, \"item_features.py\"), \"w\") as f:\n",
16601660
" f.write(\n",
16611661
" \"\"\"\n",
1662-
"from google.protobuf.duration_pb2 import Duration\n",
1663-
"import datetime\n",
1664-
"from feast import Entity, Feature, FeatureView, ValueType\n",
1662+
"from datetime import timedelta\n",
1663+
"from feast import Entity, Field, FeatureView, ValueType\n",
1664+
"from feast.types import Int32\n",
16651665
"from feast.infra.offline_stores.file_source import FileSource\n",
16661666
"\n",
16671667
"item_features = FileSource(\n",
16681668
" path=\"{}\",\n",
1669-
" event_timestamp_column=\"datetime\",\n",
1669+
" timestamp_field=\"datetime\",\n",
16701670
" created_timestamp_column=\"created\",\n",
16711671
")\n",
16721672
"\n",
1673-
"item = Entity(name=\"item_id\", value_type=ValueType.INT32, description=\"item id\",)\n",
1673+
"item = Entity(name=\"item_id\", value_type=ValueType.INT32, join_keys=[\"item_id\"],)\n",
16741674
"\n",
16751675
"item_features_view = FeatureView(\n",
16761676
" name=\"item_features\",\n",
1677-
" entities=[\"item_id\"],\n",
1678-
" ttl=Duration(seconds=86400 * 7),\n",
1679-
" features=[\n",
1680-
" Feature(name=\"item_category\", dtype=ValueType.INT32),\n",
1681-
" Feature(name=\"item_shop\", dtype=ValueType.INT32),\n",
1682-
" Feature(name=\"item_brand\", dtype=ValueType.INT32),\n",
1683-
" Feature(name=\"item_id_raw\", dtype=ValueType.INT32),\n",
1677+
" entities=[item],\n",
1678+
" ttl=timedelta(0),\n",
1679+
" schema=[\n",
1680+
" Field(name=\"item_category\", dtype=Int32),\n",
1681+
" Field(name=\"item_shop\", dtype=Int32),\n",
1682+
" Field(name=\"item_brand\", dtype=Int32),\n",
1683+
" Field(name=\"item_id_raw\", dtype=Int32),\n",
16841684
" ],\n",
16851685
" online=True,\n",
1686-
" input=item_features,\n",
1686+
" source=item_features,\n",
16871687
" tags=dict(),\n",
16881688
")\n",
16891689
"\"\"\".format(\n",
1690-
" os.path.join(BASE_DIR, \"feature_repo/data/\", \"item_features.parquet\")\n",
1690+
" os.path.join(feature_repo_path, \"data/\", \"item_features.parquet\")\n",
16911691
" )\n",
16921692
" )\n",
16931693
"file.close()"
@@ -1746,7 +1746,7 @@
17461746
"source": [
17471747
"import seedir as sd\n",
17481748
"\n",
1749-
"feature_repo_path = os.path.join(BASE_DIR, \"feature_repo\")\n",
1749+
"feature_repo_path = os.path.join(BASE_DIR, \"feast\")\n",
17501750
"sd.seedir(\n",
17511751
" feature_repo_path,\n",
17521752
" style=\"lines\",\n",

examples/Building-and-deploying-multi-stage-RecSys/02-Deploying-multi-stage-RecSys-with-Merlin-Systems.ipynb

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,10 @@
9494
"import numpy as np\n",
9595
"import pandas as pd\n",
9696
"import feast\n",
97-
"import faiss\n",
9897
"import seedir as sd\n",
9998
"from nvtabular import ColumnSchema, Schema\n",
10099
"\n",
101100
"from merlin.systems.dag.ensemble import Ensemble\n",
102-
"from merlin.systems.dag.ops.session_filter import FilterCandidates\n",
103101
"from merlin.systems.dag.ops.softmax_sampling import SoftmaxSampling\n",
104102
"from merlin.systems.dag.ops.tensorflow import PredictTensorflow\n",
105103
"from merlin.systems.dag.ops.unroll_features import UnrollFeatures\n",
@@ -134,7 +132,7 @@
134132
"BASE_DIR = os.environ.get(\"BASE_DIR\", \"/Merlin/examples/Building-and-deploying-multi-stage-RecSys/\")\n",
135133
"\n",
136134
"# define feature repo path\n",
137-
"feast_repo_path = os.path.join(BASE_DIR, \"feature_repo/\")"
135+
"feast_repo_path = os.path.join(BASE_DIR, \"feast/feature_repo/\")"
138136
]
139137
},
140138
{
@@ -236,8 +234,7 @@
236234
],
237235
"source": [
238236
"# set up the base dir to for feature store\n",
239-
"feature_repo_path = os.path.join(BASE_DIR, 'feature_repo')\n",
240-
"sd.seedir(feature_repo_path, style='lines', itemlimit=10, depthlimit=5, exclude_folders=['.ipynb_checkpoints', '__pycache__'], sort=True)"
237+
"sd.seedir(os.path.join(BASE_DIR, 'feast'), style='lines', itemlimit=10, depthlimit=5, exclude_folders=['.ipynb_checkpoints', '__pycache__'], sort=True)"
241238
]
242239
},
243240
{

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ nvtabular>=23.4.0
44
merlin-models>=23.4.0
55
merlin-systems>=23.4.0
66
transformers4rec>=23.4.0
7+
feast==0.31

tests/unit/examples/test_building_deploying_multi_stage_RecSys.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212

1313
# flake8: noqa
1414

15-
def test_func():
15+
16+
def test_func(tmpdir):
1617
with testbook(
1718
REPO_ROOT
1819
/ "examples"
@@ -21,21 +22,27 @@ def test_func():
2122
execute=False,
2223
) as tb1:
2324
tb1.inject(
24-
"""
25+
f"""
2526
import os
26-
os.environ["DATA_FOLDER"] = "/tmp/data/"
27+
os.system("mkdir -p {tmpdir / 'examples/'}")
28+
os.system("mkdir -p {tmpdir / 'data/'}")
29+
os.system("mkdir -p {tmpdir / 'feast/feature_repo/data/'}")
30+
os.environ["DATA_FOLDER"] = "{tmpdir / 'data/'}"
2731
os.environ["NUM_ROWS"] = "100000"
28-
os.system("mkdir -p /tmp/examples")
29-
os.environ["BASE_DIR"] = "/tmp/examples/"
32+
os.environ["BASE_DIR"] = "{tmpdir / 'examples/'}"
3033
"""
3134
)
3235
tb1.execute()
33-
assert os.path.isdir("/tmp/examples/dlrm")
34-
assert os.path.isdir("/tmp/examples/feature_repo")
35-
assert os.path.isdir("/tmp/examples/query_tower")
36-
assert os.path.isfile("/tmp/examples/item_embeddings.parquet")
37-
assert os.path.isfile("/tmp/examples/feature_repo/user_features.py")
38-
assert os.path.isfile("/tmp/examples/feature_repo/item_features.py")
36+
assert os.path.isdir(f"{tmpdir / 'examples/dlrm'}")
37+
assert os.path.isdir(f"{tmpdir / 'examples/feast/feature_repo'}")
38+
assert os.path.isdir(f"{tmpdir / 'examples/query_tower'}")
39+
assert os.path.isfile(f"{tmpdir / 'examples/item_embeddings.parquet'}")
40+
assert os.path.isfile(
41+
f"{tmpdir / 'examples/feast/feature_repo/user_features.py'}"
42+
)
43+
assert os.path.isfile(
44+
f"{tmpdir / 'examples/feast/feature_repo/item_features.py'}"
45+
)
3946

4047
with testbook(
4148
REPO_ROOT
@@ -46,10 +53,10 @@ def test_func():
4653
timeout=180,
4754
) as tb2:
4855
tb2.inject(
49-
"""
56+
f"""
5057
import os
51-
os.environ["DATA_FOLDER"] = "/tmp/data/"
52-
os.environ["BASE_DIR"] = "/tmp/examples/"
58+
os.environ["DATA_FOLDER"] = "{tmpdir / "data"}"
59+
os.environ["BASE_DIR"] = "{tmpdir / "examples"}"
5360
os.environ["topk_retrieval"] = "20"
5461
"""
5562
)
@@ -59,23 +66,23 @@ def test_func():
5966
outputs = tb2.ref("outputs")
6067
assert outputs[0] == "ordered_ids"
6168
tb2.inject(
62-
"""
69+
f"""
6370
import shutil
6471
from merlin.core.dispatch import get_lib
6572
from merlin.dataloader.tf_utils import configure_tensorflow
6673
configure_tensorflow()
6774
df_lib = get_lib()
6875
train = df_lib.read_parquet(
69-
os.path.join("/tmp/data/processed_nvt/", "train", "part_0.parquet"),
76+
os.path.join("{tmpdir / "data"}/processed_nvt/", "train", "part_0.parquet"),
7077
columns=["user_id_raw"],
7178
)
7279
batch = train[:1]
7380
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
7481
response = run_ensemble_on_tritonserver(
75-
"/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "executor_model"
82+
"{tmpdir / "examples"}/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "executor_model"
7683
)
7784
ordered_ids = [x.tolist() for x in response["ordered_ids"]]
78-
shutil.rmtree("/tmp/examples/", ignore_errors=True)
85+
shutil.rmtree("{tmpdir / "examples"}", ignore_errors=True)
7986
"""
8087
)
8188
tb2.execute_cell(NUM_OF_CELLS - 2)

0 commit comments

Comments
 (0)