Commit f50cc798 by tinywell

增加在线率接口、工具调用

parent 6a19aee9
This source diff could not be displayed because it is too large. You can view the blob instead.
import pandas as pd
from typing import List, Optional, Dict, Tuple
import os
class AreaCodeTool:
def __init__(self, csv_path: str = None):
"""
初始化行政区划代码工具
Args:
csv_path: CSV文件路径,如果为None则使用默认路径
"""
if csv_path is None:
# 获取当前文件所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
csv_path = os.path.join(current_dir, "area_code.csv")
# 读取CSV文件
self.df = pd.read_csv(csv_path, dtype={'code': str})
# 确保code列为字符串类型
self.df['code'] = self.df['code'].astype(str)
# 构建区域名称到代码的映射
self._build_name_maps()
def _build_name_maps(self):
"""构建区域名称到代码的映射"""
self.full_name_map = dict(zip(self.df['name'], self.df['code']))
# 构建省级映射
self.province_map = {}
# 构建市级映射
self.city_map = {}
# 构建区县级映射
self.district_map = {}
for _, row in self.df.iterrows():
name = row['name'].strip()
code = row['code']
parts = name.split('省' if '省' in name else '市')
if '省' in name:
province = parts[0] + '省'
self.province_map[province] = code
if len(parts) > 1 and parts[1]:
city_parts = parts[1].split('市')
if city_parts[0]:
city = city_parts[0] + '市'
self.city_map[city] = code
if len(city_parts) > 1 and city_parts[1]:
district = city_parts[1]
self.district_map[district] = code
else:
# 处理直辖市等特殊情况
if parts[0]:
self.city_map[parts[0] + '市'] = code
def find_code(self, area_name: str) -> List[Tuple[str, str]]:
"""
查找区域代码
Args:
area_name: 区域名称,可以是完整或部分名称
Returns:
List[Tuple[str, str]]: 返回匹配的(区域名称, 代码)列表
"""
results = []
# 尝试完整匹配
if area_name in self.full_name_map:
results.append((area_name, self.full_name_map[area_name]))
return results
# 尝试省级匹配
if area_name.endswith('省') and area_name in self.province_map:
results.append((area_name, self.province_map[area_name]))
# 尝试市级匹配
if area_name.endswith('市') and area_name in self.city_map:
results.append((area_name, self.city_map[area_name]))
# 尝试区县级匹配
if area_name in self.district_map:
results.append((area_name, self.district_map[area_name]))
# 模糊匹配
if not results:
mask = self.df['name'].str.contains(area_name, na=False)
matches = self.df[mask]
results.extend([(row['name'], row['code']) for _, row in matches.iterrows()])
return results
def get_full_name(self, code: str) -> Optional[str]:
"""
根据代码获取完整的区域名称
Args:
code: 区域代码
Returns:
Optional[str]: 完整的区域名称,如果未找到则返回None
"""
mask = self.df['code'] == code
matches = self.df[mask]
if not matches.empty:
return matches.iloc[0]['name']
return None
# 使用示例
def example_usage():
tool = AreaCodeTool()
# 测试不同类型的查询
test_cases = [
"安徽省",
"安庆市",
"迎江区",
"安徽省安庆市",
"安徽省安庆市迎江区",
"安庆" # 模糊查询
]
for query in test_cases:
print(f"\n查询: {query}")
results = tool.find_code(query)
for name, code in results:
print(f"匹配结果: {name} -> {code}")
# 测试代码反查
code = "340802" # 安徽省安庆市迎江区
full_name = tool.get_full_name(code)
if full_name:
print(f"\n代码反查: {code} -> {full_name}")
if __name__ == "__main__":
example_usage()
import httpx
from typing import List, Optional, Dict, TypeVar, Generic, Any
from pydantic import BaseModel
import asyncio
from urllib.parse import urljoin
# 泛型类型定义
T = TypeVar('T')
class BaseResponse(BaseModel, Generic[T]):
"""通用响应模型"""
type: int
resultcode: int
message: str
resultdata: T
class BaseHttpClient:
"""基础HTTP客户端"""
def __init__(self, base_url: str = "http://localhost:5001"):
self.base_url = base_url.rstrip('/')
self.timeout = 30.0
async def _request_async(self, method: str, endpoint: str, **kwargs) -> Any:
"""通用异步请求方法"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
url = urljoin(self.base_url, endpoint)
response = await client.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
def _request_sync(self, method: str, endpoint: str, **kwargs) -> Any:
"""通用同步请求方法"""
with httpx.Client(timeout=self.timeout) as client:
url = urljoin(self.base_url, endpoint)
response = client.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
class MonitorPoint(BaseModel):
"""监测点数据模型"""
MONITORPOINTCODE: str
MONITORPOINTNAME: str
LOCATION: str
LATITUDE: str
LONGITUDE: str
ELEVATION: str
BUILDUNIT: str
MONITORUNIT: str
YWUNIT: str
SGDW: Optional[str] = None
MANUFACTURER: str = ""
class MonitorClient(BaseHttpClient):
"""监测点查询客户端"""
async def query_points(self, key: str) -> BaseResponse[List[MonitorPoint]]:
"""异步查询监测点信息"""
data = await self._request_async(
"POST",
"/api/monitor/points",
json={"key": key}
)
return BaseResponse[List[MonitorPoint]](**data)
def query_points_sync(self, key: str) -> BaseResponse[List[MonitorPoint]]:
"""同步查询监测点信息"""
data = self._request_sync(
"POST",
"/api/monitor/points",
json={"key": key}
)
return BaseResponse[List[MonitorPoint]](**data)
# 示例:添加新的数据接口客户端
class RateClient(BaseHttpClient):
"""在线率查询客户端"""
async def query_rates(self, params: Dict) -> BaseResponse[Dict]:
"""异步查询在线率信息"""
data = await self._request_async(
"POST",
"/api/device/rate",
json=params
)
return BaseResponse[Dict](**data)
def query_rates_sync(self, params: Dict) -> BaseResponse[Dict]:
"""同步查询在线率信息"""
data = self._request_sync(
"POST",
"/api/device/rate",
json=params
)
return BaseResponse[Dict](**data)
def query_rates_ranking_sync(self, params: Dict) -> BaseResponse[Dict]:
"""同步查询在线率排名信息"""
data = self._request_sync(
"POST",
"/api/device/rate/ranking",
json=params
)
return BaseResponse[Dict](**data)
async def query_rates_ranking(self, params: Dict) -> BaseResponse[Dict]:
"""异步查询在线率排名信息"""
data = await self._request_async(
"POST",
"/api/device/rate/ranking",
json=params
)
return BaseResponse[Dict](**data)
# 使用示例
async def example_async_usage():
# 监测点查询示例
monitor_client = MonitorClient()
try:
response = await monitor_client.query_points("湖南")
if response.resultcode == 1 and response.resultdata:
for point in response.resultdata:
print(f"监测点名称: {point.MONITORPOINTNAME}")
print(f"位置: {point.LOCATION}")
print("---")
except httpx.HTTPError as e:
print(f"HTTP请求错误: {e}")
# 在线率查询示例
rate_client = RateClient()
try:
response = await rate_client.query_rates({"region": "湖南", "type": "residential"})
if response.resultcode == 1:
print(f"在线率数据: {response.resultdata}")
except httpx.HTTPError as e:
print(f"HTTP请求错误: {e}")
if __name__ == "__main__":
asyncio.run(example_async_usage())
from typing import Dict, Any, Optional from typing import Dict, Any, Optional,List
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from langchain_core.tools import BaseTool from langchain_core.tools import BaseTool
from src.server.http_tools import MonitorClient from .http_tools import MonitorClient, RateClient
class MonitorPointResponse():
"""监测点查询结果"""
status: str = Field(..., description="状态")
message: str = Field(..., description="消息")
points: List[Dict[str, Any]] = Field(..., description="监测点列表")
class MonitorPointArgs(BaseModel): class MonitorPointArgs(BaseModel):
"""监测点查询参数""" """监测点查询参数"""
...@@ -45,11 +51,11 @@ class MonitorPointTool(BaseTool): ...@@ -45,11 +51,11 @@ class MonitorPointTool(BaseTool):
# 格式化返回结果 # 格式化返回结果
if not response.resultdata: if not response.resultdata:
return { return MonitorPointResponse(
"status": "success", status="success",
"message": f"未在{key}找到监测点信息", message=f"未在{key}找到监测点信息",
"points": [] points=[]
} )
# 提取关键信息并格式化 # 提取关键信息并格式化
points_info = [] points_info = []
...@@ -64,18 +70,18 @@ class MonitorPointTool(BaseTool): ...@@ -64,18 +70,18 @@ class MonitorPointTool(BaseTool):
"监测单位": point.MONITORUNIT "监测单位": point.MONITORUNIT
}) })
return { return MonitorPointResponse(
"status": "success", status="success",
"message": f"在{key}找到{len(points_info)}个监测点", message=f"在{key}找到{len(points_info)}个监测点",
"points": points_info points=points_info
} )
except Exception as e: except Exception as e:
return { return MonitorPointResponse(
"status": "error", status="error",
"message": f"查询失败: {str(e)}", message=f"查询失败: {str(e)}",
"points": [] points=[]
} )
def _arun(self, key: str) -> Dict[str, Any]: def _arun(self, key: str) -> Dict[str, Any]:
""" """
......
...@@ -7,6 +7,10 @@ from pydantic import BaseModel, Field ...@@ -7,6 +7,10 @@ from pydantic import BaseModel, Field
from typing import Type from typing import Type
from langchain_core.tools import BaseTool from langchain_core.tools import BaseTool
from .http_tools import RateClient
from .code import AreaCodeTool
code_tool = AreaCodeTool()
class BaseRateTool(BaseTool): class BaseRateTool(BaseTool):
"""设备在线率分析基础工具类""" """设备在线率分析基础工具类"""
db: Any = Field(None, exclude=True) db: Any = Field(None, exclude=True)
...@@ -69,45 +73,41 @@ class RegionRateTool(BaseRateTool): ...@@ -69,45 +73,41 @@ class RegionRateTool(BaseRateTool):
name = "region_online_rate" name = "region_online_rate"
description = "查询指定地区在指定时间段内的设备在线率" description = "查询指定地区在指定时间段内的设备在线率"
args_schema: Type[BaseModel] = RegionRateArgs args_schema: Type[BaseModel] = RegionRateArgs
client: Any = Field(None, exclude=True)
def __init__(self, base_url: str = "http://localhost:5001", **data):
super().__init__(**data)
self.client = RateClient(base_url=base_url)
def _run(self, region_name: str, start_time: str, end_time: str) -> Dict[str, Any]: def _run(self, region_name: str, start_time: str, end_time: str) -> Dict[str, Any]:
return self.get_region_online_rate(region_name, start_time, end_time) return self.get_region_online_rate(region_name, start_time, end_time)
def get_region_online_rate(self, region_name: str, start_time: str, end_time: str) -> Dict[str, Any]: def get_region_online_rate(self, region_name: str, start_time: str, end_time: str) -> Dict[str, Any]:
# 保持原有的实现...
# 查询数据 # 查询数据
sql = """region""" code = code_tool.find_code(region_name)
df = self.db.query(sql, { if not code:
'region_name': region_name, return {
'start_time': start_time, 'code': 400,
'end_time': end_time 'message': f'未找到匹配的区域代码: {region_name}'
}
df = self.client.query_rates_sync({
'startDate': start_time,
'endDate': end_time,
'areaCode': code[0][1]
}) })
# 生成图表
fig = px.line(df,
x='date',
y='online_rate',
title=f'{region_name}设备在线率趋势')
fig.update_layout(
xaxis_title='日期',
yaxis_title='在线率',
yaxis_tickformat='.2%'
)
# 准备数据 # 准备数据
data = { data = {
'region': region_name, 'region': region_name,
'average_rate': df['online_rate'].mean(), 'region_code': code[0][1],
'max_rate': df['online_rate'].max(), 'rate_data': df.resultdata,
'min_rate': df['online_rate'].min(),
'total_devices': df['device_count'].mean(),
'date_range': { 'date_range': {
'start': start_time, 'start': start_time,
'end': end_time 'end': end_time
} }
} }
return self.format_response(data, fig) return data
class RankingRateArgs(BaseModel): class RankingRateArgs(BaseModel):
"""排名查询参数""" """排名查询参数"""
......
...@@ -5,28 +5,92 @@ from typing import List, Optional, Dict ...@@ -5,28 +5,92 @@ from typing import List, Optional, Dict
app = FastAPI(title="Monitor Points API") app = FastAPI(title="Monitor Points API")
# 多级行政区划数据结构 # 行政区划数据结构
area_data = { area_data = {
"湖南省": { "520000": {
"长沙市": ["芙蓉区", "天心区", "岳麓区", "开福区", "雨花区"], "name": "贵州省",
"株洲市": ["天元区", "荷塘区", "芦淞区", "石峰区", "醴陵市"], "children": {
"湘潭市": ["雨湖区", "岳塘区", "湘乡市", "韶山市"], "520100": {"name": "贵阳市"},
}, "520200": {"name": "六盘水市"},
"湖北省": { "520300": {"name": "遵义市"},
"武汉市": ["江岸区", "江汉区", "硚口区", "汉阳区", "武昌区"], "520400": {"name": "安顺市"},
"宜昌市": ["西陵区", "伍家岗区", "点军区", "猇亭区"], "522401": {"name": "毕节市"},
}, "520600": {"name": "铜仁市"},
"广东省": { "520700": {"name": "双龙新区"},
"广州市": ["越秀区", "海珠区", "荔湾区", "天河区", "白云区"], "522300": {"name": "黔西南布依族苗族自治州"},
"深圳市": ["福田区", "罗湖区", "南山区", "宝安区", "龙岗区"], "522600": {"name": "黔东南苗族侗族自治州"},
"522700": {"name": "黔南布依族苗族自治州"}
} }
},
# 可以继续添加其他省份数据...
} }
# 用于生成随机地名的组件 # 用于生成随机地名的组件
towns = ["茶山镇", "李畋镇", "泗汾镇", "东富镇", "新华镇", "龙泉镇"] towns = ["茶山镇", "李畋镇", "泗汾镇", "东富镇", "新华镇", "龙泉镇", "石桥镇", "金山镇", "阳光镇", "河东镇"]
villages = ["大西垅村", "石桥村", "龙门村", "青山村", "凤凰村", "红星村"] villages = ["大西垅村", "石桥村", "龙门村", "青山村", "凤凰村", "红星村", "新华村", "和平村", "兴隆村", "富民村"]
# 经纬度范围配置
region_coordinates = {
"贵州省": {"lat": (24.6, 29.1), "lng": (103.6, 109.6)},
# 可以继续添加其他省份的经纬度范围...
}
def get_area_info(area_code: str) -> Optional[Dict]:
"""获取区域信息"""
province_code = area_code[:2] + "0000"
if province_code not in area_data:
return None
if area_code == province_code:
return {"code": area_code, "name": area_data[province_code]["name"]}
for city_code, city_info in area_data[province_code]["children"].items():
if city_code == area_code:
return {"code": area_code, "name": city_info["name"]}
return None
def get_sub_areas(area_code: str) -> List[Dict]:
"""获取下级区域列表"""
province_code = area_code[:2] + "0000"
if province_code not in area_data:
return []
if area_code == province_code:
return [
{"code": code, "name": info["name"]}
for code, info in area_data[province_code]["children"].items()
]
return []
def generate_random_point(area_info: Dict) -> dict:
"""生成随机的检测点数据"""
area_name = area_info["name"]
area_code = area_info["code"]
# 获取经纬度范围
coords = region_coordinates.get(area_name, {"lat": (24.6, 29.1), "lng": (103.6, 109.6)})
lat_range = coords["lat"]
lng_range = coords["lng"]
town = random.choice(towns)
village = random.choice(villages)
return {
"MONITORPOINTCODE": f"{area_code[:2]}{random.randint(1000, 9999)}000{random.randint(100, 999)}",
"MONITORPOINTNAME": f"{area_name}{town}{village}地质灾害隐患点",
"LOCATION": f"{area_name}{town}{village}",
"LATITUDE": f"{random.uniform(lat_range[0], lat_range[1]):.8f}",
"LONGITUDE": f"{random.uniform(lng_range[0], lng_range[1]):.8f}",
"ELEVATION": f"{random.uniform(30.0, 2000.0):.4f}",
"BUILDUNIT": f"{area_name}地质工程勘察院",
"MONITORUNIT": f"{area_name}地质灾害监测中心",
"YWUNIT": "致力工程科技有限公司",
"SGDW": f"{area_name}地质工程有限公司",
"MANUFACTURER": random.choice(["华测检测", "中地装备", "中煤科工", "地质云"])
}
# 定义请求和响应模型
class QueryRequest(BaseModel): class QueryRequest(BaseModel):
key: str key: str
...@@ -45,110 +109,131 @@ class MonitorPoint(BaseModel): ...@@ -45,110 +109,131 @@ class MonitorPoint(BaseModel):
class QueryResponse(BaseModel): class QueryResponse(BaseModel):
type: int = 1 type: int = 1
resultcode: int resultcode: int = 1
message: str = "" message: str = ""
resultdata: List[MonitorPoint] resultdata: List[MonitorPoint]
def find_area_info(key: str) -> tuple[Optional[str], Optional[str], Optional[str]]: @app.post("/api/monitor/points", response_model=QueryResponse)
""" async def query_points(request: QueryRequest):
查找行政区划信息 """检测点查询接口"""
返回: (省, 市, 区/县) print(f"进入 query_points 接口, 查询监测点信息: {request.key}")
""" try:
# 移除可能包含的"省"、"市"、"区"等后缀 # 在预定义的区域数据中查找匹配项
search_key = key.replace("省", "").replace("市", "").replace("区", "").replace("县", "") points = []
for province_code, province_info in area_data.items():
# 在省级查找 if request.key in province_info["name"]:
for province, cities in area_data.items(): area_info = {"code": province_code, "name": province_info["name"]}
if search_key in province: num_points = random.randint(1, 5)
return province, None, None points.extend([generate_random_point(area_info) for _ in range(num_points)])
# 在市级查找
for city, districts in cities.items():
if search_key in city:
return province, city, None
# 在区级查找
for district in districts:
if search_key in district:
return province, city, district
return None, None, None
def generate_random_point(province: str, city: Optional[str] = None, district: Optional[str] = None) -> dict:
"""生成随机的检测点数据,根据行政区划信息生成相应的位置"""
# 确定行政区划信息
if not city:
city = random.choice(list(area_data[province].keys()))
if not district:
district = random.choice(area_data[province][city])
town = random.choice(towns) # 如果是省级查询,也生成下级区域的点位
village = random.choice(villages) for city_code, city_info in province_info["children"].items():
if request.key in province_info["name"]:
area_info = {"code": city_code, "name": city_info["name"]}
num_points = random.randint(1, 5)
points.extend([generate_random_point(area_info) for _ in range(num_points)])
# 根据不同省份设置不同的经纬度范围 response = QueryResponse(
latitude_ranges = { type=1,
"湖南省": (24.6, 30.2), resultcode=1,
"湖北省": (29.0, 33.2), message="" if points else "未找到匹配的行政区划",
"广东省": (20.2, 25.5) resultdata=points
} )
longitude_ranges = { print(f"查询监测点信息成功, 返回 {len(points)} 个点位")
"湖南省": (108.8, 114.2), return response
"湖北省": (108.3, 116.1),
"广东省": (109.7, 117.3)
}
lat_range = latitude_ranges.get(province, (20.0, 35.0)) except Exception as e:
lng_range = longitude_ranges.get(province, (108.0, 118.0)) print(f"查询监测点信息失败, 错误信息: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
return { class DeviceRateRequest(BaseModel):
"MONITORPOINTCODE": f"{random.randint(430000, 439999)}000{random.randint(100, 999)}", startDate: str
"MONITORPOINTNAME": f"{city}{district}{town}{village}地质灾害隐患点", endDate: str
"LOCATION": f"{province}{city}{district}{town}{village}", areaCode: str
"LATITUDE": f"{random.uniform(lat_range[0], lat_range[1]):.8f}",
"LONGITUDE": f"{random.uniform(lng_range[0], lng_range[1]):.8f}",
"ELEVATION": f"{random.uniform(30.0, 100.0):.4f}",
"BUILDUNIT": "省地质工程勘察院有限公司",
"MONITORUNIT": "省地质工程勘察院有限公司",
"YWUNIT": "致力工程科技有限公司",
"SGDW": None,
"MANUFACTURER": ""
}
@app.post("/api/monitor/points", response_model=QueryResponse) class DeviceRateItem(BaseModel):
async def query_points(request: QueryRequest): name: str
""" rate: str
检测点查询接口
- **key**: 行政区划名称(省/市/区县级别均可) class DeviceRateResponse(BaseModel):
""" type: int = 1
print(f"进入 query_points 接口, 查询监测点信息: {request.key}") resultcode: int = 1
message: str = ""
resultdata: List[DeviceRateItem]
otherinfo: Optional[Dict] = None
def generate_rate_data(area_code: str) -> List[DeviceRateItem]:
"""生成设备在线率数据"""
result_data = []
# 获取区域信息
area_info = get_area_info(area_code)
if not area_info:
return []
# 生成本级区域数据
if "自治州" in area_info["name"]:
base_rate = random.uniform(80, 90)
elif "新区" in area_info["name"]:
base_rate = random.uniform(50, 60)
elif area_info["name"].endswith("省"):
base_rate = random.uniform(80, 90)
elif area_info["name"].endswith("市"):
base_rate = random.uniform(85, 95)
else:
base_rate = random.uniform(75, 95)
result_data.append(DeviceRateItem(
name=area_info["name"],
rate=f"{base_rate:.2f}"
))
# 生成下级区域数据
sub_areas = get_sub_areas(area_code)
for sub_area in sub_areas:
if "自治州" in sub_area["name"]:
rate = random.uniform(80, 90)
elif "新区" in sub_area["name"]:
rate = random.uniform(50, 60)
else:
rate = random.uniform(85, 95)
result_data.append(DeviceRateItem(
name=sub_area["name"],
rate=f"{rate:.2f}"
))
return result_data
@app.post("/api/device/rate", response_model=DeviceRateResponse)
async def query_device_rate(request: DeviceRateRequest):
"""查询不同时间段不同地区设备在线率"""
print(f"进入 query_device_rate 接口, 查询参数: {request}")
try: try:
province, city, district = find_area_info(request.key) result_data = generate_rate_data(request.areaCode)
if not province: if not result_data:
return QueryResponse( return DeviceRateResponse(
type=1, type=1,
resultcode=1, resultcode=0,
message="未找到匹配的行政区划", message="未找到该地区数据",
resultdata=[] resultdata=[],
otherinfo=None
) )
# 生成1-5个随机检测点数据 response = DeviceRateResponse(
num_points = random.randint(1, 5)
points = [generate_random_point(province, city, district) for _ in range(num_points)]
response = QueryResponse(
type=1, type=1,
resultcode=1, resultcode=1,
message="", message="",
resultdata=points resultdata=result_data,
otherinfo=None
) )
print(f"查询监测点信息成功, 返回结果: {response}") print(f"查询设备在线率成功, 返回 {len(result_data)} 条数据")
return response return response
except Exception as e: except Exception as e:
print(f"查询监测点信息失败, 错误信息: {str(e)}") print(f"查询设备在线率失败, 错误信息: {str(e)}")
raise HTTPException( raise HTTPException(status_code=500, detail=str(e))
status_code=500,
detail=str(e)
)
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
......
import httpx
from typing import List, Optional, Dict
from pydantic import BaseModel
import asyncio
from urllib.parse import urljoin
class MonitorPoint(BaseModel):
MONITORPOINTCODE: str
MONITORPOINTNAME: str
LOCATION: str
LATITUDE: str
LONGITUDE: str
ELEVATION: str
BUILDUNIT: str
MONITORUNIT: str
YWUNIT: str
SGDW: Optional[str] = None
MANUFACTURER: str = ""
class QueryResponse(BaseModel):
type: int
resultcode: int
message: str
resultdata: List[MonitorPoint]
class MonitorClient:
def __init__(self, base_url: str = "http://localhost:5001"):
"""
初始化监测点查询客户端
Args:
base_url: API服务器基础URL
"""
self.base_url = base_url.rstrip('/')
self.timeout = 30.0
async def query_points(self, key: str) -> QueryResponse:
"""
异步查询监测点信息
Args:
key: 行政区划关键字(省/市/区县级别均可)
Returns:
QueryResponse: 查询响应对象
Raises:
httpx.HTTPError: 当HTTP请求失败时
ValueError: 当响应数据格式不正确时
"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
url = urljoin(self.base_url, "/api/monitor/points")
response = await client.post(url, json={"key": key})
response.raise_for_status()
return QueryResponse(**response.json())
def query_points_sync(self, key: str) -> QueryResponse:
"""
同步查询监测点信息
Args:
key: 行政区划关键字(省/市/区县级别均可)
Returns:
QueryResponse: 查询响应对象
Raises:
httpx.HTTPError: 当HTTP请求失败时
ValueError: 当响应数据格式不正确时
"""
with httpx.Client(timeout=self.timeout) as client:
url = urljoin(self.base_url, "/api/monitor/points")
response = client.post(url, json={"key": key})
response.raise_for_status()
return QueryResponse(**response.json())
# 使用示例
async def example_async_usage():
client = MonitorClient()
try:
# 异步查询示例
response = await client.query_points("湖南")
if response.resultcode == 1 and response.resultdata:
for point in response.resultdata:
print(f"监测点名称: {point.MONITORPOINTNAME}")
print(f"位置: {point.LOCATION}")
print(f"经纬度: {point.LONGITUDE}, {point.LATITUDE}")
print("---")
except httpx.HTTPError as e:
print(f"HTTP请求错误: {e}")
except Exception as e:
print(f"发生错误: {e}")
def example_sync_usage():
client = MonitorClient()
try:
# 同步查询示例
response = client.query_points_sync("长沙")
if response.resultcode == 1 and response.resultdata:
for point in response.resultdata:
print(f"监测点名称: {point.MONITORPOINTNAME}")
print(f"位置: {point.LOCATION}")
print(f"经纬度: {point.LONGITUDE}, {point.LATITUDE}")
print("---")
except httpx.HTTPError as e:
print(f"HTTP请求错误: {e}")
except Exception as e:
print(f"发生错误: {e}")
if __name__ == "__main__":
# 异步调用示例
asyncio.run(example_async_usage())
# 同步调用示例
example_sync_usage()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment