"""HTTP clients and response models for the ``/cigem`` service endpoints."""

import time
from typing import Any, Dict, Generic, List, Optional, TypeVar
from urllib.parse import urljoin

import httpx
from pydantic import BaseModel

from ..utils.logger import get_logger

# Type parameter for the payload carried by ``BaseResponse``.
T = TypeVar('T')

# Alternate base URL that was previously configured here:
# const_base_url = "http://172.30.0.37:30007"
const_base_url = "http://localhost:5001"
const_url_point = "/cigem/getMonitorPointAll"
const_url_rate = "/cigem/getAvgOnlineRate"
const_url_rate_ranking = "/cigem/getOnlineRateRank"
const_url_rate_month = "/cigem/getOnlineRateOfMonth"
const_url_device_list = "/cigem/getMonitorDeviceList"
const_url_device_and_sensor = "/cigem/getDeviceAndSensorCount"
const_url_warning = "/cigem/getWarnMsgDisposeRate"
const_url_warning_month = "/cigem/getWarnMsgDisposeRateOfMonth"


class BaseResponse(BaseModel, Generic[T]):
    """Generic response envelope; ``resultdata`` carries the typed payload."""

    type: int
    resultcode: int
    message: str
    resultdata: Optional[T] = None
    otherinfo: Optional[str] = None


