Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 44 additions & 30 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,17 +509,22 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
if not is_decode:
self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
for task in tasks:
task.metrics.inference_start_time = time.time()
tracing.trace_report_span(
tracing.TraceSpanName.SCHEDULE,
task.request_id.split("_")[0],
int(task.metrics.scheduler_recv_req_time * 1e9),
int(task.metrics.inference_start_time * 1e9),
thread_finish_flag=True,
)
trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
if not getattr(task, "has_been_preempted_before", False):
task.metrics.inference_start_time = time.time()
tracing.trace_report_span(
tracing.TraceSpanName.SCHEDULE,
task.request_id.split("_")[0],
int(task.metrics.scheduler_recv_req_time * 1e9),
int(task.metrics.inference_start_time * 1e9),
thread_finish_flag=True,
)
trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
else:
trace_print(
LoggingEventName.RESCHEDULED_INFERENCE_START, task.request_id, getattr(task, "user", "")
)
if not is_prefill:
if not self.cfg.model_config.enable_mm:
self.update_requests_chunk_size(tasks)
Expand Down Expand Up @@ -1022,28 +1027,37 @@ def _fetch_request():
for task in tasks:
if task.task_type == RequestType.PREFILL:
rid = task.request_id.split("_")[0]
trace_carrier = task.trace_carrier
tracing.trace_set_proc_propagate_context(rid, trace_carrier)
trace_carrier = tracing.trace_get_proc_propagate_context(rid)
task.trace_carrier = trace_carrier
tracing.trace_report_span(
tracing.TraceSpanName.SCHEDULE,
rid,
int(task.metrics.scheduler_recv_req_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
trace_print(
LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
)
trace_print(
LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
)
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
if isinstance(task, Request) and task.has_been_preempted_before:
trace_print(
LoggingEventName.RESCHEDULED_INFERENCE_START,
task.request_id,
getattr(task, "user", ""),
)
else:
trace_carrier = task.trace_carrier
tracing.trace_set_proc_propagate_context(rid, trace_carrier)
trace_carrier = tracing.trace_get_proc_propagate_context(rid)
task.trace_carrier = trace_carrier
tracing.trace_report_span(
tracing.TraceSpanName.SCHEDULE,
rid,
int(task.metrics.scheduler_recv_req_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
trace_print(
LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
)
trace_print(
LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
)
trace_print(
LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", "")
)
if isinstance(task, Request):
if self.cfg.scheduler_config.splitwise_role == "decode":
task.metrics.decode_inference_start_time = time.time()
else:
elif not task.has_been_preempted_before:
task.metrics.inference_start_time = time.time()
self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))

Expand Down
2 changes: 2 additions & 0 deletions fastdeploy/engine/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def __init__(
# status
self.status = RequestStatus.WAITING
self.task_type = RequestType.PREFILL
self.has_been_preempted_before = False
self.idx = None
self.need_prefill_tokens = self.prompt_token_ids_len
self.audio_output_token_ids = []
Expand Down Expand Up @@ -873,6 +874,7 @@ class RequestMetrics:
storage_cache_token_num: Optional[int] = 0
cpu_cache_prepare_time: Optional[float] = None
storage_cache_prepare_time: Optional[float] = None
preempted_count: int = 0

def __post_init__(self):
if self.arrival_time is None:
Expand Down
29 changes: 19 additions & 10 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.multimodal.hasher import MultimodalHasher
from fastdeploy.platforms import current_platform
from fastdeploy.trace.constants import LoggingEventName
from fastdeploy.trace.trace_logger import print as trace_print
from fastdeploy.utils import download_from_bos, init_bos_client, llm_logger


Expand Down Expand Up @@ -246,6 +248,8 @@ def reschedule_preempt_task(self, request_id, process_func=None):
llm_logger.debug(f"reschedule {request_id} into waiting queue")
if request_id in self.to_be_rescheduled_request_id_set and request_id in self.requests:
request = self.requests[request_id]
request.has_been_preempted_before = True
request.metrics.preempted_count += 1
if process_func is not None:
process_func(request)
llm_logger.debug(f"self.waiting append request:{request.request_id},req.type:{request.status}")
Expand Down Expand Up @@ -284,6 +288,7 @@ def preempted_all(self):
self._free_blocks(req)
req.cached_block_num = 0
self.to_be_rescheduled_request_id_set.add(req.request_id)
trace_print(LoggingEventName.PREEMPTED, req.request_id, getattr(req, "user", ""))
preempted_reqs.append(self._prepare_preempt_task(req))
return preempted_reqs

Expand Down Expand Up @@ -329,6 +334,9 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
self._free_blocks(preempted_req)
preempted_req.num_cached_blocks = 0
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
trace_print(
LoggingEventName.PREEMPTED, preempted_req.request_id, getattr(preempted_req, "user", "")
)
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
Expand Down Expand Up @@ -1189,16 +1197,17 @@ def get_prefix_cached_blocks(self, request: Request):
elif common_block_ids[block_idx] in metrics["match_storage_block_ids"]:
metrics["storage_match_token_num"] -= self.config.cache_config.block_size

request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"]
request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"]
request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
if not request.has_been_preempted_before:
# NOTE: Do not log or report metrics for cache hit rate when request is being rescheduled
request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"]
request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"]
request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]

main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)

return True
except Exception as e:
Expand Down
9 changes: 6 additions & 3 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}"
f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
)

main_process_metrics.request_token_ratio.observe(token_ratio)
Expand Down Expand Up @@ -943,11 +944,13 @@ def _process_batch_output(self):

# Print combined log with all required information
ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
ttft_s = ttft + task.metrics.time_in_queue
llm_logger.info(
f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}"
f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, "
f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
)

main_process_metrics.request_token_ratio.observe(token_ratio)
Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/trace/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LoggingEventName(Enum):
INFERENCE_END = "INFERENCE_END"
POSTPROCESSING_START = "POSTPROCESSING_START"
POSTPROCESSING_END = "POSTPROCESSING_END"
PREEMPTED = "PREEMPTED"
RESCHEDULED_INFERENCE_START = "RESCHEDULED_INFERENCE_START"


class StageName(Enum):
Expand All @@ -60,6 +62,8 @@ class StageName(Enum):
LoggingEventName.INFERENCE_START: StageName.PREFILL,
LoggingEventName.FIRST_TOKEN_GENERATED: StageName.PREFILL,
LoggingEventName.DECODE_START: StageName.DECODE,
LoggingEventName.PREEMPTED: StageName.DECODE,
LoggingEventName.RESCHEDULED_INFERENCE_START: StageName.DECODE,
LoggingEventName.INFERENCE_END: StageName.DECODE,
LoggingEventName.POSTPROCESSING_START: StageName.POSTPROCESSING,
LoggingEventName.POSTPROCESSING_END: StageName.POSTPROCESSING,
Expand Down
Loading