Commit da717251 by tinywell

工具数据单独处理为 markdown;增加日志便于排查

parent 45d5cb3a
from .utils.logger import setup_logging
# 设置日志配置
setup_logging()
from typing import Dict, Any, Optional,List
from typing import Dict, Any, Optional, List
from pydantic import BaseModel, Field
from langchain_core.tools import BaseTool
from .http_tools import MonitorClient, RateClient
from typing import Type
from ..utils.logger import get_logger
class MonitorPointResponse():
"""监测点查询结果"""
status: str = Field(..., description="状态")
......@@ -23,6 +25,7 @@ class MonitorPointTool(BaseTool):
"""
args_schema: Type[BaseModel] = MonitorPointArgs
client: Any = Field(None, exclude=True)
logger: logging.Logger = Field(None, exclude=True)
def __init__(self, base_url: str = "http://localhost:5001", **data):
"""
......@@ -34,6 +37,7 @@ class MonitorPointTool(BaseTool):
"""
super().__init__(**data)
self.client = MonitorClient(base_url=base_url)
self.logger = get_logger("MonitorPointTool")
def _run(self, key: str) -> Dict[str, Any]:
"""
......@@ -46,39 +50,52 @@ class MonitorPointTool(BaseTool):
Dict: 包含查询结果的字典
"""
try:
print(f"进入 monitor_points_query 工具, 查询监测点信息: {key}")
self.logger.info(f"开始查询监测点信息,区域: {key}")
response = self.client.query_points_sync(key)
print(f"查询结果: {response}")
self.logger.debug(f"API响应: {response}")
if response.type != 1 or len(response.resultdata) == 0:
error_msg = f"查询失败: {response.message},请检查是否有相关数据权限"
self.logger.warning(error_msg)
return {
'code': 400,
'message': f"查询失败: {response.message},请检查是否有相关数据权限"
'message': error_msg
}
# 提取关键信息并格式化
points_info = []
for point in response.resultdata:
points_info.append({
"名称": point["MONITORPOINTNAME"] if point["MONITORPOINTNAME"] else "",
"位置": point["LOCATION"] if point["LOCATION"] else "",
"经度": point["LONGITUDE"] if point["LONGITUDE"] else "",
"纬度": point["LATITUDE"] if point["LATITUDE"] else "",
"海拔": point["ELEVATION"] if point["ELEVATION"] else "",
"建设单位": point["BUILDUNIT"] if point["BUILDUNIT"] else "",
"监测单位": point["MONITORUNIT"] if point["MONITORUNIT"] else "",
"监测类型": point["MONITORTYPE"] if point["MONITORTYPE"] else ""
})
point_data = {
"名称": f"{point['MONITORPOINTNAME']}" if point["MONITORPOINTNAME"] else "",
"位置": f"{point['LOCATION']}" if point["LOCATION"] else "",
"经度": f"{point['LONGITUDE']}" if point["LONGITUDE"] else "",
"纬度": f"{point['LATITUDE']}" if point["LATITUDE"] else "",
"海拔": f"{point['ELEVATION']}" if point["ELEVATION"] else "",
"建设单位": f"{point['BUILDUNIT']}" if point["BUILDUNIT"] else "",
"监测单位": f"{point['MONITORUNIT']}" if point["MONITORUNIT"] else "",
"监测类型": f"{point['MONITORTYPE']}" if point["MONITORTYPE"] else ""
}
points_info.append(point_data)
self.logger.debug(f"处理监测点数据: {point_data['名称']} {point_data}")
self.logger.info(f"成功获取 {len(points_info)} 个监测点数据")
markdown = self.to_markdown(points_info)
return {
result = {
'code': 200,
'message': f"在{key}找到{len(points_info)}个监测点",
'points': points_info
'points': points_info,
'markdown': markdown
}
self.logger.info("数据处理完成,返回结果")
return result
except Exception as e:
error_msg = f"查询失败: {str(e)}"
self.logger.error(error_msg, exc_info=True)
return {
'code': 400,
'message': f"查询失败: {str(e)}"
'message': error_msg
}
def _arun(self, key: str) -> Dict[str, Any]:
......@@ -91,4 +108,18 @@ class MonitorPointTool(BaseTool):
Returns:
Dict: 包含查询结果的字典
"""
raise NotImplementedError("异步查询暂未实现")
\ No newline at end of file
self.logger.warning("异步查询方法未实现")
raise NotImplementedError("异步查询暂未实现")
def to_markdown(self, data: List[Dict[str, Any]]) -> str:
"""将数据转换为 markdown 表格"""
self.logger.debug("开始生成 markdown 表格")
markdown = """
| 序号 | 名称 | 位置 | 经度 | 纬度 | 海拔 | 建设单位 | 监测单位 | 监测类型 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
"""
for index, row in enumerate(data):
markdown += f"| {index+1} | {row['名称']} | {row['位置']} | {row['经度']} | {row['纬度']} | {row['海拔']} | {row['建设单位']} | {row['监测单位']} | {row['监测类型']} | \n"
self.logger.debug("markdown 表格生成完成")
return markdown
\ No newline at end of file
......@@ -185,10 +185,9 @@ class RateAgentV3:
def run(self, input: str):
picker_result = self.picker.pick(input)
res = self.runner.run(input, picker_result["tool"], picker_result["params"])
output = f"相关数据如下:\n{res['table']}\n\n{res['output']}"
return {
"input": input,
"output": res.content,
"tool": picker_result["tool"],
"params": picker_result["params"]
"output": output
}
......@@ -5,7 +5,7 @@ from langchain_core.prompts import PromptTemplate,ChatPromptTemplate,SystemMessa
from langchain.tools.render import render_text_description_and_args
from langchain_core.output_parsers import JsonOutputParser as JSONOutputParser
from langchain_core.tools import BaseTool
from ..utils.logger import get_logger
PICKER_SYSTEM_PROMPT = """
你是一个智能工具选择助手,你需要根据用户的问题选择最合适的工具,并提取出工具所需的参数。
......@@ -38,6 +38,8 @@ class ToolPicker:
def __init__(self, llm, tools: List):
self.tools = tools
self.llm = llm
self.logger = get_logger("ToolPicker")
date_now = datetime.now().strftime("%Y-%m-%d")
picker_human = f"今天是{date_now}\n\n{PICKER_HUMAN_PROMPT}"
prompt = ChatPromptTemplate.from_messages([
......@@ -50,9 +52,14 @@ class ToolPicker:
self.chain = prompt | self.llm | JSONOutputParser()
def pick(self, input: str):
print(input)
return self.chain.invoke({"input": input})
self.logger.info(f"Received input: {input}")
try:
result = self.chain.invoke({"input": input})
self.logger.info(f"Selected tool: {result['tool']} with params: {result['params']}")
return result
except Exception as e:
self.logger.error(f"Error picking tool: {str(e)}")
raise
RUNNER_SYSTEM_PROMPT = """
你是一个擅长根据工具执行结果回答用户问题的助手。
......@@ -62,10 +69,6 @@ RUNNER_SYSTEM_PROMPT = """
2. 根据用户的问题,解读工具执行结果,进行简要的分析说明
3. 返回用户问题的答案
请遵循以下规则:
- 工具执行结果中的数据必须使用 markdown 表格展示
- 确保数据的完整性, 不要遗漏数据
- 表格中的数据只能来源于工具执行结果
"""
......@@ -79,6 +82,7 @@ class ToolRunner:
def __init__(self, llm, tools: Dict[str, BaseTool]):
self.tools = tools
self.llm = llm
self.logger = get_logger("ToolRunner")
prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template(RUNNER_SYSTEM_PROMPT),
......@@ -86,11 +90,34 @@ class ToolRunner:
])
self.chain = prompt | self.llm
def run(self, input: str, tool_name: str, params: Dict):
self.logger.info(f"Running tool '{tool_name}' with params: {params}")
if tool_name not in self.tools:
self.logger.error(f"Tool {tool_name} not found")
raise ValueError(f"Tool {tool_name} not found")
tool = self.tools[tool_name]
result = tool.invoke(params)
return self.chain.invoke({"input": input, "result": result})
try:
tool = self.tools[tool_name]
self.logger.info(f"Invoking tool {tool_name}")
result = tool.invoke(params)
self.logger.debug(f"Tool result: {result}")
table = ""
if "markdown" in result:
table = result["markdown"]
del result["markdown"]
self.logger.info("Getting LLM interpretation")
llm_result = self.chain.invoke({"input": input, "result": result})
response = {
"output": llm_result.content,
"table": table
}
self.logger.info("Tool execution completed successfully")
return response
except Exception as e:
self.logger.error(f"Error running tool: {str(e)}")
raise
import logging
import os
from datetime import datetime
def setup_logging(log_level=logging.INFO, log_dir="logs"):
"""
设置统一的日志配置
Args:
log_level: 日志级别,默认为 INFO
log_dir: 日志文件目录,默认为 logs
"""
# 创建日志目录
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 生成日志文件名,包含日期
log_file = os.path.join(log_dir, f"app_{datetime.now().strftime('%Y%m%d')}.log")
# 配置根日志记录器
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
# 输出到控制台
logging.StreamHandler(),
# 输出到文件
logging.FileHandler(log_file, encoding='utf-8')
]
)
# 设置第三方库的日志级别
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.info(f"日志配置完成,日志文件: {log_file}")
def get_logger(name):
"""
获取指定名称的日志记录器
Args:
name: 日志记录器名称
Returns:
logging.Logger: 日志记录器实例
"""
return logging.getLogger(name)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment