Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ repos:
hooks:
- id: docformatter
args: [ "--in-place", "--wrap-descriptions", "119" ]
- repo: local
hooks:
- id: clear-jupyter-notebook-output
name: Clear Jupyter Notebook Output
entry: jupyter nbconvert --ClearOutputPreprocessor.enabled=True --inplace
language: system
files: \.ipynb$
types: [file]
372 changes: 372 additions & 0 deletions jupyter/layout-clustering/layout_index.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,372 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "6a86ea9b-beca-4324-b101-5684b3ed0955",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:28:51.522577Z",
"iopub.status.busy": "2025-05-19T02:28:51.522263Z",
"iopub.status.idle": "2025-05-19T02:29:06.232626Z",
"shell.execute_reply": "2025-05-19T02:29:06.231794Z",
"shell.execute_reply.started": "2025-05-19T02:28:51.522557Z"
}
},
"outputs": [],
"source": [
"from xinghe.spark import *\n",
"from app.common.json_util import *\n",
"from xinghe.s3 import *\n",
"from xinghe.s3.read import *\n",
"from xinghe.ops.spark import spark_resize_file\n",
"\n",
"import os\n",
"os.environ[\"LLM_WEB_KIT_CFG_PATH\"] = \"/xxx.jsonc\"\n",
"\n",
"from llm_web_kit.libs.standard_utils import compress_and_decompress_str\n",
"from llm_web_kit.html_layout.html_layout_cosin import cluster_html_struct, get_feature, similarity, sum_tags\n",
"\n",
"config = {\n",
" \"spark_conf_name\": \"spark_4\", # another value is \"spark_2\"\n",
" \"skip_success_check\": True,\n",
" \"spark.executorEnv.LLM_WEB_KIT_CFG_PATH\": \"/xxx.jsonc\",\n",
"}\n",
"\n",
"\n",
"\n",
"spark = new_spark_session(\"cc_dumps.layoutID.index\", config)\n",
"sc = spark.sparkContext\n",
"sc.setLogLevel(\"ERROR\")\n",
"\n",
"spark"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "505d3150-f7da-42e5-aa7f-4b3ba1f4ece1",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:29:06.235192Z",
"iopub.status.busy": "2025-05-19T02:29:06.234343Z",
"iopub.status.idle": "2025-05-19T02:29:06.256587Z",
"shell.execute_reply": "2025-05-19T02:29:06.255679Z",
"shell.execute_reply.started": "2025-05-19T02:29:06.235167Z"
}
},
"outputs": [],
"source": [
"import warnings\n",
"warnings.filterwarnings('ignore')\n",
"\n",
"import time\n",
"import uuid\n",
"import traceback\n",
"from datetime import datetime\n",
"from pyspark.sql.functions import struct, to_json, sum as _sum, collect_list, first\n",
"from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType\n",
"\n",
"\n",
"TIMEOUT_SECONDS = 3600 * 5 # 超时时间5min\n",
"MAX_OUTPUT_ROW_SIZE = 1024 * 1024 * 1024 * 1.5\n",
"SIMILARITY_THRESHOLD = 0.95\n"
]
},
{
"cell_type": "markdown",
"id": "e5d1fa2e-56d7-474e-a4fa-774c0450aaaa",
"metadata": {},
"source": [
"# get layout data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e4750e4-de1b-4793-8906-070fba595771",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:29:06.257906Z",
"iopub.status.busy": "2025-05-19T02:29:06.257511Z",
"iopub.status.idle": "2025-05-19T02:29:08.270976Z",
"shell.execute_reply": "2025-05-19T02:29:08.270313Z",
"shell.execute_reply.started": "2025-05-19T02:29:06.257886Z"
}
},
"outputs": [],
"source": [
"input_path = \"xxx\"\n",
"input_path_lst = [f for f in list(list_s3_objects(input_path, recursive=True)) if f.endswith(\".jsonl\")]\n",
"len(input_path_lst)"
]
},
{
"cell_type": "markdown",
"id": "61583c42-3884-4c53-9a54-6a0bab0084a4",
"metadata": {},
"source": [
"# 边读数据边生成index data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "08109586-31dd-4ed8-afb4-580fb27889fe",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:30:39.588365Z",
"iopub.status.busy": "2025-05-19T02:30:39.588026Z",
"iopub.status.idle": "2025-05-19T02:30:39.597004Z",
"shell.execute_reply": "2025-05-19T02:30:39.596428Z",
"shell.execute_reply.started": "2025-05-19T02:30:39.588347Z"
}
},
"outputs": [],
"source": [
"def read_to_index(_iter):\n",
" # TODO 错误日志存放地址\n",
" error_log_path = f\"s3://xxx.jsonl\"\n",
" print(f\"error_log_path: {error_log_path}\")\n",
" s3_doc_writer = S3DocWriter(path=error_log_path)\n",
" error_info = None\n",
" for fpath in _iter:\n",
" current_layout_id = None\n",
" start_offset = None\n",
" layout_length = 0\n",
" idx = 0\n",
" print(f\"fpath:{fpath}\")\n",
" for row in read_s3_rows(fpath):\n",
" idx += 1\n",
" try:\n",
" detail_data = json_loads(row.value)\n",
" layout_id = detail_data[\"layout_id\"]\n",
" offset, length = map(int, row.loc.split(\"bytes=\")[-1].split(\",\"))\n",
" if layout_id == current_layout_id:\n",
" layout_length += length\n",
" continue\n",
" else:\n",
" if current_layout_id is not None:\n",
" print(f\"{current_layout_id} 该批数据批次结束, 总数据量为: {idx-1}\")\n",
" line = {\n",
" \"layout_id\": current_layout_id,\n",
" \"url_host_name\": detail_data[\"url_host_name\"],\n",
" \"count\": idx-1,\n",
" \"file\": {\n",
" \"filepath\": fpath,\n",
" \"offset\": start_offset,\n",
" \"length\": layout_length,\n",
" \"record_count\": idx-1,\n",
" \"timestamp\": int(time.time())\n",
" }\n",
" }\n",
" yield line\n",
" idx = 1\n",
" current_layout_id = layout_id\n",
" start_offset = offset\n",
" layout_length = 0\n",
" layout_length += length\n",
" print(f\"新批次数据: {current_layout_id}, start_offset: {start_offset}, layout_length: {layout_length}\")\n",
" except Exception as e:\n",
" error_info = {\n",
" \"error_type\": type(e).__name__,\n",
" \"error_message\": str(e),\n",
" \"traceback\": traceback.format_exc(),\n",
" \"input_data\": row.value if hasattr(row, 'value') else str(row),\n",
" \"timestamp\": datetime.now().isoformat()\n",
" }\n",
" s3_doc_writer.write(error_info)\n",
" continue\n",
" if current_layout_id is not None:\n",
" print(f\"last: {current_layout_id} 该批数据批次结束, 总数据量为: {idx}\")\n",
" line = {\n",
" \"layout_id\": current_layout_id,\n",
" \"url_host_name\": detail_data[\"url_host_name\"],\n",
" \"count\": idx,\n",
" \"file\": {\n",
" \"filepath\": fpath,\n",
" \"offset\": start_offset,\n",
" \"length\": layout_length,\n",
" \"record_count\": idx,\n",
" \"timestamp\": int(time.time())\n",
" }\n",
" }\n",
" yield line\n",
" if error_info:\n",
" s3_doc_writer.flush()\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9c5c2ea1-6abf-48b0-a641-199655d503f9",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:30:41.317843Z",
"iopub.status.busy": "2025-05-19T02:30:41.317471Z",
"iopub.status.idle": "2025-05-19T02:41:42.046537Z",
"shell.execute_reply": "2025-05-19T02:41:42.045786Z",
"shell.execute_reply.started": "2025-05-19T02:30:41.317822Z"
}
},
"outputs": [],
"source": [
"schema = StructType([\n",
" StructField(\"layout_id\", StringType(), True),\n",
" StructField(\"url_host_name\", StringType(), True),\n",
" StructField(\"count\", LongType(), True),\n",
" StructField(\"file\", StructType([\n",
" StructField(\"filepath\", StringType(), True),\n",
" StructField(\"offset\", LongType(), True),\n",
" StructField(\"length\", LongType(), True),\n",
" StructField(\"record_count\", LongType(), True),\n",
" StructField(\"timestamp\", IntegerType(), True),\n",
" ]), True),\n",
"])\n",
"\n",
"page_content = sc.parallelize(input_path_lst, len(input_path_lst))\n",
"dump_html_df = page_content.mapPartitions(read_to_index).toDF(schema)"
]
},
{
"cell_type": "markdown",
"id": "d61f7baa-669f-43ed-8597-dbe17ffb6e6f",
"metadata": {},
"source": [
"# 基于layout_id合并index data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4e1b3027-5dee-4d20-940a-202dac4ebef6",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:41:44.399492Z",
"iopub.status.busy": "2025-05-19T02:41:44.399160Z",
"iopub.status.idle": "2025-05-19T02:41:44.449736Z",
"shell.execute_reply": "2025-05-19T02:41:44.449197Z",
"shell.execute_reply.started": "2025-05-19T02:41:44.399471Z"
}
},
"outputs": [],
"source": [
"result_df = dump_html_df.groupBy(\"layout_id\") \\\n",
" .agg(\n",
" _sum(\"count\").alias(\"count\"),\n",
" collect_list(\"file\").alias(\"files\"),\n",
" first(\"url_host_name\").alias(\"url_host_name\")\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "25bfdb8d-4619-4f0c-b89e-a34df9374cc5",
"metadata": {},
"source": [
"# write"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1d3f65c8-c5ff-46ef-b2b3-745d9bc2e362",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:41:44.451242Z",
"iopub.status.busy": "2025-05-19T02:41:44.450882Z",
"iopub.status.idle": "2025-05-19T02:41:44.491377Z",
"shell.execute_reply": "2025-05-19T02:41:44.490822Z",
"shell.execute_reply.started": "2025-05-19T02:41:44.451226Z"
}
},
"outputs": [],
"source": [
"struct_col = struct(result_df[\"layout_id\"], result_df[\"count\"], result_df[\"files\"], result_df[\"url_host_name\"])\n",
"output_df = result_df.withColumn(\"value\", to_json(struct_col)).select(\"value\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3d932427-8114-40f8-b4e2-5e3ce27f067b",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:41:44.492105Z",
"iopub.status.busy": "2025-05-19T02:41:44.491953Z",
"iopub.status.idle": "2025-05-19T02:41:50.424779Z",
"shell.execute_reply": "2025-05-19T02:41:50.423950Z",
"shell.execute_reply.started": "2025-05-19T02:41:44.492091Z"
}
},
"outputs": [],
"source": [
"output_file_size_gb = 2\n",
"resize_func = spark_resize_file(output_file_size_gb)\n",
"new_output_df = resize_func(output_df)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6182dd36-ec74-4da8-aabe-0d349d4520bb",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-19T02:41:50.426028Z",
"iopub.status.busy": "2025-05-19T02:41:50.425686Z",
"iopub.status.idle": "2025-05-19T02:41:53.608152Z",
"shell.execute_reply": "2025-05-19T02:41:53.607485Z",
"shell.execute_reply.started": "2025-05-19T02:41:50.426007Z"
}
},
"outputs": [],
"source": [
"output_path = \"s3://xxx/\"\n",
"config[\"skip_output_version\"] = True\n",
"config['skip_output_check'] = True\n",
"\n",
"write_any_path(new_output_df, output_path, config)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8c82872-9987-4482-96af-102dbaffb8e3",
"metadata": {
"execution": {
"iopub.execute_input": "2025-05-14T12:16:41.230560Z",
"iopub.status.busy": "2025-05-14T12:16:41.229645Z",
"iopub.status.idle": "2025-05-14T12:16:41.747053Z",
"shell.execute_reply": "2025-05-14T12:16:41.746232Z",
"shell.execute_reply.started": "2025-05-14T12:16:41.230534Z"
}
},
"outputs": [],
"source": [
"spark.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python webkit_venv (ipykernel)",
"language": "python",
"name": "webkit_venv"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading