tool_warn.py 6.24 KB
Newer Older
1 2 3 4 5 6 7 8 9
"""
不同时间段不同地区预警处置和虚警情况
1.地区处置率数量和占比统计
2.地区虚警率数量和占比统计
3.地区蓝黄橙红数数量和占比统计
"""

from pydantic import BaseModel, Field
from typing import Any, Dict, Type
tinywell committed
10 11 12
import logging

from langchain_core.tools import BaseTool
13 14

from .http_tools import WarningClient, const_base_url
tinywell committed
15 16 17 18
from .code import AreaCodeTool
from ..utils.logger import get_logger

code_tool = AreaCodeTool()
19 20 21

class WarningArgs(BaseModel):
    """预警查询参数"""
tinywell committed
22 23 24
    start_time: str = Field("", description="开始时间 (YYYY-MM-DD HH:mm:ss)")
    end_time: str = Field("", description="结束时间 (YYYY-MM-DD HH:mm:ss)")
    region_name: str = Field("", description="地区名称,如果要查询全国数据,请输入空字符串")
25
    query_type: str = Field("1", description="查询类型,1 表示查询一段时间内的综合数据,2 表示查询指定年份全年按月度统计的虚警率,处置率等信息")
26 27 28 29

class WarningTool(BaseTool):
    """查询预警处置和虚警情况"""
    name: str = "warning_statistics"
tinywell committed
30
    description: str = "查询一定时间范围内不同地区预警处置和虚警情况,包括处置率、虚警率、蓝黄橙红数数量和占比统计。也支持查询指定年份全年按月度统计的虚警率,处置率等信息。"
31 32
    args_schema: Type[BaseModel] = WarningArgs
    client: Any = Field(None, exclude=True)
tinywell committed
33
    logger: logging.Logger = Field(None, exclude=True)
34 35 36 37
    
    def __init__(self, base_url: str = const_base_url, **data):
        super().__init__(**data)
        self.client = WarningClient(base_url=base_url)
tinywell committed
38
        self.logger = get_logger("WarningTool")
39
        self.logger.info(f"初始化 WarningTool,base_url: {base_url}")
tinywell committed
40

41
        
42
    def _run(self, start_time: str, end_time: str, region_name: str="", query_type: str="1") -> Dict[str, Any]:
tinywell committed
43 44 45 46 47 48 49 50 51 52 53 54 55
        code = ""
        if region_name != "":
            self.logger.debug(f"查找区域代码: {region_name}")
            codes = code_tool.find_code(region_name)
            if codes is None or len(codes) == 0:
                error_msg = f'未找到匹配的区域代码: {region_name}'
                self.logger.warning(error_msg)
                return {'code': 400, 'message': error_msg}
            code = codes[0][1]
            self.logger.debug(f"找到区域代码: {code}")


        year = start_time.split("-")[0]
56 57 58 59 60
        if query_type == "1":
            detail = self.get_warning_statistics(start_time, end_time, region_name, code)
            return detail
        else:
            monthly = self.get_warning_statistics_of_month(year, region_name, code)
tinywell committed
61
            return monthly
62
    
tinywell committed
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
    def get_warning_statistics(self, start_time: str, end_time: str, region_name: str="", code: str="") -> Dict[str, Any]:
        try:
            response = self.client.query_warning_statistics(start_time, end_time, code)
            self.logger.debug(f"API响应: {response}")

            if response.type != 1 or len(response.resultdata) == 0:
                error_msg = f"查询失败: {response.message},请检查是否有相关数据权限"
                self.logger.warning(error_msg)
                return {'code': 400, 'message': error_msg}

            data = {
                "预警消息个数": response.resultdata["num"],
                "处置消息个数": response.resultdata["closenum"],
                "处置率": response.resultdata["closeper"],
                "虚警消息个数": response.resultdata["falsenum"],
                "虚警率": response.resultdata["falseper"],
                "红色预警消息个数": response.resultdata["rednum"],
                "红色处置消息个数": response.resultdata["redcloseper"],
                "橙色预警消息个数": response.resultdata["orangenum"],
                "橙色处置消息个数": response.resultdata["orangecloseper"],
                "黄色预警消息个数": response.resultdata["yellownum"],
                "黄色处置消息个数": response.resultdata["yellowcloseper"],
                "蓝色预警消息个数": response.resultdata["bluenum"],
                "蓝色处置消息个数": response.resultdata["bluecloseper"],
                "数据异常消息个数": response.resultdata["datanum"],
                "数据异常消息占比": response.resultdata["datacloseper"],
                "设备维护": response.resultdata["devicemainnum"],
                "设备维护占比": response.resultdata["devicemaincloseper"],
                "设备遭到破坏": response.resultdata["damagenum"],
                "设备遭到破坏占比": response.resultdata["damagecloseper"],
                "模型待优化": response.resultdata["modelnum"],
                "模型待优化占比": response.resultdata["modelcloseper"],
            }
            return {
                'code': 200,
                'data': data
            }

        except Exception as e:
            self.logger.error(f"查询预警统计信息失败: {e}")
            return {'code': 400, 'message': str(e)}

    def get_warning_statistics_of_month(self, year: str, region_name: str="", code: str="") -> Dict[str, Any]:
        try:
            response = self.client.query_warning_month_statistics(year, code)
            self.logger.debug(f"API响应: {response}")

            if response.type != 1 or len(response.resultdata) == 0:
                error_msg = f"查询失败: {response.message},请检查是否有相关数据权限"
                self.logger.warning(error_msg)
                return {'code': 400, 'message': error_msg}

            data = {}
            for item in response.resultdata:
                month = {
                    "预警数量": item["num"],
                    "处置率": item["closeper"],
                    "处置数量": item["closenum"],
                    "虚警率": item["falseper"],
                    "虚警数量": item["falsenum"],
                }
                data[item["month"]] = month

            data = sorted(data.items(), key=lambda x: x[0])
            return {
                'code': 200,
                'data': data
            }
        except Exception as e:
            self.logger.error(f"查询预警统计信息失败: {e}")
            return {'code': 400, 'message': str(e)}