[feat] Introduce high-level key-value (KV) interface by 0oshowero0 · Pull Request #28 · Ascend/TransferQueue

@mpb159753

…ller path

Co-authored-by: 看我72遍<m.pb@msn.com>



# message auto-generated for no-merge-commit merge:
!29 merge refactor/columnar-field-schema into main

[fix,refactor] Complete columnar metadata refactor for manager→controller path

Created-by: mpb159753
Commit-by: 看我72遍
Merged-by: ascend-robot
Description: # Columnar FieldSchema + Unified Controller Metadata

## 1. Context & Motivation

Follows: [#28 — Columnar BatchMeta + Zero-Copy Default](https://gitcode.com/Ascend/TransferQueue/pull/28)

PR #39 converted `BatchMeta` from row-oriented to columnar layout, but two O(B×F) bottlenecks remained on the **Manager → Controller** path:

1. **`notify_data_update` payload**: The Manager expanded columnar `field_schema` back into per-sample dicts (`dtypes: {global_index: {field: dtype}}`, `shapes: {global_index: {field: shape}}`), transmitting O(B×F) data over ZMQ for information that is inherently O(F).
2. **Controller metadata storage**: `DataPartitionStatus` maintained three separate stores (`field_dtypes`, `field_shapes`, `field_schema_cache`) with redundant per-sample indexing, requiring multi-pass reconciliation logic to detect nested tensors.

This PR completes the columnar refactoring by:

- Transmitting `field_schema` directly as O(F) columnar data (no per-sample expansion)
- Introducing `FieldColumnMeta` as the **single source of truth** for per-field metadata in the Controller
- Adding `RoutingGroup` to carry batch positions alongside global indexes, eliminating intermediate mapping
- Extracting `_pack_field_values` as a reusable static method with defensive checks

## 2. Key Changes

### 2.1 Columnar `notify_data_update` Protocol (`base.py`, `simple_backend_manager.py`)

**Before** (O(B×F) expansion in Manager):

```python
dtypes_for_notify = {
    global_index: {field_name: field_meta.get("dtype") for field_name, field_meta in field_schema.items()}
    for global_index in metadata.global_indexes
}
shapes_for_notify = { ... }  # same pattern
await self.notify_data_update(partition_id, field_names, global_indexes, dtypes_for_notify, shapes_for_notify)
```

**After** (O(F) — pass through as-is):

```python
await self.notify_data_update(partition_id, global_indexes, field_schema)
```

- Removed `fields`, `dtypes`, `shapes` parameters
- `field_schema` is already columnar from `metadata.py` — no expansion needed
- KV path (`base.py`) similarly simplified, removing 25-line per-sample expansion loop

### 2.2 `FieldColumnMeta` Dataclass (`controller.py`)

Replaces three separate stores (`field_dtypes`, `field_shapes`, `field_schema_cache`) with a single `@dataclass`:

```python
@DataClass
class FieldColumnMeta:
    dtype: Any = None
    shape: Optional[tuple] = None
    is_nested: bool = False
    is_non_tensor: bool = False
    per_sample_shapes: dict[int, tuple] = field(default_factory=dict)
```

- Field-level attributes are O(1) — shared across all samples
- Sample-level shapes only stored for nested tensors — O(B_nested) not O(B)
- `to_batch_schema()` generates `BatchMeta`-compatible dicts on demand
- `remove_samples()` cleans up released indexes

### 2.3 `RoutingGroup` NamedTuple (`simple_backend_manager.py`)

```python
class RoutingGroup(NamedTuple):
    global_indexes: list[int]
    batch_positions: list[int]
```

- `_group_by_hash` now returns `dict[str, RoutingGroup]` instead of `dict[str, list[int]]`
- Carries both global indexes and batch positions, eliminating the intermediate `global_idx → position` mapping in `get_data`
- GET merge logic simplified: scatter results directly to batch positions without building per-sample dicts

### 2.4 `_pack_field_values` Extraction (`simple_backend_manager.py`)

Extracted inline packing logic into a reusable `@staticmethod` with explicit error handling:

- Validates non-empty input and absence of `None` values
- Handles regular tensors (`torch.stack`), nested tensors (`torch.nested.as_nested_tensor`), and non-tensors (`NonTensorStack`)

### 2.5 Simplified Controller API

- `update_production_status`: Removed `field_names` and `dtypes`/`shapes` parameters; `field_names` derived from `field_schema.keys()`
- `get_field_schema`: Delegates to `FieldColumnMeta.to_batch_schema()` instead of building from cache
- Removed `get_field_dtype` and `get_field_shape` helper methods (no longer needed)

### 2.6 Test Suite

- All test files updated to match new `notify_data_update` and `update_production_status` signatures
- `test_controller_data_partitions.py`: Tests adapted for `FieldColumnMeta`-based schema storage

## 3. Benchmark Results

Tests conducted in Docker (single-node Ray) across 7 payload sizes (0.05 MB → 25.4 GB). Three configurations compared:

- **pre-refactor**: Baseline (row-oriented, before PR #39)
- **columnar-batch-meta**: After PR #39 (columnar BatchMeta + zero-copy)
- **columnar-field-schema**: This PR (columnar notify + FieldColumnMeta + RoutingGroup)

### Speedup (relative to pre-refactor baseline)

![image.png](https://raw.gitcode.com/user-images/assets/8886051/4c49b557-9d15-4298-9d5e-bd06e8ea05a6/image.png 'image.png')
![image.png](https://raw.gitcode.com/user-images/assets/8886051/8992bfb1-e5fc-4f06-9585-f72906c53863/image.png 'image.png')

| Data Scale | PUT Speedup (vs baseline) | PUT Speedup (vs PR #39) | GET Speedup (vs baseline) | GET Speedup (vs PR #39) |
|------------|:------------------------:|:-----------------------:|:------------------------:|:-----------------------:|
| debug (0.05 MB) | **1.4×** | +12% | **1.5×** | +16% |
| tiny (1.5 MB) | **1.8×** | +19% | **2.1×** | +13% |
| small (0.15 GB) | **5.1×** | +20% | **3.4×** | ≈0% |
| medium (1.5 GB) | **5.8×** | +7% | **2.2×** | −1% |
| large (6.3 GB) | **5.6×** | +8% | **2.0×** | −4% |
| xlarge (12.7 GB) | **5.5×** | +8% | **2.2×** | +1% |
| huge (25.4 GB) | **5.4×** | +6% | **2.2×** | +1% |

### Absolute Bandwidth

![image.png](https://raw.gitcode.com/user-images/assets/8886051/05b789cc-f4aa-4a5a-833b-55617cd3a673/image.png 'image.png')
![image.png](https://raw.gitcode.com/user-images/assets/8886051/e2f927cd-5556-46af-bf7b-71e451752c11/image.png 'image.png')

| Data Scale | Pre-Refactor | Columnar BatchMeta (PR #39) | Columnar FieldSchema (This PR) |
|------------|:-----------:|:---------------------------:|:------------------------------:|
| **PUT** medium | 3.95 Gbps | 21.29 Gbps | **22.84 Gbps** |
| **PUT** large | 5.04 Gbps | 26.14 Gbps | **28.18 Gbps** |
| **PUT** huge | 5.09 Gbps | 26.05 Gbps | **27.49 Gbps** |
| **GET** medium | 4.24 Gbps | 9.50 Gbps | **9.39 Gbps** |
| **GET** large | 4.98 Gbps | 10.51 Gbps | **10.14 Gbps** |
| **GET** huge | 4.86 Gbps | 10.46 Gbps | **10.53 Gbps** |

### Summary

- **PUT path** benefits most: +6% to +20% over PR #39 across all scales, consistent 5×+ improvement over pre-refactor baseline at medium+ scales
- **GET path** maintains parity with PR #39 — improvements are within noise margin; the GET bottleneck is in ZMQ transport, not metadata
- Small payloads see the largest relative improvement, confirming the metadata overhead reduction

### Resource Usage

Memory usage is comparable or slightly reduced (eliminated per-sample `field_dtypes`/`field_shapes` dicts in Controller).

## 4. API Breaking Changes

- `notify_data_update()`: Removed `fields`, `dtypes`, `shapes` parameters; replaced with single `field_schema` dict
- `update_production_status()`: Removed `field_names`, `dtypes`, `shapes` parameters; replaced with single `field_schema` dict; `field_names` derived from `field_schema.keys()`
- `get_field_dtype()` / `get_field_shape()`: Removed (replaced by `FieldColumnMeta`)
- `_group_by_hash()`: Now returns `dict[str, RoutingGroup]` instead of `dict[str, list[int]]`

## 5. Files Changed

```
7 files changed, 451 insertions(+), 440 deletions(-)
```

| File | Description |
|------|-------------|
| `controller.py` | `FieldColumnMeta` dataclass; simplified `update_production_status` / `get_field_schema`; removed `get_field_dtype`/`get_field_shape` |
| `simple_backend_manager.py` | `RoutingGroup`; `_pack_field_values`; position-based GET merge; columnar `notify_data_update` |
| `base.py` | Columnar `notify_data_update` protocol; simplified KV path |
| `test_controller.py` | Adapted to new API signatures |
| `test_controller_data_partitions.py` | Adapted to `FieldColumnMeta`-based schema |
| `test_async_simple_storage_manager.py` | Adapted to `RoutingGroup` and new notify protocol |
| `test_kv_storage_manager.py` | Minor signature update |

## 6. Conclusion

This PR completes the second phase of columnar refactoring by eliminating the remaining O(B×F) metadata expansion in the Manager→Controller path and unifying metadata storage in the Controller:

- **PUT throughput**: Up to 5.8× over pre-refactor baseline, +6–20% over PR #39
- **GET throughput**: Up to 3.4× over pre-refactor baseline, parity with PR #39
- **Code clarity**: Three separate metadata stores → one `FieldColumnMeta` dataclass; per-sample expansion loops eliminated
- **Net change**: +451 / −440 lines across 7 files

> **Note on GET path**: The GET path performance improvement from metadata-level refactoring has reached diminishing returns — the minor fluctuations (±1–4%) observed in benchmarks are within normal measurement noise. Further GET throughput gains would likely require a deeper architectural change: fully columnarizing the GET data flow itself (e.g., columnar storage layout in StorageUnit, field-level parallel retrieval), rather than continuing to optimize the metadata layer.


See merge request: Ascend/TransferQueue!29