http_tools.py 8.81 KB
Newer Older
1
import httpx
tinywell committed
2
from typing import TypeVar, Generic, Any, Optional, List, Dict
3 4 5
from pydantic import BaseModel
from urllib.parse import urljoin

tinywell committed
6 7 8
import time
from ..utils.logger import get_logger

9 10 11
# Generic type variable used to parameterize response payload types.
T = TypeVar('T')

# Base URL that HTTP clients in this module use by default.
# An alternate address is kept below for reference:
# const_base_url = "http://172.30.0.37:30007"
const_base_url = "http://localhost:5001"

# Endpoint paths; each is combined with the base URL when a request is made.
const_url_point = "/cigem/getMonitorPointAll"
const_url_rate = "/cigem/getAvgOnlineRate"
const_url_rate_ranking = "/cigem/getOnlineRateRank"
const_url_rate_month = "/cigem/getOnlineRateOfMonth"
const_url_device_list = "/cigem/getMonitorDeviceList"
const_url_device_and_sensor = "/cigem/getDeviceAndSensorCount"
const_url_warning = "/cigem/getWarnMsgDisposeRate"
const_url_warning_month = "/cigem/getWarnMsgDisposeRateOfMonth"

tinywell committed
23

24 25 26 27 28
class BaseResponse(BaseModel, Generic[T]):
    """Generic response envelope.

    The payload type is supplied as the type parameter, for example
    ``BaseResponse[List]`` or ``BaseResponse[Dict]``.
    """
    # Field names are the keys expected when a decoded JSON object is unpacked
    # into the model (``BaseResponse[...](**data)``), so ``type`` is kept even
    # though it shadows the builtin of the same name.
    type: int
    resultcode: int
    message: str
    # Payload typed by the generic parameter; may be omitted.
    resultdata: Optional[T] = None
    otherinfo: Optional[str] = None
31 32 33

