import httpx
from typing import TypeVar, Generic, Any, Optional, List, Dict
from pydantic import BaseModel
from urllib.parse import urljoin
# Generic type parameter; used as the type of BaseResponse.resultdata.
T = TypeVar('T')
# Alternative base URL kept for reference (disabled):
# const_base_url = "http://172.30.0.37:30007"
# Default base URL used by BaseHttpClient when none is passed in.
const_base_url = "http://localhost:5001"
# Endpoint paths, joined onto the client's base URL for each request.
const_url_point = "/cigem/getMonitorPointAll"
const_url_rate = "/cigem/getAvgOnlineRate"
const_url_rate_ranking = "/cigem/getOnlineRateRank"
class BaseResponse(BaseModel, Generic[T]):
    """Generic response envelope, parameterized by the type of ``resultdata``.

    The clients in this file build instances by unpacking the decoded JSON
    body into the model, so the field names must match the JSON keys exactly.
    """

    # Field names mirror the response keys; ``type`` shadows the builtin only
    # inside this class body.
    type: int
    # The usage example in this file treats ``resultcode == 1`` as the
    # condition for reading ``resultdata``.
    resultcode: int
    message: str
    # Payload; its concrete type is chosen via ``BaseResponse[...]``.
    resultdata: T
class BaseHttpClient:
    """Base HTTP client providing async and sync JSON request helpers.

    Each request opens a short-lived ``httpx`` client, sends the request,
    raises on HTTP error status codes, and returns the decoded JSON body.
    """

    def __init__(self, base_url: str = const_base_url, timeout: float = 30.0):
        """Store the connection settings.

        Args:
            base_url: Root URL of the service. A trailing ``/`` is removed so
                that endpoint paths can be appended with a single separator.
            timeout: Timeout in seconds passed to the ``httpx`` client.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    def _build_url(self, endpoint: str) -> str:
        """Return the full request URL for ``endpoint``.

        The endpoint is appended to ``base_url`` rather than resolved with
        ``urljoin``: ``urljoin`` discards any path component of the base URL
        when the endpoint starts with ``/`` (and replaces the last path
        segment when it does not), which breaks base URLs such as
        ``http://host/api``.
        """
        # An absolute URL is used as-is, matching what ``urljoin`` returned.
        if endpoint.startswith(("http://", "https://")):
            return endpoint
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    async def _request_async(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send a request asynchronously and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            endpoint: Path appended to ``base_url`` (or an absolute URL).
            **kwargs: Forwarded unchanged to ``httpx.AsyncClient.request``.

        Raises:
            httpx.HTTPStatusError: If the response has a 4xx/5xx status.
        """
        url = self._build_url(endpoint)
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()

    def _request_sync(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send a request synchronously and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            endpoint: Path appended to ``base_url`` (or an absolute URL).
            **kwargs: Forwarded unchanged to ``httpx.Client.request``.

        Raises:
            httpx.HTTPStatusError: If the response has a 4xx/5xx status.
        """
        url = self._build_url(endpoint)
        with httpx.Client(timeout=self.timeout) as client:
            response = client.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
class MonitorPoint(BaseModel):
    """Data model for a single monitoring-point record.

    Attribute names are upper-case because they are matched verbatim against
    the keys of each record being validated.
    """

    # Identification
    MONITORPOINTCODE: str
    MONITORPOINTNAME: str

    # Position; note these are declared as strings, not numbers.
    LOCATION: str
    LATITUDE: str
    LONGITUDE: str
    ELEVATION: str

    # Organisations associated with the point (required fields).
    BUILDUNIT: str
    MONITORUNIT: str
    YWUNIT: str

    # Optional fields: SGDW may be missing or null; MANUFACTURER falls back
    # to an empty string when missing.
    SGDW: Optional[str] = None
    MANUFACTURER: str = ""

    MONITORTYPE: str
class MonitorClient(BaseHttpClient):
    """Client for the monitoring-point query endpoint."""

    async def query_points(self, key: str) -> BaseResponse[List[MonitorPoint]]:
        """Query monitoring points asynchronously.

        Args:
            key: Value sent as the ``key`` field of the JSON request body.

        Returns:
            The response envelope with ``resultdata`` parsed as a list of
            ``MonitorPoint``.
        """
        payload = {"key": key}
        raw = await self._request_async("POST", const_url_point, json=payload)
        response_model = BaseResponse[List[MonitorPoint]]
        return response_model(**raw)

    def query_points_sync(self, key: str) -> BaseResponse[List[MonitorPoint]]:
        """Query monitoring points synchronously.

        Args:
            key: Value sent as the ``key`` field of the JSON request body.

        Returns:
            The response envelope with ``resultdata`` parsed as a list of
            ``MonitorPoint``.
        """
        payload = {"key": key}
        raw = self._request_sync("POST", const_url_point, json=payload)
        response_model = BaseResponse[List[MonitorPoint]]
        return response_model(**raw)
# Example: adding a client for another data endpoint
class RateClient(BaseHttpClient):
    """Client for the online-rate and online-rate-ranking endpoints."""

    # NOTE(review): ``query_rates`` parses ``resultdata`` as ``Dict`` while
    # ``query_rates_sync`` parses the same endpoint as ``List``. One of the
    # two is presumably wrong - confirm against the real response shape.

    async def query_rates(self, areacode: str, startDate: str, endDate: str) -> BaseResponse[Dict]:
        """Query online-rate data asynchronously.

        Args:
            areacode: Sent as ``areaCode`` in the JSON request body.
            startDate: Sent as ``startDate`` in the JSON request body.
            endDate: Sent as ``endDate`` in the JSON request body.

        Returns:
            The response envelope with ``resultdata`` parsed as a dict.
        """
        payload = {
            'areaCode': areacode,
            'startDate': startDate,
            'endDate': endDate,
        }
        raw = await self._request_async("POST", const_url_rate, json=payload)
        response_model = BaseResponse[Dict]
        return response_model(**raw)

    def query_rates_sync(self, areacode: str, startDate: str, endDate: str) -> BaseResponse[List]:
        """Query online-rate data synchronously.

        Args:
            areacode: Sent as ``areaCode`` in the JSON request body.
            startDate: Sent as ``startDate`` in the JSON request body.
            endDate: Sent as ``endDate`` in the JSON request body.

        Returns:
            The response envelope with ``resultdata`` parsed as a list.
        """
        payload = {
            'areaCode': areacode,
            'startDate': startDate,
            'endDate': endDate,
        }
        raw = self._request_sync("POST", const_url_rate, json=payload)
        response_model = BaseResponse[List]
        return response_model(**raw)

    def query_rates_ranking_sync(self, rank_type: int) -> BaseResponse[List]:
        """Query the online-rate ranking synchronously.

        Args:
            rank_type: Sent as ``type`` in the JSON request body.

        Returns:
            The response envelope with ``resultdata`` parsed as a list.
        """
        payload = {'type': rank_type}
        raw = self._request_sync("POST", const_url_rate_ranking, json=payload)
        response_model = BaseResponse[List]
        return response_model(**raw)

    async def query_rates_ranking(self, rank_type: int) -> BaseResponse[List]:
        """Query the online-rate ranking asynchronously.

        Args:
            rank_type: Sent as ``type`` in the JSON request body.

        Returns:
            The response envelope with ``resultdata`` parsed as a list.
        """
        payload = {'type': rank_type}
        raw = await self._request_async("POST", const_url_rate_ranking, json=payload)
        response_model = BaseResponse[List]
        return response_model(**raw)
# Usage example
async def example_async_usage():
    """Demonstrate the async clients by running one query against each.

    First queries monitoring points and prints the name and location of each
    returned point, then queries online-rate data and prints the payload.
    ``httpx.HTTPError`` raised by either query is caught and printed, so a
    failure in the first query does not prevent the second from running.
    """
    # --- Monitoring-point query ---
    points_client = MonitorClient()
    try:
        points_reply = await points_client.query_points("湖南")
        succeeded = points_reply.resultcode == 1
        # Only iterate when the call succeeded and returned a non-empty list.
        if succeeded and points_reply.resultdata:
            for item in points_reply.resultdata:
                print(f"监测点名称: {item.MONITORPOINTNAME}")
                print(f"位置: {item.LOCATION}")
                print("---")
    except httpx.HTTPError as exc:
        print(f"HTTP请求错误: {exc}")

    # --- Online-rate query ---
    rates_client = RateClient()
    try:
        rates_reply = await rates_client.query_rates(
            "520100", "2024-01-01", "2024-01-31"
        )
        if rates_reply.resultcode == 1:
            print(f"在线率数据: {rates_reply.resultdata}")
    except httpx.HTTPError as exc:
        print(f"HTTP请求错误: {exc}")
if __name__ == "__main__":
    # asyncio is not among this file's top-level imports; import it here so
    # that running the file as a script does not fail with NameError.
    import asyncio

    asyncio.run(example_async_usage())