ComputeEngine (WIP)
The ComputeEngine is Feast's pluggable abstraction for executing feature pipelines (transformations, aggregations, joins, and materialization or get_historical_features) on a backend of your choice (e.g., Spark, PyArrow, Pandas, Ray).
It powers both:
- materialize(): for batch and stream generation of features to offline/online stores
- get_historical_features(): for point-in-time correct training dataset retrieval
This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations, enabling modular and scalable workflows.
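To make "point-in-time correct" concrete, here is a minimal pure-Python sketch of the lookup rule: for each entity event, use the latest feature value whose timestamp is not after the event, optionally discarding values older than a TTL. The function name `point_in_time_lookup` and the row layout are assumptions made for illustration; this is not how any Feast engine implements the join.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Any, List, Optional, Tuple


def point_in_time_lookup(
    feature_rows: List[Tuple[datetime, Any]],
    event_ts: datetime,
    ttl: Optional[timedelta] = None,
) -> Any:
    """Return the latest feature value at or before event_ts.

    feature_rows must be sorted by timestamp (hypothetical layout).
    Returns None if no row qualifies or the row is older than ttl.
    """
    timestamps = [ts for ts, _ in feature_rows]
    # Index of the last row whose timestamp is <= event_ts.
    idx = bisect_right(timestamps, event_ts) - 1
    if idx < 0:
        return None  # every feature row is in the future of the event
    ts, value = feature_rows[idx]
    if ttl is not None and event_ts - ts > ttl:
        return None  # the value is too stale to use
    return value


rows = [
    (datetime(2024, 1, 1), 10),
    (datetime(2024, 1, 2), 20),
    (datetime(2024, 1, 3), 30),
]
print(point_in_time_lookup(rows, datetime(2024, 1, 2, 12)))  # 20
print(point_in_time_lookup(rows, datetime(2024, 1, 2, 12), ttl=timedelta(hours=1)))  # None
```

The key property is that the value from 2024-01-03 is never returned for an event on 2024-01-02, which is what prevents future data from leaking into a training set.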
Core Concepts
| Component | Description | API |
|---|---|---|
| ComputeEngine | Interface for executing materialization and retrieval tasks | link |
| FeatureBuilder | Constructs a DAG from Feature View definition for a specific backend | link |
| FeatureResolver | Resolves feature DAG by topological order for execution | link |
| DAG | Represents a logical DAG operation (read, aggregate, join, etc.) | link |
| ExecutionPlan | Executes nodes in dependency order and stores intermediate outputs | link |
| ExecutionContext | Holds config, registry, stores, entity data, and node outputs | link |
Feature resolver and builder
The FeatureBuilder initializes a FeatureResolver that extracts a DAG from the FeatureView definitions, resolving dependencies and ensuring the correct execution order.
A FeatureView represents a logical data source, while a DataSource represents the physical data source (e.g., BigQuery or Spark).
When defining a FeatureView, the source can be a physical DataSource, a derived FeatureView, or a list of FeatureViews.
The FeatureResolver walks through the FeatureView sources, topologically sorts the DAG nodes based on their dependencies, and returns a head node that represents the final output of the DAG.
Subsequently, the FeatureBuilder builds the DAG nodes from the resolved head node, creating a DAGNode for each operation (read, join, filter, aggregate, etc.).
An example of built output from FeatureBuilder:

- Output(Agg(daily_driver_stats))
  - Agg(daily_driver_stats)
    - Filter(daily_driver_stats)
      - Transform(daily_driver_stats)
        - Agg(hourly_driver_stats)
          - Filter(hourly_driver_stats)
            - Transform(hourly_driver_stats)
              - Source(hourly_driver_stats)
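The dependency walk described above can be sketched as a depth-first topological sort. This is an illustrative stand-in, not the FeatureResolver's code: the `sources` dictionary (view name mapped to the views it derives from) and the `resolve_order` function are hypothetical names chosen for the example.

```python
from typing import Dict, List, Set


def resolve_order(sources: Dict[str, List[str]], head: str) -> List[str]:
    """Return view names with dependencies first, ending at `head`."""
    order: List[str] = []
    done: Set[str] = set()
    in_progress: Set[str] = set()

    def visit(name: str) -> None:
        if name in done:
            return
        if name in in_progress:
            # Revisiting a node on the current path means the graph is not a DAG.
            raise ValueError(f"cycle detected at {name!r}")
        in_progress.add(name)
        for parent in sources.get(name, []):
            visit(parent)
        in_progress.discard(name)
        done.add(name)
        order.append(name)  # appended only after all of its parents

    visit(head)
    return order


# Hypothetical setup mirroring the example: daily stats derive from hourly stats.
sources = {
    "daily_driver_stats": ["hourly_driver_stats"],
    "hourly_driver_stats": [],
}
print(resolve_order(sources, "daily_driver_stats"))
# ['hourly_driver_stats', 'daily_driver_stats']
```

The last element of the returned order plays the role of the head node: everything it depends on has already been scheduled before it.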
Available Engines
SparkComputeEngine
{% page-ref page="spark.md" %}
- Distributed DAG execution via Apache Spark
- Supports point-in-time joins and large-scale materialization
- Integrates with SparkOfflineStore and SparkMaterializationJob
RayComputeEngine (contrib)
- Distributed DAG execution via Ray
- Intelligent join strategies (broadcast vs distributed)
- Automatic resource management and optimization
- Integrates with RayOfflineStore and RayMaterializationJob
- See Ray Compute Engine documentation for details
LocalComputeEngine
{% page-ref page="local.md" %}
- Runs on Arrow plus a specified backend (e.g., Pandas, Polars)
- Designed for local dev, testing, or lightweight feature generation
- Supports LocalMaterializationJob and LocalHistoricalRetrievalJob
SnowflakeComputeEngine
- Runs entirely in Snowflake
- Supports Snowflake SQL for feature transformations and aggregations
- Integrates with SnowflakeOfflineStore and SnowflakeMaterializationJob
{% page-ref page="snowflake.md" %}
LambdaComputeEngine
{% page-ref page="lambda.md" %}
Feature Builder Flow
SourceReadNode
|
v
TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
|
v
FilterNode (Always included; applies TTL or user-defined filters)
|
v
AggregationNode (If aggregations are defined in FeatureView)
|
v
DeduplicationNode (If no aggregation is defined for get_historical_features)
|
v
ValidationNode (If enable_validation = True)
|
v
Output
├──> RetrievalOutput (For get_historical_features)
└──> OnlineStoreWrite / OfflineStoreWrite (For materialize)

Each step is implemented as a DAGNode. An ExecutionPlan executes these nodes in topological order, caching DAGValue outputs.
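The conditions in the flow above can be read as a small decision procedure. The sketch below encodes one reading of it, under these assumptions: the transformation and join steps are treated as alternatives, and the function `plan_nodes` with its boolean parameters is hypothetical, not part of the FeatureBuilder API.

```python
from typing import List


def plan_nodes(
    has_transformation: bool,
    num_sources: int,
    has_aggregations: bool,
    enable_validation: bool,
    for_retrieval: bool,
) -> List[str]:
    """Return the node names the flow would include, in order."""
    nodes = ["SourceReadNode"]
    if has_transformation:
        nodes.append("TransformationNode")
    elif num_sources > 1:
        # Assumed reading: joining is the default for multiple sources.
        nodes.append("JoinNode")
    nodes.append("FilterNode")  # always included (TTL or user-defined filters)
    if has_aggregations:
        nodes.append("AggregationNode")
    elif for_retrieval:
        # Only added when no aggregation is defined for get_historical_features.
        nodes.append("DeduplicationNode")
    if enable_validation:
        nodes.append("ValidationNode")
    nodes.append(
        "RetrievalOutput" if for_retrieval else "OnlineStoreWrite / OfflineStoreWrite"
    )
    return nodes


# Materializing an aggregated, transformed view.
print(plan_nodes(True, 1, True, False, False))
# Historical retrieval over two sources with validation enabled.
print(plan_nodes(False, 2, False, True, True))
```

Keeping the selection logic separate from node construction makes it easy to see which steps a given FeatureView will trigger before anything is executed.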
Implementing a Custom Compute Engine
To create your own compute engine:
- Implement the interface
from typing import Sequence, Union

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.common.materialization_job import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.stream_feature_view import StreamFeatureView


class MyComputeEngine(ComputeEngine):
    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        # Set up or tear down engine resources for the given views and entities.
        ...

    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        **kwargs,
    ) -> MaterializationJob:
        # Run materialization for a single task and return its job.
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
        # Build and return a retrieval job for the given task.
        ...
- Create a FeatureBuilder
from feast.infra.compute_engines.feature_builder import FeatureBuilder


class CustomFeatureBuilder(FeatureBuilder):
    def build_source_node(self):
        ...

    def build_aggregation_node(self, input_node):
        ...

    def build_join_node(self, input_node):
        ...

    def build_filter_node(self, input_node):
        ...

    def build_dedup_node(self, input_node):
        ...

    def build_transformation_node(self, input_node):
        ...

    def build_output_nodes(self, input_node):
        ...

    def build_validation_node(self, input_node):
        ...
- Define DAGNode subclasses
  - ReadNode, AggregationNode, JoinNode, WriteNode, etc.
  - Each DAGNode.execute(context) -> DAGValue
- Return an ExecutionPlan
  - ExecutionPlan stores DAG nodes in topological order
  - Automatically handles intermediate value caching
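As a rough illustration of the last two steps, the sketch below shows nodes that each compute one value and a plan that runs them in topological order while caching every intermediate output. The `Node` and `Plan` classes are simplified stand-ins written for this example; they are not Feast's DAGNode or ExecutionPlan, and their constructors and method signatures are assumptions.

```python
from typing import Any, Callable, Dict, List, Sequence


class Node:
    """Simplified stand-in for a DAG node: a named function of its inputs."""

    def __init__(self, name: str, fn: Callable[..., Any], inputs: Sequence["Node"] = ()):
        self.name = name
        self.fn = fn
        self.inputs = list(inputs)

    def execute(self, outputs: Dict[str, Any]) -> Any:
        # Read parent results from the shared cache instead of recomputing them.
        args = [outputs[parent.name] for parent in self.inputs]
        return self.fn(*args)


class Plan:
    """Simplified stand-in for an execution plan over pre-sorted nodes."""

    def __init__(self, nodes: List[Node]):
        self.nodes = nodes  # assumed to already be in topological order

    def run(self) -> Any:
        outputs: Dict[str, Any] = {}
        result = None
        for node in self.nodes:
            if node.name not in outputs:  # each node runs at most once
                outputs[node.name] = node.execute(outputs)
            result = outputs[node.name]
        return result  # output of the final node


read = Node("read", lambda: [3, 1, 2, 3])
dedup = Node("dedup", lambda rows: sorted(set(rows)), [read])
total = Node("sum", lambda rows: sum(rows), [dedup])

result = Plan([read, dedup, total]).run()
print(result)  # 6
```

Because every node reads its inputs from the cache, a node shared by several downstream nodes is computed once, which is the main benefit of storing intermediate values in the plan.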
Roadmap
- Modular, backend-agnostic DAG execution framework
- Spark engine with native support for materialization + PIT joins
- PyArrow + Pandas engine for local compute
- Native multi-feature-view DAG optimization
- DAG validation, metrics, and debug output
- Scalable distributed backend via Ray or Polars
