Commit c65bb953 by tinywell

mock 剔除版本管理

parent 3cce519e
...@@ -61,3 +61,5 @@ ccc/ ...@@ -61,3 +61,5 @@ ccc/
.env .env
lae_pg_data lae_pg_data
tmp tmp
mock
\ No newline at end of file
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import random
from typing import List, Optional, Dict
app = FastAPI(title="Monitor Points API")
# 行政区划数据结构
area_data = {
"520000": {
"name": "贵州省",
"children": {
"520100": {"name": "贵阳市"},
"520200": {"name": "六盘水市"},
"520300": {"name": "遵义市"},
"520400": {"name": "安顺市"},
"522401": {"name": "毕节市"},
"520600": {"name": "铜仁市"},
"520700": {"name": "双龙新区"},
"522300": {"name": "黔西南布依族苗族自治州"},
"522600": {"name": "黔东南苗族侗族自治州"},
"522700": {"name": "黔南布依族苗族自治州"}
}
},
# 可以继续添加其他省份数据...
}
# 用于生成随机地名的组件
towns = ["茶山镇", "李畋镇", "泗汾镇", "东富镇", "新华镇", "龙泉镇", "石桥镇", "金山镇", "阳光镇", "河东镇"]
villages = ["大西垅村", "石桥村", "龙门村", "青山村", "凤凰村", "红星村", "新华村", "和平村", "兴隆村", "富民村"]
# 经纬度范围配置
region_coordinates = {
"贵州省": {"lat": (24.6, 29.1), "lng": (103.6, 109.6)},
# 可以继续添加其他省份的经纬度范围...
}
def get_area_info(area_code: str) -> Optional[Dict]:
"""获取区域信息"""
province_code = area_code[:2] + "0000"
if province_code not in area_data:
return None
if area_code == province_code:
return {"code": area_code, "name": area_data[province_code]["name"]}
for city_code, city_info in area_data[province_code]["children"].items():
if city_code == area_code:
return {"code": area_code, "name": city_info["name"]}
return None
def get_sub_areas(area_code: str) -> List[Dict]:
"""获取下级区域列表"""
province_code = area_code[:2] + "0000"
if province_code not in area_data:
return []
if area_code == province_code:
return [
{"code": code, "name": info["name"]}
for code, info in area_data[province_code]["children"].items()
]
return []
def generate_random_point(area_info: Dict) -> dict:
"""生成随机的检测点数据"""
area_name = area_info["name"]
area_code = area_info["code"]
# 获取经纬度范围
coords = region_coordinates.get(area_name, {"lat": (24.6, 29.1), "lng": (103.6, 109.6)})
lat_range = coords["lat"]
lng_range = coords["lng"]
town = random.choice(towns)
village = random.choice(villages)
return {
"MONITORPOINTCODE": f"{area_code[:2]}{random.randint(1000, 9999)}000{random.randint(100, 999)}",
"MONITORPOINTNAME": f"{area_name}{town}{village}地质灾害隐患点",
"LOCATION": f"{area_name}{town}{village}",
"LATITUDE": f"{random.uniform(lat_range[0], lat_range[1]):.8f}",
"LONGITUDE": f"{random.uniform(lng_range[0], lng_range[1]):.8f}",
"ELEVATION": f"{random.uniform(30.0, 2000.0):.4f}",
"BUILDUNIT": f"{area_name}地质工程勘察院",
"MONITORUNIT": f"{area_name}地质灾害监测中心",
"YWUNIT": "致力工程科技有限公司",
"SGDW": f"{area_name}地质工程有限公司",
"MANUFACTURER": random.choice(["华测检测", "中地装备", "中煤科工", "地质云"])
}
class QueryRequest(BaseModel):
key: str
class MonitorPoint(BaseModel):
MONITORPOINTCODE: str
MONITORPOINTNAME: str
LOCATION: str
LATITUDE: str
LONGITUDE: str
ELEVATION: str
BUILDUNIT: str
MONITORUNIT: str
YWUNIT: str
SGDW: Optional[str] = None
MANUFACTURER: str = ""
class QueryResponse(BaseModel):
type: int = 1
resultcode: int = 1
message: str = ""
resultdata: List[MonitorPoint]
@app.post("/cigem/getMonitorPointAll", response_model=QueryResponse)
async def query_points(request: QueryRequest):
"""检测点查询接口"""
print(f"进入 query_points 接口, 查询监测点信息: {request.key}")
try:
# 在预定义的区域数据中查找匹配项
points = []
for province_code, province_info in area_data.items():
if request.key in province_info["name"]:
area_info = {"code": province_code, "name": province_info["name"]}
num_points = random.randint(1, 5)
points.extend([generate_random_point(area_info) for _ in range(num_points)])
# 如果是省级查询,也生成下级区域的点位
for city_code, city_info in province_info["children"].items():
if request.key in province_info["name"]:
area_info = {"code": city_code, "name": city_info["name"]}
num_points = random.randint(1, 5)
points.extend([generate_random_point(area_info) for _ in range(num_points)])
response = QueryResponse(
type=1,
resultcode=1,
message="" if points else "未找到匹配的行政区划",
resultdata=points
)
print(f"查询监测点信息成功, 返回 {len(points)} 个点位")
return response
except Exception as e:
print(f"查询监测点信息失败, 错误信息: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
class DeviceRateRequest(BaseModel):
startDate: str
endDate: str
areaCode: str
class DeviceRateItem(BaseModel):
name: str
rate: float
class DeviceRateResponse(BaseModel):
type: int = 1
resultcode: int = 1
message: str = ""
resultdata: List[DeviceRateItem]
otherinfo: Optional[Dict] = None
def generate_rate_data(area_code: str) -> List[DeviceRateItem]:
"""生成设备在线率数据"""
result_data = []
# 获取区域信息
area_info = get_area_info(area_code)
if not area_info:
return []
# 生成本级区域数据
if "自治州" in area_info["name"]:
base_rate = random.uniform(80, 90)
elif "新区" in area_info["name"]:
base_rate = random.uniform(50, 60)
elif area_info["name"].endswith("省"):
base_rate = random.uniform(80, 90)
elif area_info["name"].endswith("市"):
base_rate = random.uniform(85, 95)
else:
base_rate = random.uniform(75, 95)
result_data.append(DeviceRateItem(
name=area_info["name"],
rate=f"{base_rate:.2f}"
))
# 生成下级区域数据
sub_areas = get_sub_areas(area_code)
for sub_area in sub_areas:
if "自治州" in sub_area["name"]:
rate = random.uniform(80, 90)
elif "新区" in sub_area["name"]:
rate = random.uniform(50, 60)
else:
rate = random.uniform(85, 95)
result_data.append(DeviceRateItem(
name=sub_area["name"],
rate=rate
))
return result_data
@app.post("/cigem/getAvgOnlineRate", response_model=DeviceRateResponse)
async def query_device_rate(request: DeviceRateRequest):
"""查询不同时间段不同地区设备在线率"""
print(f"进入 query_device_rate 接口, 查询参数: {request}")
try:
result_data = generate_rate_data(request.areaCode)
if not result_data:
return DeviceRateResponse(
type=1,
resultcode=0,
message="未找到该地区数据",
resultdata=[],
otherinfo=None
)
response = DeviceRateResponse(
type=1,
resultcode=1,
message="",
resultdata=result_data,
otherinfo=None
)
print(f"查询设备在线率成功, 返回 {len(result_data)} 条数据")
return response
except Exception as e:
print(f"查询设备在线率失败, 错误信息: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# 添加厂商数据结构
manufacturer_data = [
{"name": "中物九联", "fullname": "中物九联(成都)科技有限责任公司", "base_rate": (95, 100)},
{"name": "沃特兰德", "fullname": "北京沃特兰德科技有限公司", "base_rate": (95, 100)},
{"name": "川德克斯", "fullname": "四川德克斯科技有限公司", "base_rate": (95, 100)},
{"name": "武汉中地恒", "fullname": "武汉中地恒达科技有限公司", "base_rate": (95, 100)},
{"name": "智联时空", "fullname": "深圳市智联时空科技有限公司", "base_rate": (95, 100)},
{"name": "成都来回", "fullname": "成都来回科技有限公司", "base_rate": (90, 99.5)},
{"name": "诚飞科技", "fullname": "四川诚飞信息科技有限公司", "base_rate": (90, 99)},
{"name": "京工高科", "fullname": "京工高科成都光电有限公司", "base_rate": (90, 98)},
{"name": "中移物联", "fullname": "中移物联网有限公司", "base_rate": (85, 95)},
{"name": "华测检测", "fullname": "华测检测认证集团股份有限公司", "base_rate": (80, 95)},
{"name": "中地装备", "fullname": "中地装备工程有限公司", "base_rate": (75, 90)},
{"name": "中煤科工", "fullname": "中煤科工集团重庆研究院有限公司", "base_rate": (70, 90)},
{"name": "地质云", "fullname": "地质云科技有限公司", "base_rate": (60, 85)},
]
province_data = [
{"name": "贵州", "fullname": "贵州省", "base_rate": (75, 95)},
{"name": "四川", "fullname": "四川省", "base_rate": (75, 95)},
{"name": "云南", "fullname": "云南省", "base_rate": (75, 95)},
{"name": "重庆", "fullname": "重庆市", "base_rate": (75, 95)},
{"name": "陕西", "fullname": "陕西省", "base_rate": (75, 95)},
{"name": "甘肃", "fullname": "甘肃省", "base_rate": (75, 95)},
{"name": "青海", "fullname": "青海省", "base_rate": (75, 95)},
{"name": "宁夏", "fullname": "宁夏回族自治区", "base_rate": (75, 95)},
{"name": "新疆", "fullname": "新疆维吾尔自治区", "base_rate": (75, 95)},
]
# 添加请求模型
class DeviceRateRankingRequest(BaseModel):
type: str # "1" 为省份排名,"2" 为厂商排名
class RankingItem(BaseModel):
name: str
fullname: str
onlineRate: float
class RankingResponse(BaseModel):
type: int = 1
resultcode: int = 1
message: str = ""
resultdata: List[RankingItem]
otherinfo: Optional[Dict] = None
def generate_rate_ranking_data_by_manufacturer() -> List[RankingItem]:
"""生成厂商设备在线率排名数据"""
result_data = []
# 为每个厂商生成在线率
for mfr in manufacturer_data:
base_min, base_max = mfr["base_rate"]
rate = round(random.uniform(base_min, base_max), 2)
result_data.append(RankingItem(
name=mfr["name"],
fullname=mfr["fullname"],
onlineRate=rate
))
# 按在线率降序排序
result_data.sort(key=lambda x: x.onlineRate, reverse=True)
return result_data
def generate_rate_ranking_data_by_province() -> List[RankingItem]:
"""生成省份设备在线率排名数据"""
result_data = []
# 为每个省份生成在线率
for province in province_data:
# 生成一个基准在线率(75-95之间)
rate = round(random.uniform(75, 95), 2)
result_data.append(RankingItem(
name=province["name"],
fullname=province["fullname"],
onlineRate=rate
))
# 按在线率降序排序
result_data.sort(key=lambda x: x.onlineRate, reverse=True)
return result_data
@app.post("/cigem/getOnlineRateRank", response_model=RankingResponse)
async def query_device_rate_ranking(request: DeviceRateRankingRequest):
"""
查询设备在线率排名
Args:
request: DeviceRateRankingRequest
- type: 排名类型 "1"省份排名,"2"厂商排名
- startDate: 开始日期(可选)
- endDate: 结束日期(可选)
- areaCode: 区域代码(可选)
Returns:
RankingResponse:
- type: 固定值1
- resultcode: 1成功,0失败
- message: 响应消息
- resultdata: 排名数据列表
- otherinfo: 其他信息(可选)
"""
print(f"进入 query_device_rate_ranking 接口, 查询参数: {request}")
try:
if request.type == "2": # 厂商排名
result_data = generate_rate_ranking_data_by_manufacturer()
else: # 省份排名
result_data = generate_rate_ranking_data_by_province()
response = RankingResponse(
type=1,
resultcode=1,
message="",
resultdata=result_data,
otherinfo=None
)
print(f"查询设备在线率排名成功, 返回 {len(result_data)} 条数据")
return response
except Exception as e:
print(f"查询设备在线率排名失败, 错误信息: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5001)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment