diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py
index e9dab72160..41e437fb6b 100644
--- a/sdk/python/feast/transformation/pandas_transformation.py
+++ b/sdk/python/feast/transformation/pandas_transformation.py
@@ -40,15 +40,27 @@ def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
         df = pd.DataFrame.from_dict(random_input)
         output_df: pd.DataFrame = self.transform(df)
 
-        return [
-            Field(
-                name=f,
-                dtype=from_value_type(
-                    python_type_to_feast_value_type(f, type_name=str(dt))
-                ),
+        fields = []
+        for feature_name, feature_type in zip(output_df.columns, output_df.dtypes):
+            feature_value = output_df[feature_name].tolist()
+            if len(feature_value) <= 0:
+                raise TypeError(
+                    f"Failed to infer type for feature '{feature_name}' with value "
+                    + f"'{feature_value}' since no items were returned by the UDF."
+                )
+            fields.append(
+                Field(
+                    name=feature_name,
+                    dtype=from_value_type(
+                        python_type_to_feast_value_type(
+                            feature_name,
+                            value=feature_value[0],
+                            type_name=str(feature_type),
+                        )
+                    ),
+                )
             )
-            for f, dt in zip(output_df.columns, output_df.dtypes)
-        ]
+        return fields
 
     def __eq__(self, other):
         if not isinstance(other, PandasTransformation):
diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py
index 2a9c7db876..d828890b1e 100644
--- a/sdk/python/feast/transformation/python_transformation.py
+++ b/sdk/python/feast/transformation/python_transformation.py
@@ -40,15 +40,26 @@ def transform(self, input_dict: dict) -> dict:
     def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
         output_dict: dict[str, list[Any]] = self.transform(random_input)
 
-        return [
-            Field(
-                name=f,
-                dtype=from_value_type(
-                    python_type_to_feast_value_type(f, type_name=type(dt[0]).__name__)
-                ),
+        fields = []
+        for feature_name, feature_value in output_dict.items():
+            if len(feature_value) <= 0:
+                raise TypeError(
+                    f"Failed to infer type for feature '{feature_name}' with value "
+                    + f"'{feature_value}' since no items were returned by the UDF."
+                )
+            fields.append(
+                Field(
+                    name=feature_name,
+                    dtype=from_value_type(
+                        python_type_to_feast_value_type(
+                            feature_name,
+                            value=feature_value[0],
+                            type_name=type(feature_value[0]).__name__,
+                        )
+                    ),
+                )
             )
-            for f, dt in output_dict.items()
-        ]
+        return fields
 
     def __eq__(self, other):
         if not isinstance(other, PythonTransformation):
diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py
index 17c40cf0a1..1de60aed00 100644
--- a/sdk/python/feast/transformation/substrait_transformation.py
+++ b/sdk/python/feast/transformation/substrait_transformation.py
@@ -60,16 +60,28 @@ def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
         df = pd.DataFrame.from_dict(random_input)
         output_df: pd.DataFrame = self.transform(df)
 
-        return [
-            Field(
-                name=f,
-                dtype=from_value_type(
-                    python_type_to_feast_value_type(f, type_name=str(dt))
-                ),
-            )
-            for f, dt in zip(output_df.columns, output_df.dtypes)
-            if f not in random_input
-        ]
+        fields = []
+        for feature_name, feature_type in zip(output_df.columns, output_df.dtypes):
+            feature_value = output_df[feature_name].tolist()
+            if len(feature_value) <= 0:
+                raise TypeError(
+                    f"Failed to infer type for feature '{feature_name}' with value "
+                    + f"'{feature_value}' since no items were returned by the UDF."
+                )
+            if feature_name not in random_input:
+                fields.append(
+                    Field(
+                        name=feature_name,
+                        dtype=from_value_type(
+                            python_type_to_feast_value_type(
+                                feature_name,
+                                value=feature_value[0],
+                                type_name=str(feature_type),
+                            )
+                        ),
+                    )
+                )
+        return fields
 
     def __eq__(self, other):
         if not isinstance(other, SubstraitTransformation):
diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py
index a0859f2f7a..6ba61fc8c5 100644
--- a/sdk/python/feast/type_map.py
+++ b/sdk/python/feast/type_map.py
@@ -155,6 +155,7 @@ def python_type_to_feast_value_type(
         "uint16": ValueType.INT32,
         "uint8": ValueType.INT32,
         "int8": ValueType.INT32,
+        "bool_": ValueType.BOOL,  # np.bool_
         "bool": ValueType.BOOL,
         "boolean": ValueType.BOOL,
         "timedelta": ValueType.UNIX_TIMESTAMP,
diff --git a/sdk/python/tests/unit/test_on_demand_pandas_transformation.py b/sdk/python/tests/unit/test_on_demand_pandas_transformation.py
index c5f066dd83..1a04a466fb 100644
--- a/sdk/python/tests/unit/test_on_demand_pandas_transformation.py
+++ b/sdk/python/tests/unit/test_on_demand_pandas_transformation.py
@@ -1,15 +1,31 @@
 import os
+import re
 import tempfile
 from datetime import datetime, timedelta
 
 import pandas as pd
+import pytest
 
-from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig
+from feast import (
+    Entity,
+    FeatureStore,
+    FeatureView,
+    FileSource,
+    RepoConfig,
+    RequestSource,
+)
 from feast.driver_test_data import create_driver_hourly_stats_df
 from feast.field import Field
 from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
 from feast.on_demand_feature_view import on_demand_feature_view
-from feast.types import Float32, Float64, Int64
+from feast.types import (
+    Array,
+    Bool,
+    Float32,
+    Float64,
+    Int64,
+    String,
+)
 
 
 def test_pandas_transformation():
@@ -91,3 +107,237 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
     assert online_response["conv_rate_plus_acc"].equals(
         online_response["conv_rate"] + online_response["acc_rate"]
     )
+
+
+def test_pandas_transformation_returning_all_data_types():
+    with tempfile.TemporaryDirectory() as data_dir:
+        store = FeatureStore(
+            config=RepoConfig(
+                project="test_on_demand_python_transformation",
+                registry=os.path.join(data_dir, "registry.db"),
+                provider="local",
+                entity_key_serialization_version=2,
+                online_store=SqliteOnlineStoreConfig(
+                    path=os.path.join(data_dir, "online.db")
+                ),
+            )
+        )
+
+        # Generate test data.
+        end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
+        start_date = end_date - timedelta(days=15)
+
+        driver_entities = [1001, 1002, 1003, 1004, 1005]
+        driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
+        driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
+        driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)
+
+        driver = Entity(name="driver", join_keys=["driver_id"])
+
+        driver_stats_source = FileSource(
+            name="driver_hourly_stats_source",
+            path=driver_stats_path,
+            timestamp_field="event_timestamp",
+            created_timestamp_column="created",
+        )
+
+        driver_stats_fv = FeatureView(
+            name="driver_hourly_stats",
+            entities=[driver],
+            ttl=timedelta(days=0),
+            schema=[
+                Field(name="conv_rate", dtype=Float32),
+                Field(name="acc_rate", dtype=Float32),
+                Field(name="avg_daily_trips", dtype=Int64),
+            ],
+            online=True,
+            source=driver_stats_source,
+        )
+
+        request_source = RequestSource(
+            name="request_source",
+            schema=[
+                Field(name="avg_daily_trip_rank_thresholds", dtype=Array(Int64)),
+                Field(name="avg_daily_trip_rank_names", dtype=Array(String)),
+            ],
+        )
+
+        @on_demand_feature_view(
+            sources=[request_source, driver_stats_fv],
+            schema=[
+                Field(name="highest_achieved_rank", dtype=String),
+                Field(name="avg_daily_trips_plus_one", dtype=Int64),
+                Field(name="conv_rate_plus_acc", dtype=Float64),
+                Field(name="is_highest_rank", dtype=Bool),
+                Field(name="achieved_ranks", dtype=Array(String)),
+                Field(name="trips_until_next_rank_int", dtype=Array(Int64)),
+                Field(name="trips_until_next_rank_float", dtype=Array(Float64)),
+                Field(name="achieved_ranks_mask", dtype=Array(Bool)),
+            ],
+            mode="pandas",
+        )
+        def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
+            df = pd.DataFrame()
+            df["conv_rate_plus_acc"] = inputs["conv_rate"] + inputs["acc_rate"]
+            df["avg_daily_trips_plus_one"] = inputs["avg_daily_trips"] + 1
+
+            df["trips_until_next_rank_int"] = inputs[
+                ["avg_daily_trips", "avg_daily_trip_rank_thresholds"]
+            ].apply(
+                lambda x: [max(threshold - x.iloc[0], 0) for threshold in x.iloc[1]],
+                axis=1,
+            )
+            df["trips_until_next_rank_float"] = df["trips_until_next_rank_int"].map(
+                lambda values: [float(value) for value in values]
+            )
+            df["achieved_ranks_mask"] = df["trips_until_next_rank_int"].map(
+                lambda values: [value <= 0 for value in values]
+            )
+
+            temp = pd.concat(
+                [df[["achieved_ranks_mask"]], inputs[["avg_daily_trip_rank_names"]]],
+                axis=1,
+            )
+            df["achieved_ranks"] = temp.apply(
+                lambda x: [
+                    rank if achieved else "Locked"
+                    for achieved, rank in zip(x.iloc[0], x.iloc[1])
+                ],
+                axis=1,
+            )
+            df["highest_achieved_rank"] = (
+                df["achieved_ranks"]
+                .map(
+                    lambda ranks: str(
+                        ([rank for rank in ranks if rank != "Locked"][-1:] or ["None"])[
+                            0
+                        ]
+                    )
+                )
+                .astype("string")
+            )
+            df["is_highest_rank"] = df["achieved_ranks"].map(
+                lambda ranks: ranks[-1] != "Locked"
+            )
+            return df
+
+        store.apply([driver, driver_stats_source, driver_stats_fv, pandas_view])
+
+        entity_rows = [
+            {
+                "driver_id": 1001,
+                "avg_daily_trip_rank_thresholds": [100, 250, 500, 1000],
+                "avg_daily_trip_rank_names": ["Bronze", "Silver", "Gold", "Platinum"],
+            }
+        ]
+        store.write_to_online_store(
+            feature_view_name="driver_hourly_stats", df=driver_df
+        )
+
+        online_response = store.get_online_features(
+            entity_rows=entity_rows,
+            features=[
+                "driver_hourly_stats:conv_rate",
+                "driver_hourly_stats:acc_rate",
+                "driver_hourly_stats:avg_daily_trips",
+                "pandas_view:avg_daily_trips_plus_one",
+                "pandas_view:conv_rate_plus_acc",
+                "pandas_view:trips_until_next_rank_int",
+                "pandas_view:trips_until_next_rank_float",
+                "pandas_view:achieved_ranks_mask",
+                "pandas_view:achieved_ranks",
+                "pandas_view:highest_achieved_rank",
+                "pandas_view:is_highest_rank",
+            ],
+        ).to_df()
+        # We use to_df here to ensure we use the pandas backend, but convert to a dict for comparisons
+        result = online_response.to_dict(orient="records")[0]
+
+        # Type assertions
+        # Materialized view
+        assert type(result["conv_rate"]) == float
+        assert type(result["acc_rate"]) == float
+        assert type(result["avg_daily_trips"]) == int
+        # On-demand view
+        assert type(result["avg_daily_trips_plus_one"]) == int
+        assert type(result["conv_rate_plus_acc"]) == float
+        assert type(result["highest_achieved_rank"]) == str
+        assert type(result["is_highest_rank"]) == bool
+
+        assert type(result["trips_until_next_rank_int"]) == list
+        assert all([type(e) == int for e in result["trips_until_next_rank_int"]])
+
+        assert type(result["trips_until_next_rank_float"]) == list
+        assert all([type(e) == float for e in result["trips_until_next_rank_float"]])
+
+        assert type(result["achieved_ranks"]) == list
+        assert all([type(e) == str for e in result["achieved_ranks"]])
+
+        assert type(result["achieved_ranks_mask"]) == list
+        assert all([type(e) == bool for e in result["achieved_ranks_mask"]])
+
+        # Value assertions
+        expected_trips_until_next_rank = [
+            max(threshold - result["avg_daily_trips"], 0)
+            for threshold in entity_rows[0]["avg_daily_trip_rank_thresholds"]
+        ]
+        expected_mask = [value <= 0 for value in expected_trips_until_next_rank]
+        expected_ranks = [
+            rank if achieved else "Locked"
+            for achieved, rank in zip(
+                expected_mask, entity_rows[0]["avg_daily_trip_rank_names"]
+            )
+        ]
+        highest_rank = (
+            [rank for rank in expected_ranks if rank != "Locked"][-1:] or ["None"]
+        )[0]
+
+        assert result["conv_rate_plus_acc"] == result["conv_rate"] + result["acc_rate"]
+        assert result["avg_daily_trips_plus_one"] == result["avg_daily_trips"] + 1
+        assert result["highest_achieved_rank"] == highest_rank
+        assert result["is_highest_rank"] == (expected_ranks[-1] != "Locked")
+
+        assert result["trips_until_next_rank_int"] == expected_trips_until_next_rank
+        assert result["trips_until_next_rank_float"] == [
+            float(value) for value in expected_trips_until_next_rank
+        ]
+        assert result["achieved_ranks_mask"] == expected_mask
+        assert result["achieved_ranks"] == expected_ranks
+
+
+def test_invalid_pandas_transformation_raises_type_error_on_apply():
+    with tempfile.TemporaryDirectory() as data_dir:
+        store = FeatureStore(
+            config=RepoConfig(
+                project="test_on_demand_python_transformation",
+                registry=os.path.join(data_dir, "registry.db"),
+                provider="local",
+                entity_key_serialization_version=2,
+                online_store=SqliteOnlineStoreConfig(
+                    path=os.path.join(data_dir, "online.db")
+                ),
+            )
+        )
+
+        request_source = RequestSource(
+            name="request_source",
+            schema=[
+                Field(name="driver_name", dtype=String),
+            ],
+        )
+
+        @on_demand_feature_view(
+            sources=[request_source],
+            schema=[Field(name="driver_name_lower", dtype=String)],
+            mode="pandas",
+        )
+        def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
+            return pd.DataFrame({"driver_name_lower": []})
+
+        with pytest.raises(
+            TypeError,
+            match=re.escape(
+                "Failed to infer type for feature 'driver_name_lower' with value '[]' since no items were returned by the UDF."
+            ),
+        ):
+            store.apply([request_source, pandas_view])
diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py
index 72e9b53a10..c5bd68d6a8 100644
--- a/sdk/python/tests/unit/test_on_demand_python_transformation.py
+++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py
@@ -1,4 +1,5 @@
 import os
+import re
 import tempfile
 import unittest
 from datetime import datetime, timedelta
@@ -7,12 +8,19 @@
 import pandas as pd
 import pytest
 
-from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig
+from feast import (
+    Entity,
+    FeatureStore,
+    FeatureView,
+    FileSource,
+    RepoConfig,
+    RequestSource,
+)
 from feast.driver_test_data import create_driver_hourly_stats_df
 from feast.field import Field
 from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
 from feast.on_demand_feature_view import on_demand_feature_view
-from feast.types import Float32, Float64, Int64
+from feast.types import Array, Bool, Float32, Float64, Int64, String
 
 
 class TestOnDemandPythonTransformation(unittest.TestCase):
@@ -248,3 +256,234 @@ def test_python_docs_demo(self):
             + online_python_response["acc_rate"][0]
             == online_python_response["conv_rate_plus_val2_python"][0]
         )
+
+
+class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase):
+    def setUp(self):
+        with tempfile.TemporaryDirectory() as data_dir:
+            self.store = FeatureStore(
+                config=RepoConfig(
+                    project="test_on_demand_python_transformation",
+                    registry=os.path.join(data_dir, "registry.db"),
+                    provider="local",
+                    entity_key_serialization_version=2,
+                    online_store=SqliteOnlineStoreConfig(
+                        path=os.path.join(data_dir, "online.db")
+                    ),
+                )
+            )
+
+            # Generate test data.
+            end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
+            start_date = end_date - timedelta(days=15)
+
+            driver_entities = [1001, 1002, 1003, 1004, 1005]
+            driver_df = create_driver_hourly_stats_df(
+                driver_entities, start_date, end_date
+            )
+            driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
+            driver_df.to_parquet(
+                path=driver_stats_path, allow_truncated_timestamps=True
+            )
+
+            driver = Entity(name="driver", join_keys=["driver_id"])
+
+            driver_stats_source = FileSource(
+                name="driver_hourly_stats_source",
+                path=driver_stats_path,
+                timestamp_field="event_timestamp",
+                created_timestamp_column="created",
+            )
+
+            driver_stats_fv = FeatureView(
+                name="driver_hourly_stats",
+                entities=[driver],
+                ttl=timedelta(days=0),
+                schema=[
+                    Field(name="conv_rate", dtype=Float32),
+                    Field(name="acc_rate", dtype=Float32),
+                    Field(name="avg_daily_trips", dtype=Int64),
+                ],
+                online=True,
+                source=driver_stats_source,
+            )
+
+            request_source = RequestSource(
+                name="request_source",
+                schema=[
+                    Field(name="avg_daily_trip_rank_thresholds", dtype=Array(Int64)),
+                    Field(name="avg_daily_trip_rank_names", dtype=Array(String)),
+                ],
+            )
+
+            @on_demand_feature_view(
+                sources=[request_source, driver_stats_fv],
+                schema=[
+                    Field(name="highest_achieved_rank", dtype=String),
+                    Field(name="avg_daily_trips_plus_one", dtype=Int64),
+                    Field(name="conv_rate_plus_acc", dtype=Float64),
+                    Field(name="is_highest_rank", dtype=Bool),
+                    Field(name="achieved_ranks", dtype=Array(String)),
+                    Field(name="trips_until_next_rank_int", dtype=Array(Int64)),
+                    Field(name="trips_until_next_rank_float", dtype=Array(Float64)),
+                    Field(name="achieved_ranks_mask", dtype=Array(Bool)),
+                ],
+                mode="python",
+            )
+            def python_view(inputs: dict[str, Any]) -> dict[str, Any]:
+                output = {}
+                trips_until_next_rank = [
+                    [max(threshold - row[1], 0) for threshold in row[0]]
+                    for row in zip(
+                        inputs["avg_daily_trip_rank_thresholds"],
+                        inputs["avg_daily_trips"],
+                    )
+                ]
+                mask = [[value <= 0 for value in row] for row in trips_until_next_rank]
+                ranks = [
+                    [rank if mask else "Locked" for mask, rank in zip(*row)]
+                    for row in zip(mask, inputs["avg_daily_trip_rank_names"])
+                ]
+                highest_rank = [
+                    ([rank for rank in row if rank != "Locked"][-1:] or ["None"])[0]
+                    for row in ranks
+                ]
+
+                output["conv_rate_plus_acc"] = [
+                    sum(row) for row in zip(inputs["conv_rate"], inputs["acc_rate"])
+                ]
+                output["avg_daily_trips_plus_one"] = [
+                    row + 1 for row in inputs["avg_daily_trips"]
+                ]
+                output["highest_achieved_rank"] = highest_rank
+                output["is_highest_rank"] = [row[-1] != "Locked" for row in ranks]
+
+                output["trips_until_next_rank_int"] = trips_until_next_rank
+                output["trips_until_next_rank_float"] = [
+                    [float(value) for value in row] for row in trips_until_next_rank
+                ]
+                output["achieved_ranks_mask"] = mask
+                output["achieved_ranks"] = ranks
+                return output
+
+            self.store.apply(
+                [driver, driver_stats_source, driver_stats_fv, python_view]
+            )
+            self.store.write_to_online_store(
+                feature_view_name="driver_hourly_stats", df=driver_df
+            )
+
+    def test_python_transformation_returning_all_data_types(self):
+        entity_rows = [
+            {
+                "driver_id": 1001,
+                "avg_daily_trip_rank_thresholds": [100, 250, 500, 1000],
+                "avg_daily_trip_rank_names": ["Bronze", "Silver", "Gold", "Platinum"],
+            }
+        ]
+        online_response = self.store.get_online_features(
+            entity_rows=entity_rows,
+            features=[
+                "driver_hourly_stats:conv_rate",
+                "driver_hourly_stats:acc_rate",
+                "driver_hourly_stats:avg_daily_trips",
+                "python_view:avg_daily_trips_plus_one",
+                "python_view:conv_rate_plus_acc",
+                "python_view:trips_until_next_rank_int",
+                "python_view:trips_until_next_rank_float",
+                "python_view:achieved_ranks_mask",
+                "python_view:achieved_ranks",
+                "python_view:highest_achieved_rank",
+                "python_view:is_highest_rank",
+            ],
+        ).to_dict()
+        result = {name: value[0] for name, value in online_response.items()}
+
+        # Type assertions
+        # Materialized view
+        assert type(result["conv_rate"]) == float
+        assert type(result["acc_rate"]) == float
+        assert type(result["avg_daily_trips"]) == int
+        # On-demand view
+        assert type(result["avg_daily_trips_plus_one"]) == int
+        assert type(result["conv_rate_plus_acc"]) == float
+        assert type(result["highest_achieved_rank"]) == str
+        assert type(result["is_highest_rank"]) == bool
+
+        assert type(result["trips_until_next_rank_int"]) == list
+        assert all([type(e) == int for e in result["trips_until_next_rank_int"]])
+
+        assert type(result["trips_until_next_rank_float"]) == list
+        assert all([type(e) == float for e in result["trips_until_next_rank_float"]])
+
+        assert type(result["achieved_ranks"]) == list
+        assert all([type(e) == str for e in result["achieved_ranks"]])
+
+        assert type(result["achieved_ranks_mask"]) == list
+        assert all([type(e) == bool for e in result["achieved_ranks_mask"]])
+
+        # Value assertions
+        expected_trips_until_next_rank = [
+            max(threshold - result["avg_daily_trips"], 0)
+            for threshold in entity_rows[0]["avg_daily_trip_rank_thresholds"]
+        ]
+        expected_mask = [value <= 0 for value in expected_trips_until_next_rank]
+        expected_ranks = [
+            rank if achieved else "Locked"
+            for achieved, rank in zip(
+                expected_mask, entity_rows[0]["avg_daily_trip_rank_names"]
+            )
+        ]
+        highest_rank = (
+            [rank for rank in expected_ranks if rank != "Locked"][-1:] or ["None"]
+        )[0]
+
+        assert result["conv_rate_plus_acc"] == result["conv_rate"] + result["acc_rate"]
+        assert result["avg_daily_trips_plus_one"] == result["avg_daily_trips"] + 1
+        assert result["highest_achieved_rank"] == highest_rank
+        assert result["is_highest_rank"] == (expected_ranks[-1] != "Locked")
+
+        assert result["trips_until_next_rank_int"] == expected_trips_until_next_rank
+        assert result["trips_until_next_rank_float"] == [
+            float(value) for value in expected_trips_until_next_rank
+        ]
+        assert result["achieved_ranks_mask"] == expected_mask
+        assert result["achieved_ranks"] == expected_ranks
+
+
+def test_invalid_python_transformation_raises_type_error_on_apply():
+    with tempfile.TemporaryDirectory() as data_dir:
+        store = FeatureStore(
+            config=RepoConfig(
+                project="test_on_demand_python_transformation",
+                registry=os.path.join(data_dir, "registry.db"),
+                provider="local",
+                entity_key_serialization_version=2,
+                online_store=SqliteOnlineStoreConfig(
+                    path=os.path.join(data_dir, "online.db")
+                ),
+            )
+        )
+
+        request_source = RequestSource(
+            name="request_source",
+            schema=[
+                Field(name="driver_name", dtype=String),
+            ],
+        )
+
+        @on_demand_feature_view(
+            sources=[request_source],
+            schema=[Field(name="driver_name_lower", dtype=String)],
+            mode="python",
+        )
+        def python_view(inputs: dict[str, Any]) -> dict[str, Any]:
+            return {"driver_name_lower": []}
+
+        with pytest.raises(
+            TypeError,
+            match=re.escape(
+                "Failed to infer type for feature 'driver_name_lower' with value '[]' since no items were returned by the UDF."
+            ),
+        ):
+            store.apply([request_source, python_view])