tool_monitor.py 13.7 KB
Newer Older
tinywell committed
1
import logging
2
from typing import Dict, Any, Optional, List
3 4
from pydantic import BaseModel, Field
from langchain_core.tools import BaseTool
5
from .http_tools import MonitorClient, RateClient
文靖昊 committed
6
from typing import Type
7
from ..utils.logger import get_logger
8
from .code import AreaCodeTool
9

10
code_tool = AreaCodeTool()
11 12 13 14 15
class MonitorPointResponse():
    """监测点查询结果"""
    status: str = Field(..., description="状态")
    message: str = Field(..., description="消息")
    points: List[Dict[str, Any]] = Field(..., description="监测点列表")
16 17 18

class MonitorPointArgs(BaseModel):
    """监测点查询参数"""
19
    key: str = Field(..., description="行政区划名称(省/市级别均可,只需要最后一级,如长沙市,不需要湖南省)")
20 21 22
    start_time: str = Field("", description="可选参数:仅当需要查询特定时间区间的监测点数据时才需要指定,默认为空")
    end_time: str = Field("", description="可选参数:仅当需要查询特定时间区间的监测点数据时才需要指定,默认为空")
    disaster_type: str = Field("", description="需要查询的灾害类型,如崩塌、滑坡、泥石流、地面塌陷、地面沉降、地裂缝等,默认为空")
tinywell committed
23 24
    three_d_model: str = Field("", description="是否需要三维模型,需要为有,不需要为无,默认为空")
    ortho_image: str = Field("", description="是否需要正射影像,需要为有,不需要为无,默认为空")
25 26 27 28
    disaster_threat_people_range_start: str = Field("", description="灾害威胁人数范围起始值,如100,默认为空")
    disaster_threat_people_range_end: str = Field("", description="灾害威胁人数范围结束值,如200,默认为空")
    disaster_scale_start: str = Field("", description="灾害规模范围起始值,灾害为崩塌、滑坡、泥石流时表示体积,灾害为地面塌陷、地面沉降时表示面积,为地裂缝时表示长度,默认为空")
    disaster_scale_end: str = Field("", description="灾害规模范围结束值,灾害为崩塌、滑坡、泥石流时表示体积,灾害为地面塌陷、地面沉降时表示面积,为地裂缝时表示长度,默认为空")
29
    warn_level: str = Field("", description="预警等级,1、2、3、4、5,1-4分别对应红色、橙色、黄色、蓝色,5为无预警,默认为空")
30 31
    device_type: str = Field("", description="设备类型或者传感器类型,当查询设备或传感器信息时需要(例如 加速度、位移、温度、湿度、裂缝计、雨量等),默认为空")
    
32 33 34

class MonitorPointTool(BaseTool):
    """查询监测点信息的工具"""
文靖昊 committed
35 36
    name:str = "monitor_points_query"
    description:str = """查询指定行政区划的监测点信息。
37
    可以查询任意省/市/区县级别的监测点数据,也可以通过灾害类型、灾害规模、灾害威胁人数范围、设备类型等条件查询。
38
    输入参数为行政区划名称,如:湖南省、长沙市、岳麓区等。
39
    返回该区域内的监测点列表,包含位置、经纬度、海拔、建设单位、监测单位、监测类型、有无三维模型、有无正射影像、威胁人数、规模等级等详细信息。
40
    还可以查询监测点下相关监测设备、传感器信息,比如设备数量、传感器数量等。
41
    """
文靖昊 committed
42
    args_schema: Type[BaseModel] = MonitorPointArgs
43
    client: Any = Field(None, exclude=True)
44
    logger: logging.Logger = Field(None, exclude=True)
45 46 47 48 49 50 51 52 53 54 55
    
    def __init__(self, base_url: str = "http://localhost:5001", **data):
        """
        初始化监测点查询工具
        
        Args:
            base_url: API服务器地址
            **data: 其他参数
        """
        super().__init__(**data)
        self.client = MonitorClient(base_url=base_url)
56
        self.logger = get_logger("MonitorPointTool")
57

58
    def _run(self, key: str, start_time: str = "", end_time: str = "", disaster_type: str = "", 
tinywell committed
59 60
             three_d_model: str = "", ortho_image: str = "", 
             disaster_threat_people_range_start: str = "", disaster_threat_people_range_end: str = "", 
61
             disaster_scale_start: str = "", disaster_scale_end: str = "", warn_level: str = "", device_type: str = "",
62
             query_type: str = "points") -> Dict[str, Any]:
