Ray Compute Engine (contrib)
The Ray compute engine is a distributed compute implementation that leverages Ray for executing feature pipelines including transformations, aggregations, joins, and materializations. It provides scalable and efficient distributed processing for both materialize() and get_historical_features() operations.
Quick Start with Ray Template
Ray RAG Template - Batch Embedding at Scale
For RAG (Retrieval-Augmented Generation) applications with distributed embedding generation:
```bash
feast init -t ray_rag my_rag_project
cd my_rag_project/feature_repo
```
The Ray RAG template demonstrates:
- Parallel Embedding Generation: Uses Ray compute engine to generate embeddings across multiple workers
- Vector Search Integration: Works with Milvus for semantic similarity search
- Complete RAG Pipeline: Data → Embeddings → Search workflow
The Ray compute engine automatically distributes the embedding generation across available workers, making it ideal for processing large datasets efficiently.
Overview
The Ray compute engine provides:
- Distributed DAG Execution: Executes feature computation DAGs across Ray clusters
- Intelligent Join Strategies: Automatic selection between broadcast and distributed joins
- Lazy Evaluation: Deferred execution for optimal performance
- Resource Management: Automatic scaling and resource optimization
- Point-in-Time Joins: Efficient temporal joins for historical feature retrieval
Architecture
The Ray compute engine follows Feast's DAG-based architecture:
```
EntityDF → RayReadNode → RayJoinNode → RayFilterNode → RayAggregationNode → RayTransformationNode → Output
```
Core Components
| Component | Description |
|---|---|
| `RayComputeEngine` | Main engine implementing the `ComputeEngine` interface |
| `RayFeatureBuilder` | Constructs the DAG from Feature View definitions |
| `RayDAGNode` | Ray-specific DAG node implementations |
| `RayDAGRetrievalJob` | Executes retrieval plans and returns results |
| `RayMaterializationJob` | Handles materialization job tracking |
Configuration
Configure the Ray compute engine in your feature_store.yaml:
```yaml
project: my_project
registry: data/registry.db
provider: local

offline_store:
  type: ray
  storage_path: data/ray_storage

batch_engine:
  type: ray.engine
  max_workers: 4                      # Optional: Maximum number of workers
  enable_optimization: true           # Optional: Enable performance optimizations
  broadcast_join_threshold_mb: 100    # Optional: Broadcast join threshold (MB)
  max_parallelism_multiplier: 2       # Optional: Parallelism multiplier
  target_partition_size_mb: 64        # Optional: Target partition size (MB)
  window_size_for_joins: "1H"         # Optional: Time window for distributed joins
  ray_address: localhost:10001        # Optional: Ray cluster address
```
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| `type` | string | `"ray.engine"` | Must be `ray.engine` |
| `max_workers` | int | None (uses all cores) | Maximum number of Ray workers |
| `enable_optimization` | boolean | true | Enable performance optimizations |
| `broadcast_join_threshold_mb` | int | 100 | Size threshold for broadcast joins (MB) |
| `max_parallelism_multiplier` | int | 2 | Parallelism as a multiple of CPU cores |
| `target_partition_size_mb` | int | 64 | Target partition size (MB) |
| `window_size_for_joins` | string | "1H" | Time window for distributed joins |
| `ray_address` | string | None | Ray cluster address (triggers remote mode) |
| `use_kuberay` | boolean | None | Enable KubeRay mode (overrides `ray_address`) |
| `kuberay_conf` | dict | None | KubeRay configuration dict with keys: `cluster_name` (required), `namespace` (default: `"default"`), `auth_token`, `auth_server`, `skip_tls` (default: false) |
| `enable_ray_logging` | boolean | false | Enable Ray progress bars and logging |
| `enable_distributed_joins` | boolean | true | Enable distributed joins for large datasets |
| `staging_location` | string | None | Remote path for batch materialization jobs |
| `ray_conf` | dict | None | Ray configuration parameters (memory, CPU limits) |
Mode Detection Precedence
The Ray compute engine automatically detects the execution mode, checking the following in order of precedence (a sample KubeRay configuration follows the list):
1. Environment variable → KubeRay mode (if `FEAST_RAY_USE_KUBERAY=true`)
2. Config `kuberay_conf` → KubeRay mode
3. Config `ray_address` → Remote mode
4. Default → Local mode
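As an illustration of the KubeRay path, a minimal `feature_store.yaml` snippet could look like the sketch below; the cluster name and namespace are placeholders, and the keys follow the `kuberay_conf` options documented in the table above:

```yaml
batch_engine:
  type: ray.engine
  use_kuberay: true                  # Force KubeRay mode regardless of ray_address
  kuberay_conf:
    cluster_name: my-ray-cluster     # Required: name of the KubeRay cluster (placeholder)
    namespace: feast                 # Optional: defaults to "default"
    skip_tls: false                  # Optional: defaults to false
```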
Usage Examples
Basic Historical Feature Retrieval
```python
from feast import FeatureStore
import pandas as pd
from datetime import datetime

# Initialize feature store with Ray compute engine
# (uses the feature_store.yaml in the repo directory)
store = FeatureStore(repo_path=".")

# Create entity DataFrame
entity_df = pd.DataFrame({
    "driver_id": [1, 2, 3, 4, 5],
    "event_timestamp": [datetime.now()] * 5
})

# Get historical features using Ray compute engine
features = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_stats:avg_daily_trips",
        "driver_stats:total_distance"
    ]
)

# Convert to DataFrame
df = features.to_df()
print(f"Retrieved {len(df)} rows with {len(df.columns)} columns")
```
Batch Materialization
```python
from datetime import datetime, timedelta

# Materialize features using Ray compute engine
store.materialize(
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now(),
    feature_views=["driver_stats", "customer_stats"]
)

# The Ray compute engine handles:
# - Distributed data processing
# - Optimal join strategies
# - Resource management
# - Progress tracking
```
Large-Scale Feature Retrieval
```python
# Handle large entity datasets efficiently
large_entity_df = pd.DataFrame({
    "driver_id": range(1, 1_000_001),               # 1M entities
    "event_timestamp": [datetime.now()] * 1_000_000
})

# Ray compute engine automatically:
# - Partitions data optimally
# - Selects appropriate join strategies
# - Distributes computation across the cluster
features = store.get_historical_features(
    entity_df=large_entity_df,
    features=[
        "driver_stats:avg_daily_trips",
        "driver_stats:total_distance",
        "customer_stats:lifetime_value"
    ]
).to_df()
```
Advanced Configuration
```yaml
# Production-ready configuration
batch_engine:
  type: ray.engine

  # Resource configuration
  max_workers: 16
  max_parallelism_multiplier: 4

  # Performance optimization
  enable_optimization: true
  broadcast_join_threshold_mb: 50
  target_partition_size_mb: 128

  # Distributed join configuration
  window_size_for_joins: "30min"

  # Ray cluster configuration
  ray_address: "ray://head-node:10001"
```
Complete Example Configuration
Here's a complete example configuration showing how to use the Ray offline store together with the Ray compute engine:
```yaml
# Complete example configuration for Ray offline store + Ray compute engine
# This shows how to use both components together for distributed processing
project: my_feast_project
registry: data/registry.db
provider: local

# Ray offline store configuration
# Handles data I/O operations (reading/writing data)
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data    # Optional: Path for storing datasets
  ray_address: localhost:10001               # Optional: Ray cluster address

# Ray compute engine configuration
# Handles complex feature computation and distributed processing
batch_engine:
  type: ray.engine

  # Resource configuration
  max_workers: 8                       # Maximum number of Ray workers
  max_parallelism_multiplier: 2        # Parallelism as a multiple of CPU cores

  # Performance optimization
  enable_optimization: true            # Enable performance optimizations
  broadcast_join_threshold_mb: 100     # Broadcast join threshold (MB)
  target_partition_size_mb: 64         # Target partition size (MB)

  # Distributed join configuration
  window_size_for_joins: "1H"          # Time window for distributed joins

  # Ray cluster configuration (inherits from offline_store if not specified)
  ray_address: localhost:10001         # Ray cluster address
```
DAG Node Types
The Ray compute engine implements several specialized DAG nodes:
RayReadNode
Reads data from Ray-compatible sources:
- Supports Parquet, CSV, and other formats
- Handles partitioning and schema inference
- Applies field mappings and filters
RayJoinNode
Performs distributed joins:
- Broadcast Join: For small datasets (<100MB)
- Distributed Join: For large datasets with time-based windowing
- Automatic Strategy Selection: Based on dataset size and cluster resources
RayFilterNode
Applies filters and time-based constraints (a conceptual Ray Data sketch follows the list):
- TTL-based filtering
- Timestamp range filtering
- Custom predicate filtering
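The snippet below is only a conceptual sketch of the idea behind TTL filtering, written directly against Ray Data rather than the engine's internal node API; the parquet path and the `event_timestamp` column name are placeholders:

```python
import ray
from datetime import datetime, timedelta, timezone

# Conceptual sketch only: TTL-based filtering expressed with Ray Data.
# Assumes the placeholder parquet file has a timezone-aware
# "event_timestamp" column.
ttl = timedelta(days=7)
retrieval_time = datetime.now(timezone.utc)

ds = ray.data.read_parquet("data/driver_stats.parquet")
recent = ds.filter(
    lambda row: retrieval_time - ttl <= row["event_timestamp"] <= retrieval_time
)
print(f"Rows within TTL window: {recent.count()}")
```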
RayAggregationNode
Handles feature aggregations:
- Windowed aggregations
- Grouped aggregations
- Custom aggregation functions
RayTransformationNode
Applies feature transformations:
- Row-level transformations
- Column-level transformations
- Custom transformation functions
RayWriteNode
Writes results to various targets:
- Online stores
- Offline stores
- Temporary storage
Join Strategies
The Ray compute engine automatically selects optimal join strategies:
Broadcast Join
Used for small feature datasets:
- Automatically selected when feature data < 100MB
- Features are cached in Ray's object store
- Entities are distributed across cluster
- Each worker gets a copy of feature data
Distributed Windowed Join
Used for large feature datasets:
- Automatically selected when feature data > 100MB
- Data is partitioned by time windows
- Point-in-time joins within each window
- Results are combined across windows
Strategy Selection Logic
```python
def select_join_strategy(feature_size_mb, threshold_mb):
    if feature_size_mb < threshold_mb:
        return "broadcast"
    else:
        return "distributed_windowed"
```
Performance Optimization
Automatic Optimization
The Ray compute engine includes several automatic optimizations:
- Partition Optimization: Automatically determines optimal partition sizes
- Join Strategy Selection: Chooses between broadcast and distributed joins
- Resource Allocation: Scales workers based on available resources
- Memory Management: Handles out-of-core processing for large datasets
Manual Tuning
For specific workloads, you can fine-tune performance:
```yaml
batch_engine:
  type: ray.engine

  # Fine-tuning for high-throughput scenarios
  broadcast_join_threshold_mb: 200    # Larger broadcast threshold
  max_parallelism_multiplier: 1       # Conservative parallelism
  target_partition_size_mb: 512       # Larger partitions
  window_size_for_joins: "2H"         # Larger time windows
```
Monitoring and Metrics
Monitor Ray compute engine performance:
```python
import ray

# Check cluster resources
resources = ray.cluster_resources()
print(f"Available CPUs: {resources.get('CPU', 0)}")
print(f"Available memory: {resources.get('memory', 0) / 1e9:.2f} GB")

# Monitor job progress
job = store.get_historical_features(...)
# Ray compute engine provides built-in progress tracking
```
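If you also want Ray's own progress bars and task logs while these jobs run, the documented `enable_ray_logging` option can be switched on; a minimal snippet:

```yaml
batch_engine:
  type: ray.engine
  enable_ray_logging: true   # Surface Ray progress bars and logging during jobs
```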
Integration Examples
With Spark Offline Store
```yaml
# Use Ray compute engine with Spark offline store
offline_store:
  type: spark
  spark_conf:
    spark.executor.memory: "4g"
    spark.executor.cores: "2"

batch_engine:
  type: ray.engine
  max_workers: 8
  enable_optimization: true
```
With Cloud Storage
```yaml
# Use Ray compute engine with cloud storage
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data

batch_engine:
  type: ray.engine
  ray_address: "ray://ray-cluster:10001"
  broadcast_join_threshold_mb: 50
```
With Feature Transformations
On-Demand Transformations
```python
from feast import FeatureView, Field
from feast.types import Float64
from feast.on_demand_feature_view import on_demand_feature_view

@on_demand_feature_view(
    sources=["driver_stats"],
    schema=[Field(name="trips_per_hour", dtype=Float64)]
)
def trips_per_hour(features_df):
    features_df["trips_per_hour"] = features_df["avg_daily_trips"] / 24
    return features_df

# Ray compute engine handles transformations efficiently
features = store.get_historical_features(
    entity_df=entity_df,
    features=["trips_per_hour:trips_per_hour"]
)
```
Ray Native Transformations
For distributed transformations that leverage Ray Data and Ray's parallel processing capabilities, set `mode="ray"` on your `BatchFeatureView` (a hypothetical UDF sketch follows the example):
```python
from datetime import timedelta

from feast import BatchFeatureView, Field
from feast.types import Array, Float32, String

# Feature view with Ray transformation mode
document_embeddings_view = BatchFeatureView(
    name="document_embeddings",
    entities=[document],
    mode="ray",  # Enable Ray native transformation
    ttl=timedelta(days=365),
    schema=[
        Field(name="document_id", dtype=String),
        Field(name="embedding", dtype=Array(Float32), vector_index=True),
        Field(name="movie_name", dtype=String),
        Field(name="movie_director", dtype=String),
    ],
    source=movies_source,
    udf=generate_embeddings_ray_native,
    online=True,
)
```
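The `generate_embeddings_ray_native` UDF above comes from the RAG template and is not shown here. Purely as a hypothetical sketch, such a UDF could look like the following, assuming it receives and returns a `ray.data.Dataset` and using random vectors in place of a real embedding model:

```python
import numpy as np
import ray.data


def generate_embeddings_ray_native(ds: ray.data.Dataset) -> ray.data.Dataset:
    """Hypothetical sketch: embed documents in parallel with Ray Data."""

    def embed_batch(batch: dict) -> dict:
        # Placeholder embeddings: swap in a real model
        # (e.g. a sentence-transformers encoder) for actual use.
        num_rows = len(batch["movie_name"])
        batch["embedding"] = np.random.rand(num_rows, 384).astype(np.float32)
        return batch

    # map_batches runs embed_batch on each block of the dataset in parallel.
    return ds.map_batches(embed_batch)
```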
For more information, see the Ray documentation and Ray Data guide.