class BaseHttpClient:
    """Base HTTP client providing asynchronous and synchronous JSON requests."""

    def __init__(self, base_url: str = const_base_url):
        """Store the base URL, the request timeout and a class-named logger.

        Args:
            base_url: Root URL of the backend. Trailing slashes are stripped.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = 60.0
        self.logger = get_logger(self.__class__.__name__)

    def _build_url(self, endpoint: str) -> str:
        """Join ``endpoint`` onto ``self.base_url`` without losing a base path.

        ``urljoin(base, "/x")`` replaces the whole path of ``base``, so a base
        such as ``http://host/api`` would silently lose ``/api``. Giving the
        base a trailing slash and the endpoint no leading slash makes the
        endpoint resolve relative to the full base URL instead.
        """
        return urljoin(f"{self.base_url}/", endpoint.lstrip('/'))

    async def _request_async(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send a request asynchronously and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            endpoint: Path appended to the base URL.
            **kwargs: Passed through unchanged to ``httpx.AsyncClient.request``.

        Returns:
            The value produced by ``response.json()``.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` for error
                status codes.
        """
        url = self._build_url(endpoint)
        self.logger.info(f"请求URL: {url},请求参数: {kwargs}")
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by system clock adjustments.
        started = time.perf_counter()
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.request(method, url, **kwargs)
                response.raise_for_status()
                return response.json()
        finally:
            # Log the duration for failed requests as well.
            self.logger.info(f"请求耗时: {time.perf_counter() - started}秒")

    def _request_sync(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send a request synchronously and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            endpoint: Path appended to the base URL.
            **kwargs: Passed through unchanged to ``httpx.Client.request``.

        Returns:
            The value produced by ``response.json()``.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` for error
                status codes.
        """
        # Build the URL once and reuse it for both the log line and the call.
        url = self._build_url(endpoint)
        self.logger.info(f"请求URL: {url},请求参数: {kwargs}")
        started = time.perf_counter()
        try:
            with httpx.Client(timeout=self.timeout) as client:
                response = client.request(method, url, **kwargs)
                response.raise_for_status()
                return response.json()
        finally:
            # Log the duration for failed requests as well.
            self.logger.info(f"请求耗时: {time.perf_counter() - started}秒")
60 61 62 63 64 65 66 67 68 69 70 71


class MonitorPoint(BaseModel):
    """Monitoring point data model.

    The first eight fields are required strings; the remaining fields are
    optional and default to ``None``.
    """
    MONITORPOINTCODE: str
    MONITORPOINTNAME: str
    LOCATION: str
    # Coordinates and elevation are declared as strings, not numbers.
    LATITUDE: str
    LONGITUDE: str
    ELEVATION: str
    BUILDUNIT: str
    MONITORUNIT: str
    YWUNIT: Optional[str] = None
    SGDW: Optional[str] = None
    MANUFACTURER: Optional[str] = None
    MONITORTYPE: Optional[str] = None

class Sensor(BaseModel):
    """Sensor data model.

    Only ``SENSORCODE`` is required; every other field defaults to ``None``.
    """
    SENSORCODE: str
    LOCATION: Optional[str] = None
    # Coordinates are declared as optional strings, not numbers.
    LATITUDE: Optional[str] = None
    LONGITUDE: Optional[str] = None
    DEVICETYPENAME: Optional[str] = None
    MANUFACTURER: Optional[str] = None

class Device(BaseModel):
    """Device data model.

    ``DEVICECODE``, ``SN`` and ``SensorList`` are required; the remaining
    fields default to ``None``.
    """
    DEVICECODE: str
    SN: str
    LOCATION: Optional[str] = None
    # Coordinates are declared as optional strings, not numbers.
    LATITUDE: Optional[str] = None
    LONGITUDE: Optional[str] = None
    DEVICETYPENAME: Optional[str] = None
    MANUFACTURER: Optional[str] = None
    MONITORPOINTNAME: Optional[str] = None
    # Required list of Sensor models (no default value).
    SensorList: List[Sensor]
97 98 99

class MonitorClient(BaseHttpClient):
    """Client for monitoring-point and device queries."""

    async def query_points(self, key: str) -> BaseResponse[List]:
        """Asynchronously query monitoring points.

        Posts ``{"key": key}`` to ``const_url_point`` and parses the reply as
        ``BaseResponse[List]``.
        """
        data = await self._request_async(
            "POST",
            const_url_point,
            json={"key": key},
        )
        return BaseResponse[List](**data)

    def query_points_sync(self, key: str,
                          year: str, monitor_type: str, three_d_model: str, ortho_image: str,
                          disaster_threat_people_range_start: str, disaster_threat_people_range_end: str,
                          disaster_scale_start: str, disaster_scale_end: str, warn_level: str,
                          device_type: str) -> BaseResponse[List]:
        """Synchronously query monitoring points with filter parameters.

        Each argument except ``device_type`` is sent as one key of the JSON
        body posted to ``const_url_point``; the reply is parsed as
        ``BaseResponse[List]``.
        """
        # NOTE(review): ``device_type`` is accepted but never added to the
        # payload. The key the backend expects is not visible here, so it is
        # left out as in the original -- confirm whether it should be sent.
        params = {
            "key": key,
            "year": year,
            "MONITORTYPE": monitor_type,
            "MODELEXIST": three_d_model,
            "DOMEXIST": ortho_image,
            "WARNLEVEL": warn_level,
            "STARTTHREATSPOPULATION": disaster_threat_people_range_start,
            "ENDTHREATSPOPULATION": disaster_threat_people_range_end,
            "STARTDISASTERSCALE": disaster_scale_start,
            "ENDDISASTERSCALE": disaster_scale_end,
        }

        # Use the instance logger instead of printing to stdout.
        self.logger.debug(f"查询参数: {params}")
        data = self._request_sync(
            "POST",
            const_url_point,
            json=params,
        )
        return BaseResponse[List](**data)

    def query_device_list(self, code: str) -> BaseResponse[List]:
        """Synchronously query the device list.

        Posts ``{"monitorpointcode": code}`` to ``const_url_device_list`` and
        parses the reply as ``BaseResponse[List]``.
        """
        data = self._request_sync(
            "POST",
            const_url_device_list,
            json={"monitorpointcode": code},
        )
        # Use the instance logger instead of printing the raw reply to stdout.
        self.logger.debug(f"query_device_list response: {data}")
        return BaseResponse[List](**data)

    async def query_device_list_async(self, code: str) -> BaseResponse[List]:
        """Asynchronously query the device list.

        Posts ``{"monitorpointcode": code}`` to ``const_url_device_list`` and
        parses the reply as ``BaseResponse[List]``.
        """
        data = await self._request_async(
            "POST",
            const_url_device_list,
            json={"monitorpointcode": code},
        )
        return BaseResponse[List](**data)

    def query_device_and_sensor(self, area_code: str, start_time: str, end_time: str,  device_type: str) -> BaseResponse[Dict]:
        """Synchronously query device and sensor counts.

        Posts the time range, area code and device type to
        ``const_url_device_and_sensor``. The reply is parsed as
        ``BaseResponse[Dict]``, which the return annotation now reflects.
        """
        data = self._request_sync(
            "POST",
            const_url_device_and_sensor,
            json={
                "startTime": start_time,
                "endTime": end_time,
                "areaCode": area_code,
                "deviceType": device_type,
            },
        )
        return BaseResponse[Dict](**data)
tinywell committed
167

168 169 170
# Example: adding a client for another data endpoint.
class RateClient(BaseHttpClient):
    """Client for online-rate queries."""

    @staticmethod
    def _rate_payload(areacode: str, startDate: str, endDate: str,
                      manufacturer_name: str, typeArr: str) -> Dict[str, str]:
        """Build the JSON body shared by the async and sync rate queries."""
        return {
            'areaCode': areacode,
            'startDate': startDate,
            'endDate': endDate,
            'manufacturerName': manufacturer_name,
            'typeArr': typeArr,
        }

    async def query_rates(self, areacode: str, startDate: str, endDate: str, manufacturer_name: str, typeArr: str) -> BaseResponse[List]:
        """Asynchronously query online rates.

        Posts the rate payload to ``const_url_rate`` and parses the reply as
        ``BaseResponse[List]``.
        """
        data = await self._request_async(
            "POST",
            const_url_rate,
            json=self._rate_payload(areacode, startDate, endDate, manufacturer_name, typeArr),
        )
        return BaseResponse[List](**data)

    def query_rates_sync(self, areacode: str, startDate: str, endDate: str, manufacturer_name: str, typeArr: str) -> BaseResponse[List]:
        """Synchronously query online rates.

        Posts the rate payload to ``const_url_rate`` and parses the reply as
        ``BaseResponse[List]``.
        """
        data = self._request_sync(
            "POST",
            const_url_rate,
            json=self._rate_payload(areacode, startDate, endDate, manufacturer_name, typeArr),
        )
        return BaseResponse[List](**data)

    def query_rates_ranking_sync(self, rank_type: int) -> BaseResponse[List]:
        """Synchronously query the online-rate ranking.

        Posts ``{'type': rank_type}`` to ``const_url_rate_ranking`` and parses
        the reply as ``BaseResponse[List]``.
        """
        # NOTE(review): the async variant parses the same endpoint as
        # BaseResponse[Dict]; only one payload shape can be right -- confirm
        # against the backend and align the two methods.
        data = self._request_sync(
            "POST",
            const_url_rate_ranking,
            json={'type': rank_type},
        )
        return BaseResponse[List](**data)

    async def query_rates_ranking(self, rank_type: int) -> BaseResponse[Dict]:
        """Asynchronously query the online-rate ranking.

        Posts ``{'type': rank_type}`` to ``const_url_rate_ranking`` and parses
        the reply as ``BaseResponse[Dict]``.
        """
        # NOTE(review): the sync variant parses the same endpoint as
        # BaseResponse[List]; only one payload shape can be right -- confirm
        # against the backend and align the two methods.
        data = await self._request_async(
            "POST",
            const_url_rate_ranking,
            json={'type': rank_type},
        )
        return BaseResponse[Dict](**data)

    def query_rates_month_sync(self, year: str, areaCode: str, typeArr: str) -> BaseResponse[List]:
        """Synchronously query online rates aggregated by month.

        Posts the year, area code and type array to ``const_url_rate_month``
        and parses the reply as ``BaseResponse[List]``.
        """
        data = self._request_sync(
            "POST",
            const_url_rate_month,
            json={
                'year': year,
                'areaCode': areaCode,
                'typeArr': typeArr,
            },
        )
        return BaseResponse[List](**data)


class WarningClient(BaseHttpClient):
    """Client for warning statistics queries."""

    def query_warning_statistics(self, start_time: str, end_time: str, area_code: str) -> BaseResponse[Dict]:
        """Synchronously query warning statistics.

        Posts the time range and area code to ``const_url_warning`` and parses
        the reply as ``BaseResponse[Dict]``.
        """
        data = self._request_sync(
            "POST",
            const_url_warning,
            json={
                'startTime': start_time,
                'endTime': end_time,
                'areaCode': area_code,
            },
        )
        return BaseResponse[Dict](**data)

    def query_warning_month_statistics(self, year: str, areaCode: str) -> BaseResponse[List]:
        """Synchronously query warning statistics aggregated by month.

        Posts the year and area code to ``const_url_warning_month`` and parses
        the reply as ``BaseResponse[List]``.
        """
        data = self._request_sync(
            "POST",
            const_url_warning_month,
            json={
                'year': year,
                'areaCode': areaCode,
            },
        )
        return BaseResponse[List](**data)