fix: FeatureView serialization with cycle detection (#5502) · feast-dev/feast@f287ca5

@@ -13,16 +13,39 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
 
 ## 🧠 Core Concepts
 
-| Component | Description |
-|--------------------|----------------------------------------------------------------------|
-| `ComputeEngine` | Interface for executing materialization and retrieval tasks |
-| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend |
-| `DAGNode` | Represents a logical operation (read, aggregate, join, etc.) |
-| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs |
-| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs |
+| Component | Description | API |
+|--------------------|----------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
+| `ComputeEngine` | Interface for executing materialization and retrieval tasks | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/base.py) |
+| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
+| `FeatureResolver` | Resolves feature DAG by topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
+| `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
+| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
+| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
 
 ---
 
+## Feature resolver and builder
+The `FeatureBuilder` initializes a `FeatureResolver` that extracts a DAG from the `FeatureView` definitions, resolving dependencies and ensuring the correct execution order. \
+A `FeatureView` represents a logical data source, while a `DataSource` represents the physical data source (e.g., BigQuery or Spark). \
+When defining a `FeatureView`, the source can be a physical `DataSource`, a derived `FeatureView`, or a list of `FeatureView`s.
+The `FeatureResolver` walks the `FeatureView` sources, topologically sorts the DAG nodes by dependency, and returns a head node that represents the final output of the DAG. \
+Subsequently, the `FeatureBuilder` builds the DAG nodes from the resolved head node, creating a `DAGNode` for each operation (read, join, filter, aggregate, etc.).
+An example of the DAG built by `FeatureBuilder`:
+```markdown
+- Output(Agg(daily_driver_stats))
+  - Agg(daily_driver_stats)
+    - Filter(daily_driver_stats)
+      - Transform(daily_driver_stats)
+        - Agg(hourly_driver_stats)
+          - Filter(hourly_driver_stats)
+            - Transform(hourly_driver_stats)
+              - Source(hourly_driver_stats)
+```
+
+## Diagram
+![feature_dag.png](feature_dag.png)
+
+
 ## ✨ Available Engines
 
 ### 🔥 SparkComputeEngine
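
The resolution step described under "Feature resolver and builder" can be illustrated with a minimal sketch. It is not Feast's `FeatureResolver`: the `SOURCES` map, the `driver_stats_source` name, and the `resolve_order` helper are hypothetical, and the standard-library `graphlib` module stands in for whatever sorting the real resolver performs. It shows only the idea of ordering views so that each source precedes its dependents, and of rejecting cyclic definitions.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical dependency map: each name maps to the names it reads from.
SOURCES = {
    "daily_driver_stats": ["hourly_driver_stats"],
    "hourly_driver_stats": ["driver_stats_source"],
    "driver_stats_source": [],
}


def resolve_order(sources: dict[str, list[str]]) -> list[str]:
    """Return names so that every source precedes the views that depend on it."""
    try:
        return list(TopologicalSorter(sources).static_order())
    except CycleError as exc:
        # The second element of exc.args is the list of nodes forming the cycle.
        raise ValueError(f"cyclic feature view dependency: {exc.args[1]}") from exc


order = resolve_order(SOURCES)
# With a single final view, the last name in the order is the head of the DAG.
head = order[-1]
print(order, head)
```

Returning the head last mirrors the description above, where the builder starts from the resolved head node and creates nodes for everything it depends on.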

@@ -44,7 +67,7 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
 SourceReadNode
 |
 v
-JoinNode (Only for get_historical_features with entity df)
+TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
 |
 v
 FilterNode (Always included; applies TTL or user-defined filters)

@@ -56,9 +79,6 @@ AggregationNode (If aggregations are defined in FeatureView)
 DeduplicationNode (If no aggregation is defined for get_historical_features)
 |
 v
-TransformationNode (If feature_transformation is defined)
-|
-v
 ValidationNode (If enable_validation = True)
 |
 v
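
The conditions annotated in the flow above can be read as a small decision procedure. The sketch below is one interpretation, not Feast code: `plan_node_chain` and its parameters are hypothetical, the `|` between `TransformationNode` and `JoinNode` is assumed to mean "one or the other", and only the nodes visible in these hunks are covered (anything elided from the diff, such as the final output step, is left out).

```python
def plan_node_chain(
    *,
    has_transformation: bool,
    num_sources: int,
    has_aggregations: bool,
    is_historical_retrieval: bool,
    enable_validation: bool,
) -> list[str]:
    """Return the node names, in order, that the conditions above would select."""
    chain = ["SourceReadNode"]
    # Assumed reading of "TransformationNode | JoinNode": pick one of the two.
    if has_transformation:
        chain.append("TransformationNode")
    elif num_sources > 1:
        chain.append("JoinNode")
    chain.append("FilterNode")  # always included
    if has_aggregations:
        chain.append("AggregationNode")
    elif is_historical_retrieval:
        chain.append("DeduplicationNode")  # only when no aggregation is defined
    if enable_validation:
        chain.append("ValidationNode")
    return chain


chain = plan_node_chain(
    has_transformation=False,
    num_sources=2,
    has_aggregations=False,
    is_historical_retrieval=True,
    enable_validation=True,
)
print(" -> ".join(chain))
```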

@@ -79,20 +99,54 @@ To create your own compute engine:
 
 ```python
 from feast.infra.compute_engines.base import ComputeEngine
-from feast.infra.materialization.batch_materialization_engine import MaterializationTask, MaterializationJob
-from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
+from typing import Sequence, Union
+from feast.batch_feature_view import BatchFeatureView
+from feast.entity import Entity
+from feast.feature_view import FeatureView
+from feast.infra.common.materialization_job import (
+    MaterializationJob,
+    MaterializationTask,
+)
+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
+from feast.infra.offline_stores.offline_store import RetrievalJob
+from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
+from feast.stream_feature_view import StreamFeatureView
+
+
 class MyComputeEngine(ComputeEngine):
-    def materialize(self, task: MaterializationTask) -> MaterializationJob:
+    def update(
+        self,
+        project: str,
+        views_to_delete: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+        ],
+        views_to_keep: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
+        ],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+    ):
+        ...
+
+    def _materialize_one(
+        self,
+        registry: BaseRegistry,
+        task: MaterializationTask,
+        **kwargs,
+    ) -> MaterializationJob:
         ...
 
     def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
         ...
+
 ```
 
 2. Create a FeatureBuilder
 ```python
 from feast.infra.compute_engines.feature_builder import FeatureBuilder
 
+
 class CustomFeatureBuilder(FeatureBuilder):
     def build_source_node(self): ...
     def build_aggregation_node(self, input_node): ...

@@ -101,6 +155,7 @@ class CustomFeatureBuilder(FeatureBuilder):
     def build_dedup_node(self, input_node): ...
     def build_transformation_node(self, input_node): ...
     def build_output_nodes(self, input_node): ...
+    def build_validation_node(self, input_node): ...
 ```
 
 3. Define DAGNode subclasses
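
To make step 3 and the `ExecutionPlan` description ("executes nodes in dependency order and stores intermediate outputs") more concrete, here is a self-contained toy sketch. `ToyNode` and `execute` are hypothetical names and do not reflect Feast's actual `DAGNode` or `ExecutionPlan` interfaces; the sketch only assumes that each node consumes the outputs of its inputs and that results are cached by node name.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyNode:
    """Hypothetical stand-in for a DAG node; not Feast's DAGNode API."""

    name: str
    fn: Callable[..., object]
    inputs: list["ToyNode"] = field(default_factory=list)


def execute(head: ToyNode) -> dict[str, object]:
    """Run every node reachable from `head`, inputs first, caching outputs by name."""
    outputs: dict[str, object] = {}

    def visit(node: ToyNode) -> object:
        if node.name not in outputs:
            # Evaluate all inputs before the node itself (dependency order).
            args = [visit(parent) for parent in node.inputs]
            outputs[node.name] = node.fn(*args)
        return outputs[node.name]

    visit(head)
    return outputs


source = ToyNode("source", lambda: [3, 1, 2, 3])
dedup = ToyNode("dedup", lambda rows: sorted(set(rows)), [source])
total = ToyNode("agg", lambda rows: sum(rows), [dedup])
outputs = execute(total)
print(outputs)
```

Keeping every intermediate result in `outputs` is what lets a later node, or a debugging step, look up what an upstream node produced.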

@@ -114,7 +169,7 @@ class CustomFeatureBuilder(FeatureBuilder):
 ## 🚧 Roadmap
 - [x] Modular, backend-agnostic DAG execution framework
 - [x] Spark engine with native support for materialization + PIT joins
-- [ ] PyArrow + Pandas engine for local compute
-- [ ] Native multi-feature-view DAG optimization
+- [x] PyArrow + Pandas engine for local compute
+- [x] Native multi-feature-view DAG optimization
 - [ ] DAG validation, metrics, and debug output
 - [ ] Scalable distributed backend via Ray or Polars