63 64 65 66 67
        """
        执行监测点查询
        
        Args:
            key: 行政区划名称
68 69
            start_time: 开始时间
            end_time: 结束时间
70 71 72 73 74
            disaster_type: 灾害类型
            three_d_model: 是否需要三维模型
            ortho_image: 是否需要正射影像
            disaster_threat_people_range_start: 灾害威胁人数范围起始值
            disaster_threat_people_range_end: 灾害威胁人数范围结束值
tinywell committed
75 76
            disaster_scale_start: 灾害规模范围起始值
            disaster_scale_end: 灾害规模范围结束值
77
            warn_level: 预警等级
78
            device_type: 设备类型
79
            query_type: 查询类型
80 81 82
        Returns:
            Dict: 包含查询结果的字典
        """
83 84 85 86 87 88 89 90 91 92
        code = ""
        if key != "":
            self.logger.debug(f"查找区域代码: {key}")
            codes = code_tool.find_code(key)
            if codes is None or len(codes) == 0:
                error_msg = f'未找到匹配的区域代码: {key}'
                self.logger.warning(error_msg)
                return {'code': 400, 'message': error_msg}
            code = codes[0][1]
            self.logger.debug(f"找到区域代码: {code}")
93 94 95 96 97 98 99
        self.logger.info(f"查询地区: {key}, 区域代码: {code}" if key != "" else "查询全国监测点信息")
        if len(device_type) > 0:
            query_type = "2"
        if query_type == "2":
            return self._get_device_info(code, start_time, end_time, device_type)
        else:
            year = start_time.split("-")[0]
100
            return self._get_points_info(code, year, disaster_type, three_d_model, ortho_image, disaster_threat_people_range_start, disaster_threat_people_range_end, disaster_scale_start, disaster_scale_end, warn_level, device_type)
101 102

    def _get_device_info(self, code: str, start_time: str = "", end_time: str = "", device_type: str = ""):
103
        self.logger.info(f"开始查询设备信息,区域: {code}")
104

105 106 107
        try:
            response = self.client.query_device_and_sensor(code, start_time, end_time, device_type)
            self.logger.debug(f"API响应: {response}")
108
            if response.type != 1 or len(response.resultdata) == 0 or len(response.resultdata.get("DeviceList")) == 0:
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
                error_msg = f"查询失败: {response.message},请检查是否有相关数据权限"
                self.logger.warning(error_msg)
                return {
                    'code': 400,
                    'message': error_msg
                }
            self.logger.info(f"查询设备信息成功,设备数量: {len(response.resultdata['DeviceList'])}")
            devices_info = []
            for device in response.resultdata["DeviceList"]:
                devices_info.append({
                    "设备编号": device["DEVICECODE"],
                    "设备名称": device["DEVICETYPENAME"],
                    # "GUID": device["GUID"]
                })
            self.logger.debug(f"处理设备数据: {devices_info}")

            sensors_info = []
            for sensor in response.resultdata["SensorList"]:
                sensors_info.append({
                    "传感器编号": sensor["SENSORCODE"],
                    "传感器名称": sensor["DEVICETYPENAME"],
                    # "GUID": sensor["GUID"]
                })
            self.logger.debug(f"处理传感器数据: {sensors_info}")
            
            device_table_header = list(devices_info[0].keys())
            device_table_data = []
            for item in devices_info:
                device_table_data.append(list(item.values()))

            sensor_table_header = list(sensors_info[0].keys())
            sensor_table_data = []
            for item in sensors_info:
                sensor_table_data.append(list(item.values()))

            result = {
                'code': 200,
146
                'summary': f"找到{len(devices_info)}个设备信息, {len(sensors_info)}个传感器信息",
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
                # 'devices': {
                #     'table_header': device_table_header,
                #     'table_data': device_table_data
                # },
                # 'sensors': {
                #     'table_header': sensor_table_header,
                #     'table_data': sensor_table_data
                # }
            }
            return result
        except Exception as e:
            error_msg = f"查询失败: {str(e)}"
            self.logger.error(error_msg, exc_info=True)
            return {
                'code': 400,
                'message': error_msg
            }

    def _get_points_info(self, key: str, year: str = "", disaster_type: str = "", 
                         three_d_model: str = "", ortho_image: str = "", 
                         disaster_threat_people_range_start: str = "", disaster_threat_people_range_end: str = "", 
168
                         disaster_scale_start: str = "", disaster_scale_end: str = "", warn_level: str = "", device_type: str = ""):
169
        try:
170
            self.logger.info(f"开始查询监测点信息,区域: {key}")
171

tinywell committed
172 173 174
            response = self.client.query_points_sync(key, year, 
                disaster_type, three_d_model, ortho_image, 
                disaster_threat_people_range_start, disaster_threat_people_range_end, 
175
                disaster_scale_start, disaster_scale_end, warn_level, device_type)
