[feat] Introduce high-level key-value (KV) interface by 0oshowero0 · Pull Request #28 · Ascend/TransferQueue
…ller path Co-authored-by: 看我72遍<m.pb@msn.com> # message auto-generated for no-merge-commit merge: !29 merge refactor/columnar-field-schema into main [fix,refactor] Complete columnar metadata refactor for manager→controller path Created-by: mpb159753 Commit-by: 看我72遍 Merged-by: ascend-robot Description: # Columnar FieldSchema + Unified Controller Metadata ## 1. Context & Motivation Follows: [#28 — Columnar BatchMeta + Zero-Copy Default](https://gitcode.com/Ascend/TransferQueue/pull/28) PR #39 converted `BatchMeta` from row-oriented to columnar layout, but two O(B×F) bottlenecks remained on the **Manager → Controller** path: 1. **`notify_data_update` payload**: The Manager expanded columnar `field_schema` back into per-sample dicts (`dtypes: {global_index: {field: dtype}}`, `shapes: {global_index: {field: shape}}`), transmitting O(B×F) data over ZMQ for information that is inherently O(F). 2. **Controller metadata storage**: `DataPartitionStatus` maintained three separate stores (`field_dtypes`, `field_shapes`, `field_schema_cache`) with redundant per-sample indexing, requiring multi-pass reconciliation logic to detect nested tensors. This PR completes the columnar refactoring by: - Transmitting `field_schema` directly as O(F) columnar data (no per-sample expansion) - Introducing `FieldColumnMeta` as the **single source of truth** for per-field metadata in the Controller - Adding `RoutingGroup` to carry batch positions alongside global indexes, eliminating intermediate mapping - Extracting `_pack_field_values` as a reusable static method with defensive checks ## 2. Key Changes ### 2.1 Columnar `notify_data_update` Protocol (`base.py`, `simple_backend_manager.py`) **Before** (O(B×F) expansion in Manager): ```python dtypes_for_notify = { global_index: {field_name: field_meta.get("dtype") for field_name, field_meta in field_schema.items()} for global_index in metadata.global_indexes } shapes_for_notify = { ... } # same pattern await self.notify_data_update(partition_id, field_names, global_indexes, dtypes_for_notify, shapes_for_notify) ``` **After** (O(F) — pass through as-is): ```python await self.notify_data_update(partition_id, global_indexes, field_schema) ``` - Removed `fields`, `dtypes`, `shapes` parameters - `field_schema` is already columnar from `metadata.py` — no expansion needed - KV path (`base.py`) similarly simplified, removing 25-line per-sample expansion loop ### 2.2 `FieldColumnMeta` Dataclass (`controller.py`) Replaces three separate stores (`field_dtypes`, `field_shapes`, `field_schema_cache`) with a single `@dataclass`: ```python @DataClass class FieldColumnMeta: dtype: Any = None shape: Optional[tuple] = None is_nested: bool = False is_non_tensor: bool = False per_sample_shapes: dict[int, tuple] = field(default_factory=dict) ``` - Field-level attributes are O(1) — shared across all samples - Sample-level shapes only stored for nested tensors — O(B_nested) not O(B) - `to_batch_schema()` generates `BatchMeta`-compatible dicts on demand - `remove_samples()` cleans up released indexes ### 2.3 `RoutingGroup` NamedTuple (`simple_backend_manager.py`) ```python class RoutingGroup(NamedTuple): global_indexes: list[int] batch_positions: list[int] ``` - `_group_by_hash` now returns `dict[str, RoutingGroup]` instead of `dict[str, list[int]]` - Carries both global indexes and batch positions, eliminating the intermediate `global_idx → position` mapping in `get_data` - GET merge logic simplified: scatter results directly to batch positions without building per-sample dicts ### 2.4 `_pack_field_values` Extraction (`simple_backend_manager.py`) Extracted inline packing logic into a reusable `@staticmethod` with explicit error handling: - Validates non-empty input and absence of `None` values - Handles regular tensors (`torch.stack`), nested tensors (`torch.nested.as_nested_tensor`), and non-tensors (`NonTensorStack`) ### 2.5 Simplified Controller API - `update_production_status`: Removed `field_names` and `dtypes`/`shapes` parameters; `field_names` derived from `field_schema.keys()` - `get_field_schema`: Delegates to `FieldColumnMeta.to_batch_schema()` instead of building from cache - Removed `get_field_dtype` and `get_field_shape` helper methods (no longer needed) ### 2.6 Test Suite - All test files updated to match new `notify_data_update` and `update_production_status` signatures - `test_controller_data_partitions.py`: Tests adapted for `FieldColumnMeta`-based schema storage ## 3. Benchmark Results Tests conducted in Docker (single-node Ray) across 7 payload sizes (0.05 MB → 25.4 GB). Three configurations compared: - **pre-refactor**: Baseline (row-oriented, before PR #39) - **columnar-batch-meta**: After PR #39 (columnar BatchMeta + zero-copy) - **columnar-field-schema**: This PR (columnar notify + FieldColumnMeta + RoutingGroup) ### Speedup (relative to pre-refactor baseline)   | Data Scale | PUT Speedup (vs baseline) | PUT Speedup (vs PR #39) | GET Speedup (vs baseline) | GET Speedup (vs PR #39) | |------------|:------------------------:|:-----------------------:|:------------------------:|:-----------------------:| | debug (0.05 MB) | **1.4×** | +12% | **1.5×** | +16% | | tiny (1.5 MB) | **1.8×** | +19% | **2.1×** | +13% | | small (0.15 GB) | **5.1×** | +20% | **3.4×** | ≈0% | | medium (1.5 GB) | **5.8×** | +7% | **2.2×** | −1% | | large (6.3 GB) | **5.6×** | +8% | **2.0×** | −4% | | xlarge (12.7 GB) | **5.5×** | +8% | **2.2×** | +1% | | huge (25.4 GB) | **5.4×** | +6% | **2.2×** | +1% | ### Absolute Bandwidth   | Data Scale | Pre-Refactor | Columnar BatchMeta (PR #39) | Columnar FieldSchema (This PR) | |------------|:-----------:|:---------------------------:|:------------------------------:| | **PUT** medium | 3.95 Gbps | 21.29 Gbps | **22.84 Gbps** | | **PUT** large | 5.04 Gbps | 26.14 Gbps | **28.18 Gbps** | | **PUT** huge | 5.09 Gbps | 26.05 Gbps | **27.49 Gbps** | | **GET** medium | 4.24 Gbps | 9.50 Gbps | **9.39 Gbps** | | **GET** large | 4.98 Gbps | 10.51 Gbps | **10.14 Gbps** | | **GET** huge | 4.86 Gbps | 10.46 Gbps | **10.53 Gbps** | ### Summary - **PUT path** benefits most: +6% to +20% over PR #39 across all scales, consistent 5×+ improvement over pre-refactor baseline at medium+ scales - **GET path** maintains parity with PR #39 — improvements are within noise margin; the GET bottleneck is in ZMQ transport, not metadata - Small payloads see the largest relative improvement, confirming the metadata overhead reduction ### Resource Usage Memory usage is comparable or slightly reduced (eliminated per-sample `field_dtypes`/`field_shapes` dicts in Controller). ## 4. API Breaking Changes - `notify_data_update()`: Removed `fields`, `dtypes`, `shapes` parameters; replaced with single `field_schema` dict - `update_production_status()`: Removed `field_names`, `dtypes`, `shapes` parameters; replaced with single `field_schema` dict; `field_names` derived from `field_schema.keys()` - `get_field_dtype()` / `get_field_shape()`: Removed (replaced by `FieldColumnMeta`) - `_group_by_hash()`: Now returns `dict[str, RoutingGroup]` instead of `dict[str, list[int]]` ## 5. Files Changed ``` 7 files changed, 451 insertions(+), 440 deletions(-) ``` | File | Description | |------|-------------| | `controller.py` | `FieldColumnMeta` dataclass; simplified `update_production_status` / `get_field_schema`; removed `get_field_dtype`/`get_field_shape` | | `simple_backend_manager.py` | `RoutingGroup`; `_pack_field_values`; position-based GET merge; columnar `notify_data_update` | | `base.py` | Columnar `notify_data_update` protocol; simplified KV path | | `test_controller.py` | Adapted to new API signatures | | `test_controller_data_partitions.py` | Adapted to `FieldColumnMeta`-based schema | | `test_async_simple_storage_manager.py` | Adapted to `RoutingGroup` and new notify protocol | | `test_kv_storage_manager.py` | Minor signature update | ## 6. Conclusion This PR completes the second phase of columnar refactoring by eliminating the remaining O(B×F) metadata expansion in the Manager→Controller path and unifying metadata storage in the Controller: - **PUT throughput**: Up to 5.8× over pre-refactor baseline, +6–20% over PR #39 - **GET throughput**: Up to 3.4× over pre-refactor baseline, parity with PR #39 - **Code clarity**: Three separate metadata stores → one `FieldColumnMeta` dataclass; per-sample expansion loops eliminated - **Net change**: +451 / −440 lines across 7 files > **Note on GET path**: The GET path performance improvement from metadata-level refactoring has reached diminishing returns — the minor fluctuations (±1–4%) observed in benchmarks are within normal measurement noise. Further GET throughput gains would likely require a deeper architectural change: fully columnarizing the GET data flow itself (e.g., columnar storage layout in StorageUnit, field-level parallel retrieval), rather than continuing to optimize the metadata layer. See merge request: Ascend/TransferQueue!29