fix: FeatureView serialization with cycle detection (#5502) · feast-dev/feast@f287ca5
@@ -13,16 +13,39 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
13131414## 🧠 Core Concepts
151516-| Component | Description |
17-|--------------------|----------------------------------------------------------------------|
18-| `ComputeEngine` | Interface for executing materialization and retrieval tasks |
19-| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend |
20-| `DAGNode` | Represents a logical operation (read, aggregate, join, etc.) |
21-| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs |
22-| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs |
16+| Component | Description | API |
17+|--------------------|----------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
18+| `ComputeEngine` | Interface for executing materialization and retrieval tasks | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/base.py) |
19+| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
20+| `FeatureResolver` | Resolves feature DAG by topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
21+| `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
22+| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link]([link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)) |
23+| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link]([link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)) |
23242425---
252627+## Feature resolver and builder
28+The `FeatureBuilder` initializes a `FeatureResolver` that extracts a DAG from the `FeatureView` definitions, resolving dependencies and ensuring the correct execution order. \
29+The FeatureView represents a logical data source, while DataSource represents the physical data source (e.g., BigQuery, Spark, etc.). \
30+When defining a `FeatureView`, the source can be a physical `DataSource`, a derived `FeatureView`, or a list of `FeatureViews`.
31+The FeatureResolver walks through the FeatureView sources, and topologically sorts the DAG nodes based on dependencies, and returns a head node that represents the final output of the DAG. \
32+Subsequently, the `FeatureBuilder` builds the DAG nodes from the resolved head node, creating a `DAGNode` for each operation (read, join, filter, aggregate, etc.).
33+An example of built output from FeatureBuilder:
34+```markdown
35+- Output(Agg(daily_driver_stats))
36+- Agg(daily_driver_stats)
37+- Filter(daily_driver_stats)
38+- Transform(daily_driver_stats)
39+- Agg(hourly_driver_stats)
40+- Filter(hourly_driver_stats)
41+- Transform(hourly_driver_stats)
42+- Source(hourly_driver_stats)
43+```
44+45+## Diagram
46+
47+48+2649## ✨ Available Engines
27502851### 🔥 SparkComputeEngine
@@ -44,7 +67,7 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
4467SourceReadNode
4568 |
4669 v
47-JoinNode (Only for get_historical_features with entity df)
70+TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
4871 |
4972 v
5073FilterNode (Always included; applies TTL or user-defined filters)
@@ -56,9 +79,6 @@ AggregationNode (If aggregations are defined in FeatureView)
5679DeduplicationNode (If no aggregation is defined for get_historical_features)
5780 |
5881 v
59-TransformationNode (If feature_transformation is defined)
60- |
61- v
6282ValidationNode (If enable_validation = True)
6383 |
6484 v
@@ -79,20 +99,54 @@ To create your own compute engine:
799980100```python
81101from feast.infra.compute_engines.base import ComputeEngine
82-from feast.infra.materialization.batch_materialization_engine import MaterializationTask, MaterializationJob
83-from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
102+from typing import Sequence, Union
103+from feast.batch_feature_view import BatchFeatureView
104+from feast.entity import Entity
105+from feast.feature_view import FeatureView
106+from feast.infra.common.materialization_job import (
107+ MaterializationJob,
108+ MaterializationTask,
109+)
110+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
111+from feast.infra.offline_stores.offline_store import RetrievalJob
112+from feast.infra.registry.base_registry import BaseRegistry
113+from feast.on_demand_feature_view import OnDemandFeatureView
114+from feast.stream_feature_view import StreamFeatureView
115+116+84117class MyComputeEngine(ComputeEngine):
85-def materialize(self, task: MaterializationTask) -> MaterializationJob:
118+def update(
119+self,
120+project: str,
121+views_to_delete: Sequence[
122+ Union[BatchFeatureView, StreamFeatureView, FeatureView]
123+ ],
124+views_to_keep: Sequence[
125+ Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
126+ ],
127+entities_to_delete: Sequence[Entity],
128+entities_to_keep: Sequence[Entity],
129+ ):
130+...
131+132+def _materialize_one(
133+self,
134+registry: BaseRegistry,
135+task: MaterializationTask,
136+**kwargs,
137+ ) -> MaterializationJob:
86138...
8713988140def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
89141...
142+90143```
91144921452. Create a FeatureBuilder
93146```python
94147from feast.infra.compute_engines.feature_builder import FeatureBuilder
95148149+96150class CustomFeatureBuilder(FeatureBuilder):
97151def build_source_node(self): ...
98152def build_aggregation_node(self, input_node): ...
@@ -101,6 +155,7 @@ class CustomFeatureBuilder(FeatureBuilder):
101155def build_dedup_node(self, input_node):
102156def build_transformation_node(self, input_node): ...
103157def build_output_nodes(self, input_node): ...
158+def build_validation_node(self, input_node): ...
104159```
1051601061613. Define DAGNode subclasses
@@ -114,7 +169,7 @@ class CustomFeatureBuilder(FeatureBuilder):
114169## 🚧 Roadmap
115170- [x] Modular, backend-agnostic DAG execution framework
116171- [x] Spark engine with native support for materialization + PIT joins
117-- [ ] PyArrow + Pandas engine for local compute
118-- [ ] Native multi-feature-view DAG optimization
172+- [x] PyArrow + Pandas engine for local compute
173+- [x] Native multi-feature-view DAG optimization
119174- [ ] DAG validation, metrics, and debug output
120175- [ ] Scalable distributed backend via Ray or Polars