Commit 9d8ee0af by 陈正乐

会话相关数据库设计实现

parent bfd1cd58
# =============================
# 资料存储数据库配置
# =============================
VEC_DB_HOST = 'localhost'
VEC_DB_DBNAME='lae'
VEC_DB_USER='postgres'
VEC_DB_PASSWORD='chenzl'
VEC_DB_PORT='5432'
# =============================
# 聊天相关数据库配置
# =============================
CHAT_DB_HOST = 'localhost'
CHAT_DB_DBNAME='laechat'
CHAT_DB_USER='postgres'
CHAT_DB_PASSWORD='chenzl'
CHAT_DB_PORT='5432'
# =============================
# 向量化模型路径配置
# =============================
EMBEEDING_MODEL_PATH = 'C:\\Users\\15663\\AI\\models\\bge-large-zh-v1.5'
# =============================
# 模型服务URL配置
# =============================
LLM_SERVER_URL = '192.168.10.102:8002'
# =============================
# FAISS相似性查找配置
# =============================
SIMILARITY_SHOW_NUMBER = 5
SIMILARITY_THRESHOLD = 0.8
# =============================
# FAISS向量库文件存储路径配置
# =============================
FAISS_STORE_PATH = '../faiss'
KNOWLEDGE_PATH = 'C:\\Users\\15663\\Desktop\\work\\llm_gjjs\\兴火燎原知识库\\兴火燎原知识库\\law\\pdf'
INDEX_NAME = 'know'
\ No newline at end of file
INDEX_NAME = 'know'
# =============================
# 知识相关资料配置
# =============================
KNOWLEDGE_PATH = 'C:\\Users\\15663\\Desktop\\work\\llm_gjjs\\兴火燎原知识库\\兴火燎原知识库\\law\\pdf'
\ No newline at end of file
"""会话信息相关表"""
\ No newline at end of file
import psycopg2
from psycopg2 import OperationalError, InterfaceError
class UPostgresDB:
'''
psycopg2.connect(
dsn #指定连接参数。可以使用参数形式或 DSN 形式指定。
host #指定连接数据库的主机名。
dbname #指定数据库名。
user #指定连接数据库使用的用户名。
password #指定连接数据库使用的密码。
port #指定连接数据库的端口号。
connection_factory #指定创建连接对象的工厂类。
cursor_factory #指定创建游标对象的工厂类。
async_ #指定是否异步连接(默认False)。
sslmode #指定 SSL 模式。
sslrootcert #指定证书文件名。
sslkey #指定私钥文件名。
sslcert #指定公钥文件名。
)
'''
def __init__(self, host, database, user, password,port = 5432):
self.host = host
self.database = database
self.user = user
self.password = password
self.port = port
self.conn = None
self.cur = None
def connect(self):
try:
self.conn = psycopg2.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
port = self.port
)
self.cur = self.conn.cursor()
except Exception as e:
print(f"连接数据库出现错误: {e}")
def execute(self, query):
try:
if self.conn is None or self.conn.closed:
self.connect()
self.cur.execute(query)
self.conn.commit()
except InterfaceError as e:
print(f"数据库连接已经关闭: {e}")
except OperationalError as e:
print(f"数据库连接出现问题: {e}")
self.connect()
self.retry_execute(query)
except Exception as e:
print(f"执行sql语句出现错误: {e}")
self.conn.rollback()
def retry_execute(self, query):
try:
self.cur.execute(query)
self.conn.commit()
except Exception as e:
print(f"重新执行sql语句再次出现错误: {type(e).__name__}: {e}")
self.conn.rollback()
def execute_args(self, query, args):
try:
if self.conn is None or self.conn.closed:
self.connect()
self.cur.execute(query, args)
self.conn.commit()
except InterfaceError as e:
print(f"数据库连接已经关闭: {e}")
except OperationalError as e:
print(f"数据库操作出现问题: {e}")
self.connect()
self.retry_execute_args(query, args)
except Exception as e:
print(f"执行sql语句出现错误: {e}")
self.conn.rollback()
def retry_execute_args(self, query, args):
try:
self.cur.execute(query, args)
self.conn.commit()
except Exception as e:
print(f"重新执行sql语句再次出现错误: {type(e).__name__}: {e}")
self.conn.rollback()
def search(self, query, params=None):
if self.conn is None or self.conn.closed:
self.connect()
self.cur.execute(query, params)
def fetchall(self):
return self.cur.fetchall()
def fetchone(self):
return self.cur.fetchone()
def close(self):
self.cur.close()
self.conn.close()
def format(self, query):
try:
if self.conn is None or self.conn.closed:
self.connect()
self.cur.execute(query)
self.conn.commit()
except Exception as e:
print(f"An error occurred: {e}")
self.conn.rollback()
from .c_db import UPostgresDB
import json
TABLE_USER = """
DROP TABLE IF EXISTS "c_user";
CREATE TABLE c_user (
user_id varchar(1000) PRIMARY KEY,
account varchar(20) NOT NULL,
password varchar(50) NOT NULL
);
COMMENT ON COLUMN "c_user"."user_id" IS '用户id';
COMMENT ON COLUMN "c_user"."account" IS '用户帐户';
COMMENT ON COLUMN "c_user"."password" IS '用户密码';
COMMENT ON TABLE "c_user" IS '用户表';
"""
class CUser:
def __init__(self, db: UPostgresDB) -> None:
self.db = db
def insert(self, value):
query = f"INSERT INTO c_user(user_id, account, password) VALUES (%s,%s,%s)"
self.db.execute_args(query, ((value[0],value[1],value[2])))
def create_table(self):
query = TABLE_USER
self.db.execute(query)
\ No newline at end of file
from .c_db import UPostgresDB
import json
TABLE_CHAT = """
DROP TABLE IF EXISTS "chat";
CREATE TABLE chat (
chat_id varchar(1000) PRIMARY KEY,
user_id int,
info text,
create_time timestamp(6) DEFAULT current_timestamp,
deleted int2
);
COMMENT ON COLUMN "chat"."chat_id" IS '会话id';
COMMENT ON COLUMN "chat"."user_id" IS '会话创建用户id';
COMMENT ON COLUMN "chat"."info" IS '会话简介';
COMMENT ON COLUMN "chat"."create_time" IS '会话创建时间,默认为当前时间';
COMMENT ON COLUMN "chat"."deleted" IS '是否删除:0=否,1=是';
COMMENT ON TABLE "chat" IS '会话信息表';
"""
class Chat:
def __init__(self, db: UPostgresDB) -> None:
self.db = db
# 插入数据
def insert(self, value):
query = f"INSERT INTO chat(chat_id, user_id, info, deleted) VALUES (%s,%s,%s,%s)"
self.db.execute_args(query, ((value[0],value[1],value[2],value[3])))
# 创建表
def create_table(self):
query = TABLE_CHAT
self.db.execute(query)
\ No newline at end of file
from .c_db import UPostgresDB
import json
TABLE_CHAT = """
create table chat (
id varchar(1000) primary key,
user_id int,
info text,
chat_type_id int,
create_time date,
history json,
is_delete int,
status int
);
"""
class Chat:
def __init__(self, db: UPostgresDB) -> None:
self.db = db
def insert(self, value):
value[5] = json.dumps(value[5])
query = f"INSERT INTO chat(id,user_id,info,chat_type_id,create_time,history,is_delete,status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
self.db.execute_args(query, ((value[0],value[1],value[2],value[3],value[4],value[5],value[6],value[7])))
def delete_update(self,id):
query = f"UPDATE chat SET is_delete = 1 WHERE id = %s"
self.db.execute_args(query, (id,))
def search(self, id):
query = f"SELECT chat.id,user_id,info,t.name as type,chat.create_time,history,is_delete,status FROM chat left join chat_type t on t.id=chat.chat_type_id WHERE chat.id = %s and is_delete=0 ORDER BY chat.create_time DESC;"
self.db.execute_args(query, (id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0]
else:
return None
def manual_search(self, id):
query = f"SELECT user_id,status,history FROM chat WHERE id = %s and is_delete=0"
self.db.execute_args(query, (id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0]
else:
return None
def qa_search(self, id):
query = f"SELECT user_id,info,t.name as type,chat.create_time,history,status FROM chat left join chat_type t on t.id=chat.chat_type_id WHERE chat.id = %s and is_delete=0 ORDER BY chat.create_time DESC;"
self.db.execute_args(query, (id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0]
else:
return None
def detail_search(self, id):
query = f"SELECT chat.id,user_id,info,t.name as type,chat.create_time,history,status FROM chat left join chat_type t on t.id=chat.chat_type_id WHERE chat.id = %s and is_delete=0 ORDER BY chat.create_time DESC;"
self.db.execute_args(query, (id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0]
else:
return None
def delete_search(self, id):
query = f"SELECT user_id FROM chat WHERE id = %s and is_delete=0"
self.db.execute_args(query, (id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0]
else:
return None
def list_chat_search(self, chat_type, user_id):
query = f"SELECT chat.id,info,t.name as type,chat.create_time,history,chat.status FROM chat left join chat_type t on t.id=chat.chat_type_id WHERE t.name = %s AND is_delete=0 AND user_id = %s ORDER BY chat.create_time DESC;"
self.db.execute_args(query, (chat_type, user_id))
answer = self.db.fetchall()
if len(answer) > 0:
return answer
else:
return None
def search_history(self,id):
query = f"SELECT history FROM chat WHERE id = %s ORDER BY chat.create_time DESC;"
self.db.execute_args(query, (id,))
answer = self.db.fetchall()
if len(answer) > 0:
answer[0] = answer[0][0]
return answer[0]
else:
return None
def get_last_q(self):
query = f"SELECT id,create_time,info,chat_type_id FROM chat ORDER BY create_time DESC LIMIT 1 "
self.db.execute(query)
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0]
else:
return None
def create_table(self):
query = f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'chat')"
self.db.execute(query)
exists = self.db.fetchall()[0][0]
if not exists:
query = TABLE_CHAT
self.db.execute(query)
def drop_table(self):
query = f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'chat')"
self.db.execute(query)
exists = self.db.fetchall()[0][0]
if exists:
query = "DROP TABLE chat"
self.db.format(query)
print("drop table chat ok")
def update(self, chat_id, history):
history = json.dumps(history)
query = f"UPDATE chat SET history = %s WHERE id = %s"
self.db.execute_args(query, (history,chat_id))
def history_update(self, chat_id, history):
history = json.dumps(history)
query = f"UPDATE chat SET history = %s WHERE id = %s"
self.db.execute_args(query, (history,chat_id))
def search_type_id(self, chat_id):
query = f"select chat_type_id from chat where id = %s"
self.db.execute_args(query, (chat_id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0][0]
else:
return None
def get_chat_status(self, chat_id):
query = f"Select status from chat where id = %s"
self.db.execute_args(query, (chat_id,))
answer = self.db.fetchall()
if len(answer) > 0:
return answer[0][0]
else:
return None
def set_chat_status(self, chat_id, status):
query = f"update chat set status = %s where id = %s"
self.db.execute_args(query, (status,chat_id))
\ No newline at end of file
from .c_db import UPostgresDB
import json
TABLE_CHAT = """
DROP TABLE IF EXISTS "turn_qa";
CREATE TABLE turn_qa (
turn_id varchar(1000) PRIMARY KEY,
chat_id varchar(1000),
question text,
answer text,
create_time timestamp(6) DEFAULT current_timestamp,
turn_number int,
is_last int2
);
COMMENT ON COLUMN "turn_qa"."turn_id" IS '会话轮次id';
COMMENT ON COLUMN "turn_qa"."chat_id" IS '会话id';
COMMENT ON COLUMN "turn_qa"."question" IS '该轮会话问题';
COMMENT ON COLUMN "turn_qa"."answer" IS '该轮会话答案';
COMMENT ON COLUMN "turn_qa"."create_time" IS '该轮会话创建时间,默认为当前时间';
COMMENT ON COLUMN "turn_qa"."turn_number" IS '会话轮数';
COMMENT ON COLUMN "turn_qa"."is_last" IS '是否为最后一轮对话:0=否,1=是';
COMMENT ON TABLE "turn_qa" IS '会话轮次信息表';
"""
class TurnQa:
def __init__(self, db: UPostgresDB) -> None:
self.db = db
# 插入数据
def insert(self, value):
query = f"INSERT INTO turn_qa(turn_id, chat_id, question, answer, turn_number, is_last) VALUES (%s,%s,%s,%s,%s,%s)"
self.db.execute_args(query, ((value[0],value[1],value[2],value[3],value[4],value[5])))
# 创建表
def create_table(self):
query = TABLE_CHAT
self.db.execute(query)
\ No newline at end of file
import sys
sys.path.append("../")
from src.pgdb.chat.c_db import UPostgresDB
from src.pgdb.chat.chat_table import Chat
from src.pgdb.chat.c_user_table import CUser
from src.pgdb.chat.turn_qa_table import TurnQa
"""测试会话相关数据可的连接"""
c_db = UPostgresDB(host="localhost", database="laechat", user="postgres", password="chenzl", port=5432)
chat = Chat(db=c_db)
c_user = CUser(db=c_db)
turn_qa = TurnQa(db=c_db)
chat.create_table()
c_user.create_table()
turn_qa.create_table()
# chat_id, user_id, info, deleted
chat.insert(["3333", "1111", "没有info", 0])
# user_id, account, password
c_user.insert(["111", "zhangsan", "111111"])
# turn_id, chat_id, question, answer, turn_number, is_last
turn_qa.insert(["222", "1111", "nihao", "nihao", 1, 0])
\ No newline at end of file
......@@ -26,14 +26,14 @@ class localCallback(BaseCallback):
return True
return (len(title+content) / (len(title.splitlines())+len(content.splitlines())) < 20) or "思考题" in title
"""测试资料入库(pgsql和faiss)"""
def test_faiss_from_dir():
vecstore_faiss = VectorStore_FAISS(
embedding_model_name=EMBEEDING_MODEL_PATH,
store_path=FAISS_STORE_PATH,
index_name=INDEX_NAME,
info={"port":VEC_DB_PORT,"host":VEC_DB_HOST,"dbname":VEC_DB_DBNAME,"username":VEC_DB_USER,"password":VEC_DB_PASSWORD},
show_number=3,
show_number=SIMILARITY_SHOW_NUMBER,
reset=True)
docs = loads_path(KNOWLEDGE_PATH,mode="paged",sentence_size=512,callbacks=[localCallback()])
print(len(docs))
......@@ -60,7 +60,7 @@ def test_faiss_from_dir():
print(vecstore_faiss._faiss.index.ntotal)
vecstore_faiss._save_local()
"""测试faiss向量数据库查询结果"""
def test_faiss_load():
vecstore_faiss = VectorStore_FAISS(
embedding_model_name=EMBEEDING_MODEL_PATH,
......@@ -69,9 +69,9 @@ def test_faiss_load():
info={"port":VEC_DB_PORT,"host":VEC_DB_HOST,"dbname":VEC_DB_DBNAME,"username":VEC_DB_USER,"password":VEC_DB_PASSWORD},
show_number=SIMILARITY_SHOW_NUMBER,
reset=False)
print(vecstore_faiss._join_document(vecstore_faiss.get_text_similarity("请介绍一下你理解的国际结算业务")))
print(vecstore_faiss._join_document(vecstore_faiss.get_text_similarity("征信业务有什么情况")))
if __name__ == "__main__":
test_faiss_from_dir()
# test_faiss_from_dir()
test_faiss_load()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment