ya-dataflow is a modern, data-centric AI framework designed to orchestrate complex, large-scale workflows for Large Language Models (LLMs) and Vision-Language Models (VLMs).
It provides a unified, modular, and highly scalable architecture to manage the entire lifecycle of AI data processing—from raw data ingestion and sophisticated multi-modal reasoning to high-throughput trajectory synthesis and large-scale evaluation.
Latest Version: v1.0.15 (2026-05-15)
In the era of LLMs, the bottleneck is no longer just model capacity, but the quality and scale of the data pipelines that feed them. ya-dataflow is built to address this by treating data as a first-class citizen, providing a robust engine to transform raw, unstructured data into high-quality, structured datasets for training, fine-tuning, and evaluation.
At the heart of ya-dataflow is a highly extensible operator-based design. Every task—whether it's a simple text cleaning step, a complex RAG retrieval, or a sophisticated VLM reasoning process—is implemented as a standardized Operator. This allows for seamless composition of complex pipelines.
Designed for massive workloads, ya-dataflow supports advanced execution strategies like PartitionPipelineParallelRun. It enables high-throughput, parallel processing across distributed environments, making it capable of handling billions of tokens or millions of multi-modal assets.
Leveraging a powerful binary and structured file generation system, ya-dataflow can synthesize diverse datasets—including JSON, XML, Markdown, HTML, and complex binary formats like PDF, XLSX, and PPTX—to create robust testing and training environments.
Deeply integrated with the OpenClaw and Nanobot ecosystems. Through CLIOpenClawServing and NanobotServing, your data pipelines become accessible intelligence that can be dynamically invoked and orchestrated by autonomous agents.
Native support for Vision-Language Models with seamless image encoding, base64 transmission, and multimodal message handling. Process text, images, and their combinations in a unified pipeline.
Advanced user simulation capabilities with evidence-driven cognitive architecture:
- Perception Stage: Extract structured evidence from files and agent trajectories
- Understanding Stage: Audit progress, identify milestones, assess task state
- Decision Stage: Generate strategic dialogue responses with dynamic user personas
- Emotional Modeling: Natural emotional tone tracking (satisfied/dissatisfied/confused/urgent/neutral)
- Complete Async Removal: Simplified codebase by removing all
async/awaitpatterns from User Simulator module - Thread-Based Concurrency: Replaced
asyncio.gather()withThreadPoolExecutorfor better I/O parallelism - Simplified Debugging: Synchronous programming model makes testing and debugging significantly easier
- Multimodal Input Support: Native text + image input handling
prompt = [ {"role": "user", "type": "text", "text": "Describe this image"}, {"role": "user", "type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}} ]
- Embedding Support: New
generate_embedding()method for vector representations - Unified Architecture: Consolidated LLM client implementations with
LLMClientAdapter
- Vision Language Model Serving: Full VLM API support with multimodal message handling
- Image Encoding: Built-in base64 encoding and transmission
- Streamlined Design: 75% code reduction through refactoring
- Cloud-Native Storage: Native, seamless integration with S3 and other cloud storage providers
- Read-Write Separation: Decoupled
DataSourceandStoragelayers for maximum flexibility - Intelligent Caching: Advanced-level caching mechanisms (
CacheStorage) to optimize I/O - Checkpointing & Resumption: Built-in support for checkpointing—resume interrupted jobs exactly where they left off
- Fine-Grained Parallelism: Partition-level parallelism scales throughput by splitting datasets into granular work units
- Structured Output Validation: Native JSON schema validation with automatic filtering of invalid samples
- Error Recovery: Robust error handling with automatic retry and fallback mechanisms
- OpenClaw Powered: Use
CLIOpenClawServingto bridge agentic reasoning with data pipelines - Nanobot Ready: Integrated with
NanobotServingfor lightweight, high-performance serving - CLI Integration: Direct CLI support for agent invocation and orchestration
- Text + Image Processing: Unified handling of text and visual content
- Base64 Encoding: Automatic image encoding for API transmission
- OpenAI-Compatible Format: Standard multimodal message format support
Install the core framework:
pip install ya-dataflowInstall with specialized capabilities via extras:
# For RAG workflows
pip install ya-dataflow[rag]
# For Multimodal (VLM) and PDF processing
pip install ya-dataflow[pdf2vqa]
# For LLM serving and high-performance evaluation
pip install ya-dataflow[vllm,eval]
# For Code and Math reasoning tasks
pip install ya-dataflow[code,reasoning]
# For User Simulator and agentic workflows
pip install ya-dataflow[agent]In production, you define a pipeline by inheriting from PartitionPipelineParallelRun and implementing the forward method to orchestrate your operators.
from dataflow.pipeline import PartitionPipelineParallelRun
from dataflow.operators.core_text import TextCleaningOperator
from dataflow.utils.storage import FileDataSource, FileStorage, FileCacheStorage
from dataflow.serving.api_llm_serving_request import APILLMServing_request
class MyProductionPipeline(PartitionPipelineParallelRun):
def __init__(self, source: FileDataSource, storage: FileStorage, llm_serving: APILLMServing_request):
# 1. Initialize CacheStorage (Crucial: cannot be None)
cache_storage = FileCacheStorage(cache_path="./cache")
# 2. Initialize base class with cache_storage and explicit partitions
super().__init__(cache_storage=cache_storage, partitions=10)
self.storage = storage
self.llm_serving = llm_serving
# Define Operators
self.clean_op = TextCleaningOperator(self.llm_serving)
self.refine_op = SomeRefineOperator(self.llm_serving)
def forward(self):
# Step 1: Clean the raw text
self.clean_op.run(
self.storage.step(),
input_key="raw_text",
output_key="cleaned_text"
)
# Step 2: Refine based on cleaned text (Dependency: cleaned_text)
self.refine_op.run(
self.storage.step(),
input_key="raw_text",
output_key="final_result",
input_prev_1="cleaned_text"
)
# Usage
source = FileDataSource(paths=["./input.jsonl"])
storage = FileStorage(data_source=source, id_key="id", cache_path="./cache")
llm = APILLMServing_request(api_url="...", model_name="...")
pipeline = MyProductionPipeline(source, storage, llm)
pipeline.compile()
pipeline.run()Process text and images together with native multimodal support:
from dataflow.serving.llm_client import LLMClientAdapter
from dataflow.utils.storage import FileDataSource, FileStorage
# Initialize multimodal client
client = LLMClientAdapter(
api_url="http://localhost:8000/v1",
client_params={"model": "/data/share/models/Qwen-VL/"}
)
# Multimodal prompt with image
prompt = [
{"role": "user", "type": "text", "text": "Describe what you see in this image"},
{"role": "user", "type": "image_url", "image_url": {"url": "data:image/jpeg;base64,/9j/4AAQSkZJRg..."}}
]
response = client.generate(prompt)
print(response)Simulate realistic user interactions with evidence-driven cognitive architecture:
from dataflow.serving.agent.user_v2.simulator import UserSimulator
from dataflow.serving.llm_client import LLMClientAdapter
# Initialize simulator
llm_client = LLMClientAdapter(api_url="http://localhost:8000/v1")
simulator = UserSimulator(llm_client=llm_client)
# Run simulation
raw_data = {
"file_contents": {"config.py": "...", "main.py": "..."},
"agent_outputs": ["Agent: I found the issue...", "Agent: Here's the fix..."],
"feedbacks": ["This doesn't work", "Can you explain?"]
}
global_context = {
"question": "How do I fix the configuration error?",
"dialogue_scripts": [...] # Optional: persona scripts per stage
}
result = simulator.run(raw_data, global_context)
print(result["final_response"])
# Output: Natural user response with emotional tone and intentGenerate data on-the-fly without relying on pre-existing files.
Use when you have base data and want to enhance it with LLM-generated fields:
from dataflow.utils.storage import GeneratorDataSource, FileCacheStorage
from dataflow.serving.agent.cli_openclaw_serving import CLIOpenClawServing
# Define your base data generator
def task_generator():
"""Yield base task data"""
for i in range(1000):
yield {
"index": i,
"scene": "search" if i % 2 == 0 else "analysis",
"keywords": "特斯拉" if i % 2 == 0 else "财务数据"
}
# Create data source with LLM enhancement
source = GeneratorDataSource(
generator_fn=task_generator,
total_rows=1000,
name="enhanced_tasks",
serving=CLIOpenClawServing(agent_id="main"),
prompt_templates={
"question": "基于场景 {scene} 和关键词 {keywords},生成一个真实的技能使用问题。返回 JSON: {{\"question\": \"...\"}}",
"target_skills": "基于场景 {scene},选择 2-3 个适合的技能。返回 JSON: {{\"target_skills\": [...]}}",
},
fields_from_base=["index", "scene", "keywords"],
)
# Read data (LLM fields are generated on-the-fly)
for row in source.read(chunk_size=32):
print(row) # Contains: index, scene, keywords, question, target_skillsUse when you want LLM to generate all data from scratch:
from dataflow.utils.storage import LLMGeneratorDataSource
# Pure LLM generation - no base data needed
source = LLMGeneratorDataSource(
serving=CLIOpenClawServing(agent_id="main"),
prompts={
"question": "生成一个真实的 OpenClaw 技能使用问题。返回 JSON: {{\"question\": \"...\"}}",
"target_skills": "为这个问题选择 2-3 个合适的技能。返回 JSON: {{\"target_skills\": [...]}}",
"difficulty": "评估问题难度(1-5 分)。返回 JSON: {{\"difficulty\": 3}}",
},
num_rows=10000,
batch_size=32,
name="llm_generated_tasks",
)
# Read generated data
for row in source.read(chunk_size=32):
print(row) # Contains: question, target_skills, difficultyfrom dataflow.pipeline import PartitionPipelineParallelRun
from dataflow.utils.storage import S3DataSource, S3Storage, S3CacheStorage
# Configure massive S3-based workflow with checkpointing
source = S3DataSource(
endpoint="https://s3.example.com",
ak="YOUR_AK", sk="YOUR_SK",
s3_paths=["s3://my-bucket/massive-dataset/"],
)
storage = S3Storage(
data_source=source,
id_key="task_id",
cache_path="./local_cache",
cache_type="jsonl"
)
# Enable checkpointing via CacheStorage
progress_storage = S3CacheStorage(
endpoint="https://s3.example.com",
ak="YOUR_AK", sk="YOUR_SK",
cache_file="s3://my-bucket/checkpoints/pipeline_v1.json"
)
pipeline = PartitionPipelineParallelRun(
steps=[...],
data_source=source,
storage=storage,
cache_storage=progress_storage,
partitions=1000, # Scale to thousands of partitions
max_parallelism=32
)
# Run with automatic resumption
pipeline.run(resume_from_last=True)Enforce strict schema validation at the LLM level:
from dataflow.core.agentic import StepSchema, UserStep
from dataflow.serving.llm_client import LLMClientAdapter
# Define JSON schema for structured output
json_schema = {
"type": "object",
"properties": {
"question": {"type": "string"},
"target_skills": {
"type": "array",
"items": {"type": "string"},
"minItems": 2,
"maxItems": 3
},
"difficulty": {
"type": "integer",
"minimum": 1,
"maximum": 5
}
},
"required": ["question", "target_skills", "difficulty"]
}
# Create step with schema validation
step = UserStep(
name="TaskGenerator",
schema=StepSchema(
input_keys=["scene", "keywords"],
output_key="task",
prompt_template="Generate a task based on scene={scene} and keywords={keywords}",
json_schema=json_schema # Enforce schema at LLM level
),
llm_config={"temperature": 0.7}
)
# Execute with automatic validation
result = step.execute(data_pool, global_context, llm_client)
# result["json_resp"] is guaranteed to match the schemadataflow/
├── core/ # Core engine, registry, and base abstractions
│ ├── agentic/ # User simulator interfaces (LLMClientABC, UserSimulatorABC)
│ └── ...
├── operators/ # Extensive library of built-in operators
│ ├── core_text/ # Text processing, cleaning, and extraction
│ ├── core_vision/ # VLM and image reasoning
│ ├── code/ # Code synthesis and execution
│ ├── reasoning/ # Math and logical reasoning
│ └── ... # Specialized domains (RAG, PDF2VQA, etc.)
├── pipeline/ # Pipeline orchestration and execution logic
├── serving/ # LLM/VLM serving integrations
│ ├── llm_client.py # Unified LLM client with multimodal support
│ ├── agent/ # User simulator (v1 & v2)
│ └── api_*_serving_request.py # LLM/VLM API serving
├── utils/ # Storage, registry, and utility helpers
│ └── storage.py # DataSource, Storage, CacheStorage, Generators
└── ...
| Version | Date | Key Features |
|---|---|---|
| 1.0.15 | 2026-05-15 | Async removal, Multimodal/VLM support, Embedding API |
| 1.0.14 | 2026-05-12 | JSON Schema validation, UserSimulator V2 enhancements |
| 1.0.13 | 2026-05-11 | Operator improvements, Performance optimizations |
| 1.0.12 | 2026-05-08 | Checkpointing, S3 integration |
| 1.0.11 | 2026-05-04 | Generator data sources, LLM synthesis |
| 1.0.10 | 2026-04-29 | Partition parallelism, Scalability |
| 1.0.9 | 2026-04-22 | RAG operators, Retrieval |
| 1.0.8 | 2026-04-16 | GeneratorDataSource, Dynamic data |
| 1.0.7 | 2026-04-15 | Code synthesis, Execution |
| 1.0.6 | 2026-04-14 | Vision support, PDF2VQA |
| 1.0.5 | 2026-04-13 | Initial release |
ya-dataflow is an evolving ecosystem. We welcome contributions from the community to expand its operator library and performance capabilities. Please visit the main repository for contribution guidelines.
This project is licensed under the Apache-2.0 license.
- CHANGELOG.md - Complete version history and change details
- User Simulator Guide - Deep dive into agentic simulation
- VLM Integration Guide - Multimodal processing patterns
- Pipeline Best Practices - Production deployment tips
Built with ❤️ for the data-centric AI community