""" 不同时间段不同地区预警处置和虚警情况 1.地区处置率数量和占比统计 2.地区虚警率数量和占比统计 3.地区蓝黄橙红数数量和占比统计 """ from pydantic import BaseModel, Field from typing import Any, Dict, Type import logging from langchain_core.tools import BaseTool from .http_tools import WarningClient, const_base_url from .code import AreaCodeTool from ..utils.logger import get_logger code_tool = AreaCodeTool() class WarningArgs(BaseModel): """预警查询参数""" start_time: str = Field("", description="开始时间 (YYYY-MM-DD HH:mm:ss)") end_time: str = Field("", description="结束时间 (YYYY-MM-DD HH:mm:ss)") region_name: str = Field("", description="地区名称,如果要查询全国数据,请输入空字符串") query_type: str = Field("1", description="查询类型,1 表示查询一段时间内的综合数据,2 表示查询指定年份全年按月度统计的虚警、处置等信息") class WarningTool(BaseTool): """查询预警处置和虚警情况""" name: str = "warning_statistics" description: str = "查询一定时间范围内不同地区预警处置和虚警情况,包括处置情况、虚警情况、蓝黄橙红数数量等统计。也支持查询指定年份全年按月度统计的虚警情况,处置情况等信息。" args_schema: Type[BaseModel] = WarningArgs client: Any = Field(None, exclude=True) logger: logging.Logger = Field(None, exclude=True) def __init__(self, base_url: str = const_base_url, **data): super().__init__(**data) self.client = WarningClient(base_url=base_url) self.logger = get_logger("WarningTool") self.logger.info(f"初始化 WarningTool,base_url: {base_url}") def _run(self, start_time: str, end_time: str, region_name: str="", query_type: str="1") -> Dict[str, Any]: code = "" if region_name != "": self.logger.debug(f"查找区域代码: {region_name}") codes = code_tool.find_code(region_name) if codes is None or len(codes) == 0: error_msg = f'未找到匹配的区域代码: {region_name}' self.logger.warning(error_msg) return {'code': 400, 'message': error_msg} code = codes[0][1] self.logger.debug(f"找到区域代码: {code}") year = start_time.split("-")[0] if query_type == "1": detail = self.get_warning_statistics(start_time, end_time, region_name, code) return detail else: monthly = self.get_warning_statistics_of_month(year, region_name, code) return monthly def get_warning_statistics(self, start_time: str, end_time: str, region_name: str="", code: str="") -> Dict[str, Any]: try: response = self.client.query_warning_statistics(start_time, end_time, code) self.logger.debug(f"API响应: {response}") if response.type != 1 or len(response.resultdata) == 0: error_msg = f"查询失败: {response.message},请检查是否有相关数据权限" self.logger.warning(error_msg) return {'code': 400, 'message': error_msg} # 计算消息占比 if int(response.resultdata["num"]) != 0: num = int(response.resultdata["num"]) redper = int(response.resultdata["rednum"])/num orangeper = int(response.resultdata["orangenum"])/num yellowper = int(response.resultdata["yellownum"])/num blueper = int(response.resultdata["bluenum"])/num else: redper = 0 orangeper = 0 yellowper = 0 blueper = 0 # 计算处置率 if int(response.resultdata["rednum"]) != 0: redcloseper = int(response.resultdata["redcloseper"])/int(response.resultdata["rednum"]) else: redcloseper = 0 if int(response.resultdata["orangenum"]) != 0: orangecloseper = int(response.resultdata["orangecloseper"])/int(response.resultdata["orangenum"]) else: orangecloseper = 0 if int(response.resultdata["yellownum"]) != 0: yellowcloseper = int(response.resultdata["yellowcloseper"])/int(response.resultdata["yellownum"]) else: yellowcloseper = 0 if int(response.resultdata["bluenum"]) != 0: bluecloseper = int(response.resultdata["bluecloseper"])/int(response.resultdata["bluenum"]) data = { "预警消息个数": response.resultdata["num"], "处置消息个数": response.resultdata["closenum"], "处置率": response.resultdata["closeper"], "虚警消息个数": response.resultdata["falsenum"], "虚警率": response.resultdata["falseper"], "红色预警消息个数": response.resultdata["rednum"], "红色预警消息占比": f"{redper*100:.2f}%", "红色预警处置个数": response.resultdata["redcloseper"], "红色预警处置率": f"{redcloseper*100:.2f}%", "橙色预警消息个数": response.resultdata["orangenum"], "橙色预警消息占比": f"{orangeper*100:.2f}%", "橙色预警处置个数": response.resultdata["orangecloseper"], "橙色预警处置率": f"{orangecloseper*100:.2f}%", "黄色预警消息个数": response.resultdata["yellownum"], "黄色预警消息占比": f"{yellowper*100:.2f}%", "黄色预警处置个数": response.resultdata["yellowcloseper"], "黄色预警处置率": f"{yellowcloseper*100:.2f}%", "蓝色预警消息个数": response.resultdata["bluenum"], "蓝色预警消息占比": f"{blueper*100:.2f}%", "蓝色预警处置个数": response.resultdata["bluecloseper"], "蓝色预警处置率": f"{bluecloseper*100:.2f}%", "数据异常消息个数": response.resultdata["datanum"], "数据异常消息占比": response.resultdata['datacloseper'], "设备维护": response.resultdata["devicemainnum"], "设备维护占比": response.resultdata["devicemaincloseper"], "设备遭到破坏": response.resultdata["damagenum"], "设备遭到破坏占比": response.resultdata["damagecloseper"], "模型待优化": response.resultdata["modelnum"], "模型待优化占比": response.resultdata["modelcloseper"], } return { 'code': 200, 'data': data } except Exception as e: self.logger.error(f"查询预警统计信息失败: {e}") return {'code': 400, 'message': str(e)} def get_warning_statistics_of_month(self, year: str, region_name: str="", code: str="") -> Dict[str, Any]: try: response = self.client.query_warning_month_statistics(year, code) self.logger.debug(f"API响应: {response}") if response.type != 1 or len(response.resultdata) == 0: error_msg = f"查询失败: {response.message},请检查是否有相关数据权限" self.logger.warning(error_msg) return {'code': 400, 'message': error_msg} self.logger.info(f"查询成功,返回 {len(response.resultdata)} 条数据") data = {} for item in response.resultdata: month = { "预警数量": item["num"], "处置率": item["closeper"], "处置数量": item["closenum"], "虚警率": item["falseper"], "虚警数量": item["falsenum"], } data[f"{item['month']}月"] = month data = sorted(data.items(), key=lambda x: x[0]) return { 'code': 200, 'summary': f"查询到{len(data)}个月份的预警统计信息,包括{','.join([item[0] for item in data])}", 'data': data } except Exception as e: self.logger.error(f"查询预警统计信息失败: {e}") return {'code': 400, 'message': str(e)}