60 changes: 60 additions & 0 deletions runtime/ops/mapper/data_quality_evaluator/README.md
@@ -0,0 +1,60 @@
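# data_quality_evaluator Operator

Directory contents

- `operator_src/` Source code of the lightweight DataMate platform operator.
- `service_patch/` Code for the evaluation endpoint of the standalone service.
- `example_input/` Sample inputs for manual integration testing.
- `test_cases/` Public dataset source notes, lightweight evaluation samples, and test steps.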

## Open-source model links

- Evaluation model `Qwen/Qwen2.5-7B-Instruct`: [https://huggingface.co/Qwen/Qwen2.5-7B-Instruct](https://huggingface.co/Qwen/Qwen2.5-7B-Instruct)

Note: data quality evaluation uses `Qwen2.5-7B-Instruct`.

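## Standalone service deployment

The data quality evaluation operator reuses the standalone `data_synthesis_service`, but calls its `/evaluate-file` endpoint.

Dependencies:

- `operator_src/requirements.txt` lists the dependencies of the lightweight DataMate operator. It contains only what the HTTP call needs and does not include `vllm`.
- `service_patch/data_synthesis_service/requirements.txt` lists the production dependencies of the standalone service.
- The service base image is pinned to `quay.io/ascend/vllm-ascend:v0.18.0rc1`, which corresponds to Python `3.11.14` and CANN `8.5.1`.
- Key versions include `vllm==0.18.0+empty`, `vllm_ascend==0.18.0rc1`, `torch==2.9.0+cpu`, and `torch_npu==2.9.0.post1+gitee7ba04`.
- `service_patch/data_synthesis_service/requirements-base.txt` is only for model-free API smoke tests and must not be used for formal acceptance inference.

Recommended model environment variables: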

```bash
DATA_EVALUATOR_MODEL_PATH=/model/Qwen/Qwen2.5-7B-Instruct
DATA_EVALUATOR_BACKEND=vllm
```

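`/model` is the model mount point inside the container. Acceptance testers can mount any local model directory at `/model` in the container, or set the platform parameter `evaluatorModelPath` to a different in-container path.

When the production NPU service is built from `service_patch/data_synthesis_service/Dockerfile`, it already defaults to the base image aligned with 910b-jss and to `requirements.txt`. If you override the base image, the new image must match `quay.io/ascend/vllm-ascend:v0.18.0rc1` in its CANN, Python, and vLLM versions.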

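## How to build the DataMate upload package

Compress all files in the `operator_src/` directory into `data_quality_evaluator.zip`, then upload the archive to DataMate.

The archive root must directly contain:

- `metadata.yml`
- `process.py`
- `__init__.py`
- `requirements.txt`
- `README.md`

`service_patch/`, `example_input/`, and `test_cases/` are used only for service deployment and acceptance testing. Do not include them in the DataMate operator upload package.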
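
The packaging rule can be sketched with Python's standard `zipfile` module. This is a minimal sketch, not an official build script: the helper name `build_upload_package` is hypothetical, and it assumes that zipping every file relative to `operator_src/` is all DataMate needs.

```python
import zipfile
from pathlib import Path

# Files that must sit directly at the archive root.
REQUIRED_FILES = ["metadata.yml", "process.py", "__init__.py", "requirements.txt", "README.md"]


def build_upload_package(src_dir: str, zip_path: str) -> list:
    """Zip every file under src_dir with paths relative to src_dir (hypothetical helper)."""
    src = Path(src_dir)
    target = Path(zip_path).resolve()
    missing = [name for name in REQUIRED_FILES if not (src / name).is_file()]
    if missing:
        raise FileNotFoundError(f"operator_src is missing: {missing}")

    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(src.rglob("*")):
            # Skip directories and the archive itself if it lives inside src_dir.
            if not path.is_file() or path.resolve() == target:
                continue
            # A path relative to src_dir keeps the files at the archive root,
            # rather than nested under an "operator_src/" folder.
            archive.write(path, arcname=path.relative_to(src).as_posix())
        return archive.namelist()
```

Calling `build_upload_package("operator_src", "data_quality_evaluator.zip")` returns the names stored in the archive, which makes it easy to confirm that none of them carries a leading directory.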

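## Platform testing

1. Start the standalone service with the evaluation endpoint, and make sure the DataMate runtime can reach `serviceUrl`.
2. In the DataMate operator marketplace, upload the package built according to the rules above.
3. Create a task and upload `test_cases/example_input/public_eval_cases.json`.
4. Set the operator parameters to `targetDimensions=accuracy,relevance,safety,diversity,completeness` and `evaluatorBackend=vllm`.
5. Run the task and download the output JSON.
6. Following `test_cases/README.md`, check that every record contains scores for all 5 dimensions, the reasons, and the summary information.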
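
The check in the last step could be automated along the following lines. This is only a sketch under stated assumptions: the output schema is not documented here, so the key names `scores` and `reasons`, the English dimension labels, and the helper `find_incomplete_records` are all hypothetical. Adjust them to whatever `test_cases/README.md` and the real service response define.

```python
from typing import Any, Dict, Iterable, List

# Assumed dimension labels; the service may return different ones.
EXPECTED_DIMENSIONS = ["accuracy", "relevance", "safety", "diversity", "completeness"]


def find_incomplete_records(
    records: Iterable[Dict[str, Any]],
    dimensions: Iterable[str] = EXPECTED_DIMENSIONS,
    scores_key: str = "scores",    # hypothetical key name
    reasons_key: str = "reasons",  # hypothetical key name
) -> Dict[int, List[str]]:
    """Map record index to the dimensions lacking a score or a reason."""
    wanted = list(dimensions)
    problems: Dict[int, List[str]] = {}
    for index, record in enumerate(records):
        scores = record.get(scores_key) or {}
        reasons = record.get(reasons_key) or {}
        missing = [d for d in wanted if d not in scores or d not in reasons]
        if missing:
            problems[index] = missing
    return problems
```

An empty result means every record carried a score and a reason for each expected dimension under the assumed schema.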
@@ -0,0 +1,28 @@
[
  {
    "id": 1,
    "type": "QA",
    "content": {
      "question": "青霉素过敏患者还能继续使用阿莫西林吗?",
      "answer": "可以继续使用阿莫西林,无需担心过敏。"
    }
  },
  {
    "id": 2,
    "type": "QA",
    "content": {
      "question": "高血压患者日常应如何进行家庭血压监测?",
      "answer": "建议固定时间测量血压,记录结果,低盐饮食,规律运动,并按医嘱服药。"
    }
  },
  {
    "id": 3,
    "type": "Preference",
    "content": {
      "question": "高血压患者日常应如何进行家庭血压监测?",
      "chosen": "建议每天在固定时间测量血压,连续记录,配合低盐饮食、体重管理和按医嘱复诊。",
      "rejected": "血压偶尔量一下就可以,不需要长期记录。",
      "preference_reason": "chosen 方案更完整,更符合长期慢病管理要求。"
    }
  }
]
21 changes: 21 additions & 0 deletions runtime/ops/mapper/data_quality_evaluator/operator_src/README.md
@@ -0,0 +1,21 @@
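# data_quality_evaluator operator source

This directory holds the operator source code that goes into the DataMate platform upload package.

## Features

- Reads the single input file passed in by the platform.
- Treats the file content as the JSON text to evaluate.
- Calls the `/evaluate-file` endpoint of the standalone service.
- Writes the evaluation result returned by the service to the platform output JSON file.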
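
The flow above can be sketched as a small client. The payload field names follow `build_service_payload` in this operator's `process.py`; the helper names `build_evaluate_request` and `evaluate_file` are illustrative only, and the response format is whatever the standalone service returns. `requests` is imported lazily so that the payload builder works without it.

```python
from typing import Any, Dict, List, Optional

# Dimension labels as sent by process.py after alias normalization.
DEFAULT_DIMENSIONS = ["准确性", "相关性", "安全性", "多样性", "完整性"]


def build_evaluate_request(
    text: str,
    file_name: str = "input.json",
    target_dimensions: Optional[List[str]] = None,
    model_path: str = "/model/Qwen/Qwen2.5-7B-Instruct",
    backend: str = "vllm",
    include_summary: bool = True,
) -> Dict[str, Any]:
    """Build the JSON body for POST /evaluate-file (illustrative helper)."""
    if not text.strip():
        raise ValueError("Input text is empty")
    return {
        "file_name": file_name,
        "text": text,
        "target_dimensions": list(target_dimensions or DEFAULT_DIMENSIONS),
        "include_summary": include_summary,
        "model_path": model_path,
        "backend": backend,
    }


def evaluate_file(service_url: str, payload: Dict[str, Any], timeout_sec: int = 600) -> Any:
    """Send the payload to the standalone service and return the parsed JSON."""
    import requests  # third-party; the only dependency in requirements.txt

    response = requests.post(
        f"{service_url.rstrip('/')}/evaluate-file",
        json=payload,
        timeout=timeout_sec,
    )
    response.raise_for_status()  # surface HTTP 4xx/5xx as exceptions
    return response.json()
```

For manual integration testing, `evaluate_file("http://data-synthesis-service:18080", build_evaluate_request(text))` would exercise the same endpoint the operator calls, assuming the service is reachable at that address.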

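## Key parameters

- `serviceUrl`
  HTTP address of the standalone service. Defaults to the container-network service name `http://data-synthesis-service:18080`.
- `targetDimensions`
  Evaluation dimensions. Defaults to `accuracy,relevance,safety,diversity,completeness`.
- `evaluatorBackend`
  Evaluation backend. Defaults to `vllm`.
- `evaluatorModelPath`
  Path of the evaluation model inside the service container.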
12 changes: 12 additions & 0 deletions runtime/ops/mapper/data_quality_evaluator/operator_src/__init__.py
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-

try:
    from datamate.core.base_op import OPERATORS
except Exception:  # pragma: no cover
    OPERATORS = None

if OPERATORS is not None:
    OPERATORS.register_module(
        module_name="DataQualityEvaluatorMapper",
        module_path="ops.user.data_quality_evaluator.process",
    )
@@ -0,0 +1,60 @@
name: 'data_quality_evaluator'
description: 'Call the standalone data_synthesis HTTP service to evaluate generated data quality and export one JSON result file.'
language: 'python'
vendor: 'huawei'
raw_id: 'DataQualityEvaluatorMapper'
version: '1.0.0'
modal: 'text'
inputs: 'text'
outputs: 'text'
types:
  - 'annotation'
release:
  - 'Initial standalone-service wrapper for data quality evaluation.'
metrics:
  - name: 'Output'
    metric: '1 JSON evaluation file per input text file'
runtime:
  memory: 1073741824
  cpu: 0.5
  gpu: 0
  npu: 0
settings:
  serviceUrl:
    name: 'Service URL'
    description: 'HTTP endpoint of the standalone data_synthesis service.'
    type: 'input'
    defaultVal: 'http://data-synthesis-service:18080'
    required: true
  targetDimensions:
    name: 'Target Dimensions'
    description: 'Comma-separated evaluation dimensions. Supported values: accuracy,relevance,safety,diversity,completeness.'
    type: 'input'
    defaultVal: 'accuracy,relevance,safety,diversity,completeness'
    required: true
  evaluatorModelPath:
    name: 'Evaluator Model Path'
    description: 'Dedicated model path for evaluation. Default uses Qwen2.5-7B-Instruct and does not affect data_synthesis generation model.'
    type: 'input'
    defaultVal: '/model/Qwen/Qwen2.5-7B-Instruct'
    required: true
  evaluatorBackend:
    name: 'Evaluator Backend'
    description: 'Evaluation backend. Use vllm for Qwen2.5-7B-Instruct on the standalone NPU service; rule is only for lightweight local diagnostics.'
    type: 'input'
    defaultVal: 'vllm'
    required: true
  includeSummary:
    name: 'Include Summary'
    description: 'Whether to include aggregate evaluation summary in the JSON response.'
    type: 'switch'
    defaultVal: 'true'
    required: false
    checkedLabel: 'true'
    unCheckedLabel: 'false'
  timeoutSec:
    name: 'Timeout'
    description: 'HTTP request timeout in seconds.'
    type: 'input'
    defaultVal: '600'
    required: true
129 changes: 129 additions & 0 deletions runtime/ops/mapper/data_quality_evaluator/operator_src/process.py
@@ -0,0 +1,129 @@
import json
import os
from typing import Any, Dict, Iterable, List

import requests

try:
    from datamate.core.base_op import Mapper
except Exception:  # pragma: no cover
    class Mapper:  # type: ignore
        def __init__(self, *args, **kwargs):
            self.text_key = kwargs.get("text_key", "text")
            self.filepath_key = kwargs.get("filePath_key", "filePath")
            self.filename_key = kwargs.get("fileName_key", "fileName")
            self.target_type_key = kwargs.get("target_type_key", "target_type")


DEFAULT_SERVICE_URL = "http://data-synthesis-service:18080"
DEFAULT_EVALUATOR_MODEL_PATH = "/model/Qwen/Qwen2.5-7B-Instruct"
DIMENSION_ALIASES = {
    "accuracy": "准确性",
    "relevance": "相关性",
    "safety": "安全性",
    "diversity": "多样性",
    "completeness": "完整性",
    "准确性": "准确性",
    "相关性": "相关性",
    "安全性": "安全性",
    "多样性": "多样性",
    "完整性": "完整性",
}
DEFAULT_DIMENSIONS = ["准确性", "相关性", "安全性", "多样性", "完整性"]


def _parse_dimensions(value: Any) -> List[str]:
    if value is None or value == "":
        return list(DEFAULT_DIMENSIONS)
    if isinstance(value, str):
        items = [item.strip() for item in value.split(",") if item.strip()]
    else:
        items = [str(item).strip() for item in value if str(item).strip()]

    # DataMate may garble non-ASCII operator params into question marks.
    if items and all(set(item) <= {"?"} for item in items):
        return list(DEFAULT_DIMENSIONS)

    normalized = [DIMENSION_ALIASES.get(item.lower(), DIMENSION_ALIASES.get(item)) for item in items]
    invalid = [item for item, mapped in zip(items, normalized) if mapped is None]
    if invalid:
        raise ValueError(f"Unsupported targetDimensions: {invalid}")
    return [item for item in normalized if item] or list(DEFAULT_DIMENSIONS)


def _read_text_from_sample(sample: Dict[str, Any], text_key: str, filepath_key: str) -> str:
    text = str(sample.get(text_key, "") or "").strip()
    if text:
        return text

    file_path = sample.get(filepath_key)
    if file_path and os.path.isfile(file_path):
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read().strip()
    return ""


def build_service_payload(
    sample: Dict[str, Any],
    target_dimensions: Iterable[str],
    include_summary: bool,
    evaluator_model_path: str,
    evaluator_backend: str = "vllm",
    text_key: str = "text",
    filepath_key: str = "filePath",
    filename_key: str = "fileName",
) -> Dict[str, Any]:
    text = _read_text_from_sample(sample, text_key, filepath_key)
    if not text:
        raise ValueError("Input text is empty")
    return {
        "file_name": sample.get(filename_key, "input.json"),
        "text": text,
        "target_dimensions": list(target_dimensions),
        "include_summary": include_summary,
        "model_path": evaluator_model_path,
        "backend": evaluator_backend,
    }


def serialize_service_response(payload: Dict[str, Any]) -> str:
    return json.dumps(payload, ensure_ascii=False, indent=2)


class DataQualityEvaluatorMapper(Mapper):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.service_url = str(kwargs.get("serviceUrl", DEFAULT_SERVICE_URL)).rstrip("/")
        self.target_dimensions = _parse_dimensions(
            kwargs.get("targetDimensions", "accuracy,relevance,safety,diversity,completeness")
        )
        self.evaluator_model_path = str(
            kwargs.get("evaluatorModelPath", DEFAULT_EVALUATOR_MODEL_PATH)
        ).strip() or DEFAULT_EVALUATOR_MODEL_PATH
        self.evaluator_backend = str(kwargs.get("evaluatorBackend", "vllm")).strip().lower() or "vllm"
        self.include_summary = str(kwargs.get("includeSummary", "true")).lower() == "true"
        self.timeout_sec = int(kwargs.get("timeoutSec", 600))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        payload = build_service_payload(
            sample,
            self.target_dimensions,
            self.include_summary,
            self.evaluator_model_path,
            self.evaluator_backend,
            text_key=self.text_key,
            filepath_key=self.filepath_key,
            filename_key=self.filename_key,
        )
        response = requests.post(
            f"{self.service_url}/evaluate-file",
            json=payload,
            timeout=self.timeout_sec,
        )
        if response.status_code >= 400:
            raise RuntimeError(
                f"data_quality_evaluator service failed: {response.status_code} {response.text}"
            )
        sample[self.text_key] = serialize_service_response(response.json())
        sample[self.target_type_key] = "json"
        return sample
@@ -0,0 +1 @@
requests