import httpx from typing import List, Optional, Dict, TypeVar, Generic, Any from pydantic import BaseModel import asyncio from urllib.parse import urljoin # 泛型类型定义 T = TypeVar('T') # const_base_url = "http://172.30.0.37:30007" const_base_url = "http://localhost:5001" const_url_point = "/cigem/getMonitorPointAll" const_url_rate = "/cigem/getAvgOnlineRate" const_url_rate_ranking = "/cigem/getOnlineRateRank" class BaseResponse(BaseModel, Generic[T]): """通用响应模型""" type: int resultcode: int message: str resultdata: T class BaseHttpClient: """基础HTTP客户端""" def __init__(self, base_url: str = const_base_url): self.base_url = base_url.rstrip('/') self.timeout = 30.0 async def _request_async(self, method: str, endpoint: str, **kwargs) -> Any: """通用异步请求方法""" async with httpx.AsyncClient(timeout=self.timeout) as client: url = urljoin(self.base_url, endpoint) response = await client.request(method, url, **kwargs) response.raise_for_status() return response.json() def _request_sync(self, method: str, endpoint: str, **kwargs) -> Any: """通用同步请求方法""" with httpx.Client(timeout=self.timeout) as client: url = urljoin(self.base_url, endpoint) response = client.request(method, url, **kwargs) response.raise_for_status() return response.json() class MonitorPoint(BaseModel): """监测点数据模型""" MONITORPOINTCODE: str MONITORPOINTNAME: str LOCATION: str LATITUDE: str LONGITUDE: str ELEVATION: str BUILDUNIT: str MONITORUNIT: str YWUNIT: str SGDW: Optional[str] = None MANUFACTURER: str = "" class MonitorClient(BaseHttpClient): """监测点查询客户端""" async def query_points(self, key: str) -> BaseResponse[List[MonitorPoint]]: """异步查询监测点信息""" data = await self._request_async( "POST", const_url_point, json={"key": key} ) return BaseResponse[List[MonitorPoint]](**data) def query_points_sync(self, key: str) -> BaseResponse[List[MonitorPoint]]: """同步查询监测点信息""" data = self._request_sync( "POST", const_url_point, json={"key": key} ) return BaseResponse[List[MonitorPoint]](**data) # 示例:添加新的数据接口客户端 class RateClient(BaseHttpClient): """在线率查询客户端""" async def query_rates(self, areacode: str, startDate: str, endDate: str) -> BaseResponse[Dict]: """异步查询在线率信息""" data = await self._request_async( "POST", const_url_rate, json={ 'areaCode': areacode, 'startDate': startDate, 'endDate': endDate } ) return BaseResponse[Dict](**data) def query_rates_sync(self, areacode: str, startDate: str, endDate: str) -> BaseResponse[Dict]: """同步查询在线率信息""" data = self._request_sync( "POST", const_url_rate, json={ 'areaCode': areacode, 'startDate': startDate, 'endDate': endDate } ) return BaseResponse[Dict](**data) def query_rates_ranking_sync(self, rank_type: int) -> BaseResponse[Dict]: """同步查询在线率排名信息""" data = self._request_sync( "POST", const_url_rate_ranking, json={'type': rank_type} ) return BaseResponse[Dict](**data) async def query_rates_ranking(self, rank_type: int) -> BaseResponse[Dict]: """异步查询在线率排名信息""" data = await self._request_async( "POST", const_url_rate_ranking, json={'type': rank_type} ) return BaseResponse[Dict](**data) # 使用示例 async def example_async_usage(): # 监测点查询示例 monitor_client = MonitorClient() try: response = await monitor_client.query_points("湖南") if response.resultcode == 1 and response.resultdata: for point in response.resultdata: print(f"监测点名称: {point.MONITORPOINTNAME}") print(f"位置: {point.LOCATION}") print("---") except httpx.HTTPError as e: print(f"HTTP请求错误: {e}") # 在线率查询示例 rate_client = RateClient() try: response = await rate_client.query_rates("520100", "2024-01-01", "2024-01-31") if response.resultcode == 1: print(f"在线率数据: {response.resultdata}") except httpx.HTTPError as e: print(f"HTTP请求错误: {e}") if __name__ == "__main__": asyncio.run(example_async_usage())