import logging from typing import Dict, Any, Optional, List from pydantic import BaseModel, Field from langchain_core.tools import BaseTool from .http_tools import MonitorClient, RateClient from typing import Type from ..utils.logger import get_logger from .code import AreaCodeTool code_tool = AreaCodeTool() class MonitorPointResponse(): """监测点查询结果""" status: str = Field(..., description="状态") message: str = Field(..., description="消息") points: List[Dict[str, Any]] = Field(..., description="监测点列表") class MonitorPointArgs(BaseModel): """监测点查询参数""" key: str = Field(..., description="行政区划名称(省/市级别均可,只需要最后一级,如长沙市,不需要湖南省)") start_time: str = Field("", description="可选参数:仅当需要查询特定时间区间的监测点数据时才需要指定,默认为空") end_time: str = Field("", description="可选参数:仅当需要查询特定时间区间的监测点数据时才需要指定,默认为空") disaster_type: str = Field("", description="需要查询的灾害类型,如崩塌、滑坡、泥石流、地面塌陷、地面沉降、地裂缝等,默认为空") three_d_model: str = Field("", description="是否需要三维模型,需要为有,不需要为无,默认为空") ortho_image: str = Field("", description="是否需要正射影像,需要为有,不需要为无,默认为空") disaster_threat_people_range_start: str = Field("", description="灾害威胁人数范围起始值,如100,默认为空") disaster_threat_people_range_end: str = Field("", description="灾害威胁人数范围结束值,如200,默认为空") disaster_scale_start: str = Field("", description="灾害规模范围起始值,灾害为崩塌、滑坡、泥石流时表示体积,灾害为地面塌陷、地面沉降时表示面积,为地裂缝时表示长度,默认为空") disaster_scale_end: str = Field("", description="灾害规模范围结束值,灾害为崩塌、滑坡、泥石流时表示体积,灾害为地面塌陷、地面沉降时表示面积,为地裂缝时表示长度,默认为空") device_type: str = Field("", description="设备类型或者传感器类型,当查询设备或传感器信息时需要(例如 加速度、位移、温度、湿度、裂缝计、雨量等),默认为空") class MonitorPointTool(BaseTool): """查询监测点信息的工具""" name:str = "monitor_points_query" description:str = """查询指定行政区划的监测点信息。 可以查询任意省/市/区县级别的监测点数据,也可以通过灾害类型、灾害规模、灾害威胁人数范围、设备类型等条件查询。 输入参数为行政区划名称,如:湖南省、长沙市、岳麓区等。 返回该区域内的监测点列表,包含位置、经纬度等详细信息。 还可以查询监测点下相关监测设备、传感器信息,比如设备数量、传感器数量等。 """ args_schema: Type[BaseModel] = MonitorPointArgs client: Any = Field(None, exclude=True) logger: logging.Logger = Field(None, exclude=True) def __init__(self, base_url: str = "http://localhost:5001", **data): """ 初始化监测点查询工具 Args: base_url: API服务器地址 **data: 其他参数 """ super().__init__(**data) self.client = MonitorClient(base_url=base_url) self.logger = get_logger("MonitorPointTool") def _run(self, key: str, start_time: str = "", end_time: str = "", disaster_type: str = "", three_d_model: str = "", ortho_image: str = "", disaster_threat_people_range_start: str = "", disaster_threat_people_range_end: str = "", disaster_scale_start: str = "", disaster_scale_end: str = "", device_type: str = "", query_type: str = "points") -> Dict[str, Any]: """ 执行监测点查询 Args: key: 行政区划名称 start_time: 开始时间 end_time: 结束时间 disaster_type: 灾害类型 three_d_model: 是否需要三维模型 ortho_image: 是否需要正射影像 disaster_threat_people_range_start: 灾害威胁人数范围起始值 disaster_threat_people_range_end: 灾害威胁人数范围结束值 disaster_scale_start: 灾害规模范围起始值 disaster_scale_end: 灾害规模范围结束值 device_type: 设备类型 query_type: 查询类型 Returns: Dict: 包含查询结果的字典 """ if len(device_type) > 0: query_type = "2" if query_type == "2": return self._get_device_info(key, start_time, end_time, device_type) else: year = start_time.split("-")[0] return self._get_points_info(key, year, disaster_type, three_d_model, ortho_image, disaster_threat_people_range_start, disaster_threat_people_range_end, disaster_scale_start, disaster_scale_end, device_type) def _get_device_info(self, key: str, start_time: str = "", end_time: str = "", device_type: str = ""): self.logger.info(f"开始查询设备信息,区域: {key}") code = "" if key != "": self.logger.debug(f"查找区域代码: {key}") codes = code_tool.find_code(key) if codes is None or len(codes) == 0: error_msg = f'未找到匹配的区域代码: {key}' self.logger.warning(error_msg) return {'code': 400, 'message': error_msg} code = codes[0][1] self.logger.debug(f"找到区域代码: {code}") try: response = self.client.query_device_and_sensor(code, start_time, end_time, device_type) self.logger.debug(f"API响应: {response}") if response.type != 1 or len(response.resultdata) == 0: error_msg = f"查询失败: {response.message},请检查是否有相关数据权限" self.logger.warning(error_msg) return { 'code': 400, 'message': error_msg } self.logger.info(f"查询设备信息成功,设备数量: {len(response.resultdata['DeviceList'])}") devices_info = [] for device in response.resultdata["DeviceList"]: devices_info.append({ "设备编号": device["DEVICECODE"], "设备名称": device["DEVICETYPENAME"], # "GUID": device["GUID"] }) self.logger.debug(f"处理设备数据: {devices_info}") sensors_info = [] for sensor in response.resultdata["SensorList"]: sensors_info.append({ "传感器编号": sensor["SENSORCODE"], "传感器名称": sensor["DEVICETYPENAME"], # "GUID": sensor["GUID"] }) self.logger.debug(f"处理传感器数据: {sensors_info}") device_table_header = list(devices_info[0].keys()) device_table_data = [] for item in devices_info: device_table_data.append(list(item.values())) sensor_table_header = list(sensors_info[0].keys()) sensor_table_data = [] for item in sensors_info: sensor_table_data.append(list(item.values())) result = { 'code': 200, 'message': f"在{key}找到{len(devices_info)}个设备信息, {len(sensors_info)}个传感器信息", # 'devices': { # 'table_header': device_table_header, # 'table_data': device_table_data # }, # 'sensors': { # 'table_header': sensor_table_header, # 'table_data': sensor_table_data # } } return result except Exception as e: error_msg = f"查询失败: {str(e)}" self.logger.error(error_msg, exc_info=True) return { 'code': 400, 'message': error_msg } def _get_points_info(self, key: str, year: str = "", disaster_type: str = "", three_d_model: str = "", ortho_image: str = "", disaster_threat_people_range_start: str = "", disaster_threat_people_range_end: str = "", disaster_scale_start: str = "", disaster_scale_end: str = "", device_type: str = ""): try: self.logger.info(f"开始查询监测点信息,区域: {key}") response = self.client.query_points_sync(key, year, disaster_type, three_d_model, ortho_image, disaster_threat_people_range_start, disaster_threat_people_range_end, disaster_scale_start, disaster_scale_end, device_type) self.logger.debug(f"API响应: {response}") if response.type != 1 or len(response.resultdata) == 0: error_msg = f"查询失败: {response.message},请检查是否有相关数据权限" self.logger.warning(error_msg) return { 'code': 400, 'message': error_msg } # 提取关键信息并格式化 points_info = [] for point in response.resultdata: point_data = { "监测点编号": f"{point['MONITORPOINTCODE']}" if point["MONITORPOINTCODE"] else "", "监测点名称": f"{point['MONITORPOINTNAME']}" if point["MONITORPOINTNAME"] else "", "地理位置": f"{point['LOCATION']}" if point["LOCATION"] else "", "经度": f"{point['LONGITUDE']}" if point["LONGITUDE"] else "", "纬度": f"{point['LATITUDE']}" if point["LATITUDE"] else "", "高程": f"{point['ELEVATION']}" if point["ELEVATION"] else "", "监测责任部门": f"{point['MONITORUNIT']}" if point["MONITORUNIT"] else "", "监测建设部门": f"{point['BUILDUNIT']}" if point["BUILDUNIT"] else "", "监测运维部门": f"{point['YWUNIT']}" if point["YWUNIT"] else "", "设备厂商": f"{point['MANUFACTURER']}" if point["MANUFACTURER"] else "", "灾害类型": f"{point['MONITORTYPE']}" if point["MONITORTYPE"] else "", "有无三维模型": f"{point['MODELEXIST']}" if point["MODELEXIST"] else "", "有无正射影像": f"{point['DOMEXIST']}" if point["DOMEXIST"] else "", "威胁人数": f"{point['THREATSPOPULATION']}" if point["THREATSPOPULATION"] else "", "规模等级": f"{point['DISASTERSCALE']}" if point["DISASTERSCALE"] else "" } if point.get("SGDW") or point.get("SGDW") != "null": point_data["施工单位"] = point["SGDW"] if point.get("WARNLEVEL") or point.get("WARNLEVEL") != "null": point_data["预警等级"] = point["WARNLEVEL"] points_info.append(point_data) self.logger.debug(f"处理监测点数据: {point_data['监测点名称']} {point_data}") self.logger.info(f"成功获取 {len(points_info)} 个监测点数据") # markdown = self.to_markdown(points_info) table_header = list(points_info[0].keys()) table_data = [] for item in points_info: table_data.append(list(item.values())) result = { 'code': 200, 'message': f"在{key}找到{len(points_info)}个监测点信息", 'points': { 'table_header': table_header, 'table_data': table_data }, # 'markdown': markdown } self.logger.info("数据处理完成,返回结果") return result except Exception as e: error_msg = f"查询失败: {str(e)}" self.logger.error(error_msg, exc_info=True) return { 'code': 400, 'message': error_msg } def to_markdown(self, data: List[Dict[str, Any]]) -> str: """将数据转换为 markdown 表格""" self.logger.debug("开始生成 markdown 表格") markdown = """ | 序号 | 名称 | 位置 | 经度 | 纬度 | 海拔 | 建设单位 | 监测单位 | 监测类型 | | --- | --- | --- | --- | --- | --- | --- | --- | --- | """ for index, row in enumerate(data): markdown += f"| {index+1} | {row['名称']} | {row['位置']} | {row['经度']} | {row['纬度']} | {row['海拔']} | {row['建设单位']} | {row['监测单位']} | {row['监测类型']} | \n" self.logger.debug("markdown 表格生成完成") return markdown