from .k_db import PostgresDB

# NOTE: the column is spelled "matadate" (sic) in the DDL below, so every
# statement in this module has to use that exact spelling.
TABLE_TXT_DOC = """
create table txt_doc (
    hash varchar(40) primary key,
    text text not null,
    matadate text
);
"""

# Not executed anywhere in this module. In PostgreSQL the primary key on
# "hash" is already backed by a unique index, so this is kept only for
# callers that may import the name.
TABLE_TXT_DOC_HASH_INDEX = """
CREATE UNIQUE INDEX hash_index ON txt_doc (hash);
"""


class TxtDoc:
    """Data-access helper for the ``txt_doc`` table.

    All statements are issued through the ``PostgresDB`` wrapper passed to the
    constructor, using its ``execute``, ``execute_args`` and ``fetchall``
    methods.
    """

    def __init__(self, db: PostgresDB) -> None:
        """Store the database wrapper used for every query."""
        self.db = db

    def _table_exists(self) -> bool:
        """Return whether a table named ``txt_doc`` is listed in the catalog."""
        query = (
            "SELECT EXISTS (SELECT FROM information_schema.tables "
            "WHERE table_name = 'txt_doc')"
        )
        self.db.execute(query)
        return bool(self.db.fetchall()[0][0])

    def insert(self, texts):
        """Insert or update rows in a single multi-row statement.

        Args:
            texts: Iterable of ``(hash, text, matadate)`` triples.

        On a hash conflict only the ``text`` column is overwritten. An empty
        iterable is a no-op; previously it produced malformed SQL.
        """
        rows = [list(row) for row in texts]
        if not rows:
            return

        placeholders = ",".join(["(%s,%s,%s)"] * len(rows))
        args = [field for row in rows for field in row]
        query = (
            "INSERT INTO txt_doc(hash,text,matadate) VALUES "
            f"{placeholders} "
            "ON conflict(hash) DO UPDATE SET text = EXCLUDED.text;"
        )
        self.db.execute_args(query, args)

    def delete(self, ids):
        """Delete the rows whose hash is in ``ids``, one statement per hash.

        Args:
            ids: Iterable of hash values.
        """
        # Bound parameter instead of string interpolation: the old code
        # spliced the raw value into the SQL, which yields an unquoted
        # literal for a varchar column and allows SQL injection.
        query = "delete FROM txt_doc WHERE hash = %s"
        for item in ids:
            self.db.execute_args(query, [item])

    def search(self, item):
        """Return the first ``(text, matadate)`` row for hash ``item``.

        Returns:
            The first fetched row, or ``None`` when nothing matches.
        """
        query = "SELECT text,matadate FROM txt_doc WHERE hash = %s"
        self.db.execute_args(query, [item])
        answer = self.db.fetchall()
        return answer[0] if answer else None

    def create_table(self):
        """Create the ``txt_doc`` table if it does not exist yet."""
        if not self._table_exists():
            self.db.execute(TABLE_TXT_DOC)

    def drop_table(self):
        """Drop the ``txt_doc`` table if it exists."""
        if self._table_exists():
            # NOTE(review): the original called ``self.db.format(query)``,
            # which looks like a typo; every other statement in this class
            # goes through ``execute`` - confirm against PostgresDB.
            self.db.execute("DROP TABLE txt_doc")
            print("drop table txt_doc ok")

    def find_like_doc(self, item: str):
        """Find rows whose ``matadate`` or ``text`` contains ``item``.

        Args:
            item: Substring to look for. LIKE wildcards (``%``, ``_``) inside
                ``item`` keep their wildcard meaning, as before.

        Returns:
            All fetched ``(text, matadate)`` rows, or ``None`` when there are
            no matches.
        """
        # The pattern is passed as a bound parameter rather than concatenated
        # into the SQL text, so quotes in ``item`` can no longer break or
        # inject into the statement.
        pattern = f"%{item}%"
        query = (
            "select text,matadate FROM txt_doc "
            "WHERE matadate like %s or text like %s"
        )
        self.db.execute_args(query, [pattern, pattern])
        answer = self.db.fetchall()
        return answer if answer else None