[refactor] Simplify initialization and improve API usability by 0oshowero0 · Pull Request #26 · Ascend/TransferQueue


## Summary

This PR introduces a **High-Level Key-Value (KV) Interface** to
TransferQueue, offering a Redis-style API that retains access to most of
the advanced features provided by TransferQueue.

## Background

In previous versions of TransferQueue, the learning curve was relatively
steep for new users. To perform basic operations, users had to:

1. Understand the `BatchMeta`, `SampleMeta`, and `FieldMeta` design (as
illustrated in
[tutorial/02_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_metadata_concepts.py)).
2. Navigate the flexible but complex
[`TransferQueueClient`](https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py)
API.

Although PR #26 simplified
the initialization process, the core interaction still required working
with low-level details. This PR bridges that gap by providing a familiar,
easy-to-use KV abstraction.

## TransferQueue API Architecture
With this PR, TransferQueue now supports a two-level API architecture to
satisfy different user needs.

| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple Backends |
|---|---|---|---|---|---|---|
| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |

### High-Level API
#### Key-Value based API (This PR)

**Methods**

- **(async_)kv_put**: Insert or update a multi-column sample by key,
with an optional metadata tag
- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in
a single batch
- **(async_)kv_batch_get**: Retrieve samples by key, with optional
column selection by field
- **(async_)kv_list**: List the keys and tags (metadata) in a partition
- **(async_)kv_clear**: Remove key-value pairs from storage
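As a rough mental model for the semantics above, here is a minimal in-memory sketch. This is NOT the TransferQueue implementation; the class and internal structure are purely illustrative of how keys address rows, field names address columns, and tags carry lightweight metadata:

```python
# Illustrative, in-memory sketch of the KV semantics described above.
# NOT the actual TransferQueue implementation; names here are hypothetical.

class InMemoryKV:
    def __init__(self):
        # partition_id -> key -> {"fields": {...}, "tag": {...}}
        self._parts = {}

    def kv_put(self, key, partition_id, fields, tag=None):
        part = self._parts.setdefault(partition_id, {})
        entry = part.setdefault(key, {"fields": {}, "tag": {}})
        entry["fields"].update(fields)  # column-level insert/update
        if tag is not None:
            entry["tag"] = tag          # optional metadata tag

    def kv_batch_get(self, keys, partition_id, fields=None):
        part = self._parts.get(partition_id, {})
        out = []
        for k in keys:
            row = part[k]["fields"]
            # optional column selection
            out.append({f: row[f] for f in fields} if fields else dict(row))
        return out

    def kv_list(self, partition_id):
        part = self._parts.get(partition_id, {})
        return list(part), [e["tag"] for e in part.values()]

    def kv_clear(self, partition_id):
        self._parts.pop(partition_id, None)
```

Note how a second `kv_put` on an existing key only touches the supplied columns, which is the fine-grained access property described below.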

**Key Features**

- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) for
zero learning curve
- **Fine-grained Access**: Update or retrieve specific fields (columns)
within a key (row) without a full-row operation
- **Partition Isolation**: Logical separation of storage namespaces
- **Metadata Tags**: Lightweight metadata for status tracking
- **Pluggable Backends**: Supports multiple backends

#### StreamingDataLoader API
Refer to our [RoadMap](#1)
and the related PR (#23).

The usage example can be found in
[tutorial/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py).

### Low-Level API
Directly manipulate the `TransferQueueClient`. Refer to
[tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py),
[tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py)
and
[tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py)
for details.

## Usage Example

Please refer to
[tutorial/02_kv_interface.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_kv_interface.py)
and
[tests/e2e/test_kv_interface_e2e.py](https://github.com/Ascend/TransferQueue/blob/main/tests/e2e/test_kv_interface_e2e.py)
for details.

```python
import torch
from tensordict import TensorDict
import transfer_queue as tq

# initialize TQ
tq.init()

# prepare data
batch_input_ids = torch.tensor(
    [
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
    ]
)
batch_attention_mask = torch.ones_like(batch_input_ids)

data_batch = TensorDict(
    {
        "input_ids": batch_input_ids,
        "attention_mask": batch_attention_mask,
    },
    batch_size=batch_input_ids.size(0),
)

keys = ["1_0", "1_1", "1_2", "2_0"]  # 4 keys for 4 samples
tags = [{"global_steps": 1, "status": "running", "model_version": 1} for _ in range(len(keys))]
partition_id = "test"
# use kv interface to put into TQ
tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=data_batch, tags=tags)

# list all keys and tags
all_keys, all_tags = tq.kv_list(partition_id=partition_id)
for k, t in zip(all_keys, all_tags, strict=False):
    print(f"    - key='{k}' | tag={t}")

# retrieve all data
retrieved_all = tq.kv_batch_get(keys=all_keys, partition_id=partition_id)
print(f"  Fields: {list(retrieved_all.keys())}")
```

## Use Cases & Limitations

**Best For**:

- Scenarios requiring fine-grained data access (e.g., updating a reward
score for a specific prompt).
- Integration with external ReplayBuffers or Single-Controller
architectures that manage sample dispatching logic.

**Limitations (vs. Streaming/Low-level APIs):**
- No built-in production/consumption tracking: users must check status
manually via tags or manage this logic externally.
- No built-in sampler: data dispatch must be implemented externally,
e.g., by a ReplayBuffer or a single controller.
- Not fully streaming: consumers typically wait for a controller to
dispatch `keys` before fetching, rather than consuming a continuous stream.
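Because status tracking is left to the user, a common pattern is to filter keys by a status field stored in their tags. A minimal pure-Python sketch, assuming `kv_list` returns parallel lists of keys and tags as in the usage example above (the `"status"`/`"done"` tag schema is illustrative, not prescribed by TransferQueue):

```python
def select_keys_by_status(all_keys, all_tags, status="done"):
    """Filter keys whose metadata tag carries the given status value."""
    return [k for k, t in zip(all_keys, all_tags) if t.get("status") == status]

# Example inputs shaped like the output of kv_list:
keys = ["1_0", "1_1", "1_2"]
tags = [
    {"global_steps": 1, "status": "done"},
    {"global_steps": 1, "status": "running"},
    {"global_steps": 1, "status": "done"},
]

ready = select_keys_by_status(keys, tags)  # keys safe to consume
```

The selected keys can then be passed to `kv_batch_get`, with the dispatch decision made by an external controller or ReplayBuffer.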

---------

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>