# txt_doc_table.py
from .k_db import PostgresDB

# paragraph_id BIGSERIAL primary key,
# DDL that creates the ``txt_doc`` table:
#   hash     - varchar(40) primary key identifying a row
#   text     - the stored text (NOT NULL)
#   matadate - free-form text column; presumably a misspelling of
#              "metadata", but the queries in this file use this exact
#              column name, so it must not be "corrected" here alone.
TABLE_TXT_DOC = """
create table txt_doc (
    hash varchar(40) primary key, 
    text text not null,
    matadate text
);
"""
# DDL for an explicit unique index named ``hash_index`` on ``txt_doc.hash``.
# NOTE(review): no code visible in this file executes this statement, and
# ``hash`` is already the table's primary key — confirm whether this
# constant is still needed.
TABLE_TXT_DOC_HASH_INDEX = """
CREATE UNIQUE INDEX hash_index ON txt_doc (hash);

"""


# CREATE UNIQUE INDEX idx_name ON your_table (column_name);
class TxtDoc:
    """Data-access helper for the ``txt_doc`` table.

    Each row consists of ``hash`` (primary key), ``text`` and ``matadate``
    (column name spelled as in the table definition).

    The wrapped ``db`` object must provide ``execute(query)``,
    ``execute_args(query, args)``, ``fetchall()`` and ``format(query)``;
    those are the only members this class uses.
    """

    def __init__(self, db: "PostgresDB") -> None:
        """Store the database wrapper used by every method."""
        self.db = db

    def _table_exists(self):
        """Return whether ``information_schema`` lists a ``txt_doc`` table."""
        query = (
            "SELECT EXISTS (SELECT FROM information_schema.tables "
            "WHERE table_name = 'txt_doc')"
        )
        self.db.execute(query)
        return self.db.fetchall()[0][0]

    def insert(self, texts):
        """Insert rows in one multi-row statement, updating on conflict.

        Args:
            texts: Iterable of ``(hash, text, matadate)`` triples.

        When a row with the same ``hash`` already exists, only its ``text``
        column is overwritten; ``matadate`` is left as stored.
        Nothing is sent to the database when ``texts`` is empty, because an
        ``INSERT ... VALUES`` with no rows is not valid SQL.
        """
        args = []
        row_count = 0
        for row in texts:
            args.extend(row)
            row_count += 1
        if row_count == 0:
            return

        placeholders = ",".join(["(%s,%s,%s)"] * row_count)
        query = (
            "INSERT INTO txt_doc(hash,text,matadate) VALUES "
            + placeholders
            + " ON conflict(hash) DO UPDATE SET text = EXCLUDED.text;"
        )
        self.db.execute_args(query, args)

    def delete(self, ids):
        """Delete every row whose ``hash`` is in ``ids``.

        Args:
            ids: Iterable of raw hash values (not pre-quoted SQL literals).

        Each value is passed as a bound parameter rather than formatted into
        the SQL text, so hashes are quoted correctly and cannot inject SQL.
        """
        query = "delete FROM txt_doc WHERE hash = %s"
        for item in ids:
            self.db.execute_args(query, [item])

    def search(self, item):
        """Look up a single row by hash.

        Args:
            item: Hash value to look for.

        Returns:
            The first ``(text, matadate)`` row returned by the database, or
            ``None`` when no row matches.
        """
        query = "SELECT text,matadate FROM txt_doc WHERE hash = %s"
        self.db.execute_args(query, [item])
        rows = self.db.fetchall()
        return rows[0] if len(rows) > 0 else None

    def create_table(self):
        """Create the ``txt_doc`` table unless it already exists."""
        if not self._table_exists():
            # TABLE_TXT_DOC_HASH_INDEX is intentionally not executed here:
            # ``hash`` is the primary key, which PostgreSQL already backs
            # with a unique index.
            self.db.execute(TABLE_TXT_DOC)

    def drop_table(self):
        """Drop the ``txt_doc`` table if it exists and report it on stdout."""
        if self._table_exists():
            query = "DROP TABLE txt_doc"
            # NOTE(review): every other statement goes through ``execute``;
            # ``format`` is kept as in the original — confirm that
            # PostgresDB.format really runs the statement.
            self.db.format(query)
            print("drop table txt_doc ok")