Skip to content

Commit

Permalink
testing changes
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Arceo <[email protected]>
  • Loading branch information
franciscojavierarceo committed Sep 28, 2024
1 parent cc08644 commit 97fc1ec
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 67 deletions.
14 changes: 9 additions & 5 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,12 +889,14 @@ def apply(
if isinstance(fv, FeatureView):
data_sources_set_to_update.add(fv.batch_source)
if isinstance(fv, StreamFeatureView):
data_sources_set_to_update.add(fv.stream_source)
if fv.stream_source:
data_sources_set_to_update.add(fv.stream_source)
if isinstance(fv, OnDemandFeatureView):
for source_fvp in fv.source_feature_view_projections:
data_sources_set_to_update.add(
fv.source_feature_view_projections[source_fvp].batch_source
)
if fv.source_feature_view_projections[source_fvp].batch_source:
data_sources_set_to_update.add(
fv.source_feature_view_projections[source_fvp].batch_source
)
else:
pass

Expand All @@ -909,7 +911,9 @@ def apply(

# Validate all feature views and make inferences.
self._validate_all_feature_views(
views_to_update, odfvs_to_update, sfvs_to_update,
views_to_update,
odfvs_to_update,
sfvs_to_update,
)
self._make_inferences(
data_sources_to_update,
Expand Down
59 changes: 31 additions & 28 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,41 +296,44 @@ def _infer_on_demand_features_and_entities(
table_column_names_and_types = batch_source.get_table_column_names_and_types(
config
)
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
elif col_name in join_keys:
field = Field(
name=col_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(col_datatype)
),
)
if field.name not in [
entity_column.name
for entity_column in entity_columns
if hasattr(entity_column, "name")
]:
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
if run_inference_for_features:
feature_name = (
batch_field_mapping[col_name]
if col_name in batch_field_mapping
else col_name
)
if batch_field_mapping:
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
elif col_name in join_keys:
field = Field(
name=feature_name,
name=col_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(
col_datatype
)
),
)
if field.name not in [
feature.name for feature in source_feature_view.features
entity_column.name
for entity_column in entity_columns
if hasattr(entity_column, "name")
]:
source_feature_view.features.append(field)
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
if run_inference_for_features:
feature_name = (
batch_field_mapping[col_name]
if col_name in batch_field_mapping
else col_name
)
field = Field(
name=feature_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(
col_datatype
)
),
)
if field.name not in [
feature.name for feature in source_feature_view.features
]:
source_feature_view.features.append(field)
fv.entity_columns = entity_columns
44 changes: 10 additions & 34 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union

import pandas as pd
from google.protobuf.internal.containers import RepeatedScalarFieldContainer
from pydantic import StrictStr

Expand Down Expand Up @@ -176,40 +175,18 @@ def online_write_batch(
),
)

try:
conn.execute(
f"""INSERT OR IGNORE INTO {table_name}
conn.execute(
f"""INSERT OR IGNORE INTO {table_name}
(entity_key, feature_name, value, event_ts, created_ts)
VALUES (?, ?, ?, ?, ?)""",
(
entity_key_bin,
feature_name,
val.SerializeToString(),
timestamp,
created_ts,
),
)
except Exception as e:
# print(
# f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}"
# )
print(
f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}'
)
def get_table_data(conn):
x = conn.execute(f"select * from sqlite_master").fetchall()
y = conn.execute(f"select * from sqlite_master")
names = list(map(lambda x: x[0], y.description))
return pd.DataFrame(x, columns=names)

df = get_table_data(conn)
tmp = [ conn.execute(f"select count(*) from {table_name}").fetchall() for table_name in df['name'].values if table_name not in ['sqlite_autoindex_test_on_demand_python_transformation_driver_hourly_stats_1', 'test_on_demand_python_transformation_driver_hourly_stats_ek', 'sqlite_autoindex_test_on_demand_python_transformation_python_stored_writes_feature_view_1', 'test_on_demand_python_transformation_python_stored_writes_feature_view_ek']]
print(tmp)

r = conn.execute("""
SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view';
""")
print(f"table exists: {r.fetchall()}")
(
entity_key_bin,
feature_name,
val.SerializeToString(),
timestamp,
created_ts,
),
)
if progress:
progress(1)

Expand Down Expand Up @@ -276,7 +253,6 @@ def update(
project = config.project

for table in tables_to_keep:
print(f"updating {_table_id(project, table)}")
conn.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
owner=self.owner,
write_to_online_store=self.write_to_online_store,
)
print("*" * 40, "\n", spec)

return OnDemandFeatureViewProto(spec=spec, meta=meta)

Expand Down
10 changes: 10 additions & 0 deletions sdk/python/tests/unit/test_on_demand_python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ def python_stored_writes_feature_view(
print("running odfv transform")
return output

assert python_stored_writes_feature_view.entities == [driver.name]
assert python_stored_writes_feature_view.entity_columns == []

self.store.apply(
[
driver,
Expand All @@ -605,9 +608,16 @@ def python_stored_writes_feature_view(
]
)
fv_applied = self.store.get_feature_view("driver_hourly_stats")
odfv_applied = self.store.get_on_demand_feature_view(
"python_stored_writes_feature_view"
)

assert fv_applied.entities == [driver.name]
assert odfv_applied.entites == [driver.name]

# Note here that after apply() is called, the entity_columns are populated with the join_key
assert fv_applied.entity_columns[0].name == driver.join_key
assert odfv_applied.entity_columns[0].name == driver.join_key

self.store.write_to_online_store(
feature_view_name="driver_hourly_stats", df=driver_df
Expand Down

0 comments on commit 97fc1ec

Please sign in to comment.