-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_batch.py
More file actions
41 lines (30 loc) · 1.17 KB
/
Copy pathasync_batch.py
File metadata and controls
41 lines (30 loc) · 1.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""Async fan-out: score N responses concurrently with bounded concurrency."""
from __future__ import annotations
import asyncio
from collections.abc import Sequence
from latence import AsyncLatence
async def score_one(client: AsyncLatence, sample: dict) -> dict:
result = await client.grounding.rag(**sample)
return {
"id": sample.get("id"),
"risk_band": result.risk_band.value,
"groundedness_v2": result.scores.groundedness_v2,
}
async def main(samples: Sequence[dict], concurrency: int = 8) -> list[dict]:
sem = asyncio.Semaphore(concurrency)
async def gated(sample: dict) -> dict:
async with sem:
return await score_one(client, sample)
async with AsyncLatence() as client:
return await asyncio.gather(*(gated(s) for s in samples))
if __name__ == "__main__":
SAMPLES = [
{
"id": i,
"query": "What is the speed of light in vacuum?",
"response_text": "The speed of light is 299,792,458 m/s.",
"raw_context": "The speed of light in vacuum is exactly 299,792,458 m/s.",
}
for i in range(10)
]
print(asyncio.run(main(SAMPLES)))