176 177
            self.logger.debug(f"API响应: {response}")
            
tinywell committed
178
            if response.type != 1 or len(response.resultdata) == 0:
179 180
                error_msg = f"查询失败: {response.message},请检查是否有相关数据权限"
                self.logger.warning(error_msg)
tinywell committed
181 182
                return {
                    'code': 400,
183
                    'message': error_msg
tinywell committed
184
                }
185

tinywell committed
186
            # 提取关键信息并格式化            
187 188
            points_info = []
            for point in response.resultdata:
189
                point_data = {
tinywell committed
190 191 192
                    "监测点编号": f"{point['MONITORPOINTCODE']}" if point["MONITORPOINTCODE"] else "",
                    "监测点名称": f"{point['MONITORPOINTNAME']}" if point["MONITORPOINTNAME"] else "",
                    "地理位置": f"{point['LOCATION']}" if point["LOCATION"] else "",
193 194
                    "经度": f"{point['LONGITUDE']}" if point["LONGITUDE"] else "",
                    "纬度": f"{point['LATITUDE']}" if point["LATITUDE"] else "",
tinywell committed
195 196 197 198 199 200 201 202 203 204
                    "高程": f"{point['ELEVATION']}" if point["ELEVATION"] else "",
                    "监测责任部门": f"{point['MONITORUNIT']}" if point["MONITORUNIT"] else "",
                    "监测建设部门": f"{point['BUILDUNIT']}" if point["BUILDUNIT"] else "",
                    "监测运维部门": f"{point['YWUNIT']}" if point["YWUNIT"] else "",
                    "设备厂商": f"{point['MANUFACTURER']}" if point["MANUFACTURER"] else "",
                    "灾害类型": f"{point['MONITORTYPE']}" if point["MONITORTYPE"] else "",
                    "有无三维模型": f"{point['MODELEXIST']}" if point["MODELEXIST"] else "",
                    "有无正射影像": f"{point['DOMEXIST']}" if point["DOMEXIST"] else "",
                    "威胁人数": f"{point['THREATSPOPULATION']}" if point["THREATSPOPULATION"] else "",
                    "规模等级": f"{point['DISASTERSCALE']}" if point["DISASTERSCALE"] else ""
205
                }
tinywell committed
206 207 208 209 210
                if point.get("SGDW") or point.get("SGDW") != "null":
                    point_data["施工单位"] = point["SGDW"]
                if point.get("WARNLEVEL") or point.get("WARNLEVEL") != "null":
                    point_data["预警等级"] = point["WARNLEVEL"]

211
                points_info.append(point_data)
tinywell committed
212
                self.logger.debug(f"处理监测点数据: {point_data['监测点名称']} {point_data}")
213 214

            self.logger.info(f"成功获取 {len(points_info)} 个监测点数据")
tinywell committed
215
            # markdown = self.to_markdown(points_info)
216
            
217 218 219 220
            table_header = list(points_info[0].keys())
            table_data = []
            for item in points_info:
                table_data.append(list(item.values()))
221 222
            if len(table_data) > 200:
                table_data = table_data[:200]
223
                table_data.append(["..."])
224
            result = {
tinywell committed
225
                'code': 200,
226
                'summary': f"找到{len(points_info)}个满足条件的监测点信息",
227 228 229 230
                'points': {
                    'table_header': table_header,
                    'table_data': table_data
                },
tinywell committed
231
                # 'markdown': markdown
tinywell committed
232
            }
233 234
            self.logger.info("数据处理完成,返回结果")
            return result
235 236
            
        except Exception as e:
237 238
            error_msg = f"查询失败: {str(e)}"
            self.logger.error(error_msg, exc_info=True)
tinywell committed
239 240
            return {
                'code': 400,
241
                'message': error_msg
tinywell committed
242
            }
243

244 245 246 247 248 249 250 251 252 253 254 255
    def to_markdown(self, data: List[Dict[str, Any]]) -> str:
        """将数据转换为 markdown 表格"""
        self.logger.debug("开始生成 markdown 表格")
        markdown = """
| 序号 | 名称 | 位置 | 经度 | 纬度 | 海拔 | 建设单位 | 监测单位 | 监测类型 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
"""
        for index, row in enumerate(data):
            markdown += f"| {index+1} | {row['名称']} | {row['位置']} | {row['经度']} | {row['纬度']} | {row['海拔']} | {row['建设单位']} | {row['监测单位']} | {row['监测类型']} | \n"
        
        self.logger.debug("markdown 表格生成完成")
        return markdown