[refactor] Simplify initialization and improve API usability by 0oshowero0 · Pull Request #26 · Ascend/TransferQueue
## Summary

This PR introduces a **High-Level Key-Value (KV) Interface** to TransferQueue, offering a Redis-style API that retains most of the advanced features provided by TransferQueue.

## Background

In previous versions of TransferQueue, the learning curve was relatively steep for new users. To perform basic operations, users had to:

1. Understand the `BatchMeta`, `SampleMeta`, and `FieldMeta` design (as illustrated in [tutorial/02_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_metadata_concepts.py)).
2. Navigate the flexible but complex [`TransferQueueClient`](https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py) API.

Although PR #26 simplified the initialization process, the core interaction still exposed low-level details. This PR bridges that gap by providing a familiar, easy-to-use KV abstraction.

## TransferQueue API Architecture

With this PR, TransferQueue supports a two-level API architecture that serves different user needs.
| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple Backends |
|---|---|---|---|---|---|---|
| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |

### High-Level API

#### Key-Value based API (This PR)

**Methods**

- **(async_)kv_put**: Insert/update a multi-column sample by key, with an optional metadata tag
- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in a batch
- **(async_)kv_batch_get**: Retrieve samples by keys, supporting column selection by fields
- **(async_)kv_list**: List keys and tags (metadata) in a partition
- **(async_)kv_clear**: Remove key-value pairs from storage

**Key Features**

- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) with near-zero learning curve
- **Fine-grained Access**: Update or retrieve specific fields (columns) within a key (row) without a full-row operation
- **Partition Isolation**: Logical separation of storage namespaces
- **Metadata Tags**: Lightweight metadata for status tracking
- **Pluggable Backends**: Supports multiple storage backends

#### StreamingDataLoader API

Refer to our [RoadMap](#1) and the related PR (#23). A usage example can be found in [tutorial/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py).

### Low-Level API

Directly manipulate the `TransferQueueClient`. Refer to [tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py), [tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py) and [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py) for details.
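The KV semantics described above (Put/Get/List/Clear, fine-grained column access, partition isolation, metadata tags) can be sketched with a minimal in-memory mock. This is an illustrative sketch only, not the TransferQueue implementation: `MockKVStore` and its method signatures are hypothetical stand-ins, and the real backends, batching, and `async_` variants are out of scope.

```python
# Minimal in-memory mock of the KV semantics described above.
# NOT the TransferQueue implementation; names and signatures are illustrative.


class MockKVStore:
    def __init__(self):
        # partition_id -> key -> {"fields": {...}, "tag": {...}}
        self._partitions = {}

    def kv_put(self, key, partition_id, fields, tag=None):
        part = self._partitions.setdefault(partition_id, {})
        entry = part.setdefault(key, {"fields": {}, "tag": {}})
        # Fine-grained update: only the given columns are overwritten
        entry["fields"].update(fields)
        if tag is not None:
            entry["tag"] = tag

    def kv_batch_get(self, keys, partition_id, fields=None):
        part = self._partitions.get(partition_id, {})
        result = {}
        for key in keys:
            row = part[key]["fields"]
            if fields is not None:  # column selection
                row = {f: row[f] for f in fields}
            result[key] = row
        return result

    def kv_list(self, partition_id):
        part = self._partitions.get(partition_id, {})
        return list(part), [e["tag"] for e in part.values()]

    def kv_clear(self, keys, partition_id):
        part = self._partitions.get(partition_id, {})
        for key in keys:
            part.pop(key, None)


store = MockKVStore()
store.kv_put("1_0", "demo", {"input_ids": [4, 5, 6]}, tag={"status": "running"})
# Fine-grained: add a reward field later without rewriting input_ids
store.kv_put("1_0", "demo", {"reward": 1.0})
print(store.kv_batch_get(["1_0"], "demo", fields=["reward"]))  # {'1_0': {'reward': 1.0}}
```

The second `kv_put` above shows why a row-level KV store with column-level updates is useful in RL pipelines: a reward can be attached to an existing sample without re-sending its tensors.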
## Usage Example

Please refer to [tutorial/02_kv_interface.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_kv_interface.py) and [tests/e2e/test_kv_interface_e2e.py](https://github.com/Ascend/TransferQueue/blob/main/tests/e2e/test_kv_interface_e2e.py) for details.

```python
import torch
from tensordict import TensorDict

import transfer_queue as tq

# Initialize TransferQueue
tq.init()

# Prepare data
batch_input_ids = torch.tensor(
    [
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
    ]
)
batch_attention_mask = torch.ones_like(batch_input_ids)
data_batch = TensorDict(
    {
        "input_ids": batch_input_ids,
        "attention_mask": batch_attention_mask,
    },
    batch_size=batch_input_ids.size(0),
)

keys = ["1_0", "1_1", "1_2", "2_0"]  # 4 keys for 4 samples
tags = [{"global_steps": 1, "status": "running", "model_version": 1} for _ in range(len(keys))]
partition_id = "test"

# Put the batch into TransferQueue via the KV interface
tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=data_batch, tags=tags)

# List all keys and tags
all_keys, all_tags = tq.kv_list(partition_id=partition_id)
for k, t in zip(all_keys, all_tags, strict=False):
    print(f" - key='{k}' | tag={t}")

# Retrieve all data
retrieved_all = tq.kv_batch_get(keys=all_keys, partition_id=partition_id)
print(f" Fields: {list(retrieved_all.keys())}")
```

## Use Cases & Limitations

**Best For**:

- Scenarios requiring fine-grained data access (e.g., updating a reward score for a specific prompt).
- Integration with external ReplayBuffers or single-controller architectures that manage sample-dispatching logic.

**Limitations (vs. the Streaming/Low-Level APIs)**:

- No built-in production/consumption tracking: users must manually check status via tags or manage this logic externally.
- No built-in sampler: data dispatch must be implemented externally, e.g., by a ReplayBuffer or a single controller.
- Not fully streaming: consumers typically wait for a controller to dispatch `keys` before fetching, rather than consuming a continuous stream.
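Since the KV interface has no built-in consumption tracking, one workable pattern is to filter the output of `kv_list` by tag status before fetching. The helper below is a hypothetical sketch: it assumes `kv_list` returns parallel lists of keys and tag dicts (as in the usage example above), and the `"status"` field is a user-defined convention, not part of the API.

```python
# Hypothetical helper: select only the keys whose tag marks them ready.
# Assumes kv_list returns parallel lists of keys and tag dicts, as in the
# usage example above; the "status" values are a user-defined convention.
def select_ready_keys(all_keys, all_tags, ready_status="done"):
    return [k for k, t in zip(all_keys, all_tags) if t.get("status") == ready_status]


# Example with data shaped like the kv_list output above
keys = ["1_0", "1_1", "2_0"]
tags = [{"status": "done"}, {"status": "running"}, {"status": "done"}]
print(select_ready_keys(keys, tags))  # ['1_0', '2_0']
# The selected keys would then be passed to a batch get, e.g.
# tq.kv_batch_get(keys=ready, partition_id=...).
```

A controller or external ReplayBuffer would typically run this selection loop and dispatch the resulting keys to consumers.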
--------- Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>