class BaseHttpClient:
    """Base HTTP client providing sync and async JSON request helpers."""

    def __init__(self, base_url: str = const_base_url):
        """Store the base URL (without trailing slash), timeout and logger.

        Args:
            base_url: Root URL that endpoint paths are resolved against.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = 60.0
        self.logger = get_logger(self.__class__.__name__)

    def _build_url(self, endpoint: str) -> str:
        """Resolve ``endpoint`` against ``self.base_url``.

        ``urljoin(base, "/path")`` replaces the whole path of ``base``, so a
        base URL such as ``http://host/api`` would silently lose ``/api``.
        Joining a slash-terminated base with a relative endpoint keeps any
        path prefix, and yields the same URL as before for a bare
        ``scheme://host:port`` base. An endpoint that is already an absolute
        URL is returned unchanged by ``urljoin``.
        """
        return urljoin(f"{self.base_url}/", endpoint.lstrip('/'))

    async def _request_async(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send an asynchronous request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            endpoint: Path (or absolute URL) of the target endpoint.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The value produced by ``response.json()``.

        Raises:
            httpx.HTTPStatusError: If the response has a 4xx/5xx status.
        """
        url = self._build_url(endpoint)
        self.logger.info(f"请求URL: {url},请求参数: {kwargs}")
        started = time.perf_counter()
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.request(method, url, **kwargs)
                response.raise_for_status()
                return response.json()
        finally:
            # Log the duration for failed requests as well as successful ones.
            self.logger.info(f"请求耗时: {time.perf_counter() - started}秒")

    def _request_sync(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send a synchronous request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            endpoint: Path (or absolute URL) of the target endpoint.
            **kwargs: Passed through to ``httpx.Client.request``.

        Returns:
            The value produced by ``response.json()``.

        Raises:
            httpx.HTTPStatusError: If the response has a 4xx/5xx status.
        """
        url = self._build_url(endpoint)
        self.logger.info(f"请求URL: {url},请求参数: {kwargs}")
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()
        try:
            with httpx.Client(timeout=self.timeout) as client:
                response = client.request(method, url, **kwargs)
                response.raise_for_status()
                return response.json()
        finally:
            # Log the duration for failed requests as well as successful ones.
            self.logger.info(f"请求耗时: {time.perf_counter() - started}秒")


class MonitorPoint(BaseModel):
    """Monitor point data model."""

    MONITORPOINTCODE: str
    MONITORPOINTNAME: str
    LOCATION: str
    LATITUDE: str
    LONGITUDE: str
    ELEVATION: str
    BUILDUNIT: str
    MONITORUNIT: str
    YWUNIT: Optional[str] = None
    SGDW: Optional[str] = None
    MANUFACTURER: Optional[str] = None
    MONITORTYPE: Optional[str] = None


class Sensor(BaseModel):
    """Sensor data model."""

    SENSORCODE: str
    LOCATION: Optional[str] = None
    LATITUDE: Optional[str] = None
    LONGITUDE: Optional[str] = None
    DEVICETYPENAME: Optional[str] = None
    MANUFACTURER: Optional[str] = None


class Device(BaseModel):
    """Device data model; each device carries a list of its sensors."""

    DEVICECODE: str
    SN: str
    LOCATION: Optional[str] = None
    LATITUDE: Optional[str] = None
    LONGITUDE: Optional[str] = None
    DEVICETYPENAME: Optional[str] = None
    MANUFACTURER: Optional[str] = None
    MONITORPOINTNAME: Optional[str] = None
    SensorList: List[Sensor]


class MonitorClient(BaseHttpClient):
    """Client for monitor point and device queries."""

    async def query_points(self, key: str) -> BaseResponse[List]:
        """Asynchronously query monitor points, filtering only by ``key``."""
        data = await self._request_async(
            "POST",
            const_url_point,
            json={"key": key},
        )
        return BaseResponse[List](**data)

    def query_points_sync(self, key: str, year: str, monitor_type: str,
                          three_d_model: str, ortho_image: str,
                          disaster_threat_people_range_start: str,
                          disaster_threat_people_range_end: str,
                          disaster_scale_start: str,
                          disaster_scale_end: str,
                          warn_level: str,
                          device_type: str) -> BaseResponse[List]:
        """Synchronously query monitor points with the full filter set.

        Each argument is sent as one field of the JSON request body; see the
        ``params`` mapping below for the field name used for each argument.

        Returns:
            The response envelope validated as ``BaseResponse[List]``.
        """
        # NOTE(review): ``device_type`` is accepted but never sent to the
        # server. The expected request key is not visible in this module, so
        # it is left unsent rather than guessed -- confirm against the API.
        params = {
            "key": key,
            "year": year,
            "MONITORTYPE": monitor_type,
            "MODELEXIST": three_d_model,
            "DOMEXIST": ortho_image,
            "WARNLEVEL": warn_level,
            "STARTTHREATSPOPULATION": disaster_threat_people_range_start,
            "ENDTHREATSPOPULATION": disaster_threat_people_range_end,
            "STARTDISASTERSCALE": disaster_scale_start,
            "ENDDISASTERSCALE": disaster_scale_end,
        }
        # The request parameters are logged by ``_request_sync``; no separate
        # stdout print is needed here.
        data = self._request_sync(
            "POST",
            const_url_point,
            json=params,
        )
        return BaseResponse[List](**data)

    def query_device_list(self, code: str) -> BaseResponse[List]:
        """Synchronously query the device list of one monitor point.

        Args:
            code: Sent as ``monitorpointcode`` in the request body.
        """
        data = self._request_sync(
            "POST",
            const_url_device_list,
            json={"monitorpointcode": code},
        )
        # Raw payload goes to the debug log instead of stdout.
        self.logger.debug(f"响应数据: {data}")
        return BaseResponse[List](**data)

    async def query_device_list_async(self, code: str) -> BaseResponse[List]:
        """Asynchronously query the device list of one monitor point.

        Args:
            code: Sent as ``monitorpointcode`` in the request body.
        """
        data = await self._request_async(
            "POST",
            const_url_device_list,
            json={"monitorpointcode": code},
        )
        return BaseResponse[List](**data)

    def query_device_and_sensor(self, area_code: str, start_time: str,
                                end_time: str,
                                device_type: str) -> BaseResponse[Dict]:
        """Synchronously query device and sensor counts.

        Returns:
            The response envelope validated as ``BaseResponse[Dict]``.
        """
        data = self._request_sync(
            "POST",
            const_url_device_and_sensor,
            json={
                "startTime": start_time,
                "endTime": end_time,
                "areaCode": area_code,
                "deviceType": device_type,
            },
        )
        return BaseResponse[Dict](**data)


# Example: adding a client for another data endpoint.
class RateClient(BaseHttpClient):
    """Client for online-rate queries."""

    async def query_rates(self, areacode: str, startDate: str, endDate: str,
                          manufacturer_name: str,
                          typeArr: str) -> BaseResponse[List]:
        """Asynchronously query online-rate information."""
        data = await self._request_async(
            "POST",
            const_url_rate,
            json={
                'areaCode': areacode,
                'startDate': startDate,
                'endDate': endDate,
                'manufacturerName': manufacturer_name,
                'typeArr': typeArr,
            },
        )
        return BaseResponse[List](**data)

    def query_rates_sync(self, areacode: str, startDate: str, endDate: str,
                         manufacturer_name: str,
                         typeArr: str) -> BaseResponse[List]:
        """Synchronously query online-rate information."""
        data = self._request_sync(
            "POST",
            const_url_rate,
            json={
                'areaCode': areacode,
                'startDate': startDate,
                'endDate': endDate,
                'manufacturerName': manufacturer_name,
                'typeArr': typeArr,
            },
        )
        return BaseResponse[List](**data)

    def query_rates_ranking_sync(self, rank_type: int) -> BaseResponse[List]:
        """Synchronously query the online-rate ranking.

        NOTE(review): this validates the payload as a list while the async
        ``query_rates_ranking`` validates the same endpoint as a dict --
        confirm which shape the server actually returns.
        """
        data = self._request_sync(
            "POST",
            const_url_rate_ranking,
            json={'type': rank_type},
        )
        return BaseResponse[List](**data)

    async def query_rates_ranking(self, rank_type: int) -> BaseResponse[Dict]:
        """Asynchronously query the online-rate ranking.

        NOTE(review): this validates the payload as a dict while the sync
        ``query_rates_ranking_sync`` validates the same endpoint as a list --
        confirm which shape the server actually returns.
        """
        data = await self._request_async(
            "POST",
            const_url_rate_ranking,
            json={'type': rank_type},
        )
        return BaseResponse[Dict](**data)

    def query_rates_month_sync(self, year: str, areaCode: str,
                               typeArr: str) -> BaseResponse[List]:
        """Synchronously query online-rate information grouped by month."""
        data = self._request_sync(
            "POST",
            const_url_rate_month,
            json={
                'year': year,
                'areaCode': areaCode,
                'typeArr': typeArr,
            },
        )
        return BaseResponse[List](**data)


class WarningClient(BaseHttpClient):
    """Client for warning statistics queries."""

    def query_warning_statistics(self, start_time: str, end_time: str,
                                 area_code: str) -> BaseResponse[Dict]:
        """Synchronously query warning statistics for a time range and area."""
        data = self._request_sync(
            "POST",
            const_url_warning,
            json={
                'startTime': start_time,
                'endTime': end_time,
                'areaCode': area_code,
            },
        )
        return BaseResponse[Dict](**data)

    def query_warning_month_statistics(self, year: str,
                                       areaCode: str) -> BaseResponse[List]:
        """Synchronously query warning statistics grouped by month."""
        data = self._request_sync(
            "POST",
            const_url_warning_month,
            json={
                'year': year,
                'areaCode': areaCode,
            },
        )
        return BaseResponse[List](**data)