From c5ea37cd301d9043acdf46ce545ff22308a2aa5f Mon Sep 17 00:00:00 2001 From: KevinHuSh <kevinhu.sh@gmail.com> Date: Wed, 7 Feb 2024 19:27:23 +0800 Subject: [PATCH] Add resume parser and fix bugs (#59) * Update .gitignore * Update .gitignore * Add resume parser and fix bugs --- .gitignore | 4 + api/apps/chunk_app.py | 12 ++- api/apps/conversation_app.py | 65 +++++++++++- api/apps/document_app.py | 8 +- api/db/db_models.py | 4 +- api/db/services/dialog_service.py | 1 + api/db/services/knowledgebase_service.py | 28 +++++ api/utils/file_utils.py | 36 ++++++- rag/app/paper.py | 1 - rag/app/resume.py | 102 ++++++++++++++++++ rag/app/table.py | 52 ++++++---- rag/nlp/search.py | 23 +++- rag/nlp/surname.py | 127 +++++++++++++++++++++++ rag/svr/task_broker.py | 12 ++- rag/svr/task_executor.py | 11 +- rag/utils/es_conn.py | 22 +++- 16 files changed, 451 insertions(+), 57 deletions(-) create mode 100644 rag/app/resume.py create mode 100644 rag/nlp/surname.py diff --git a/.gitignore b/.gitignore index fa70b5d..93c4cb4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,10 @@ debug/ target/ __pycache__/ +hudet/ +cv/ +layout_app.py +resume/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html diff --git a/api/apps/chunk_app.py b/api/apps/chunk_app.py index 9a5a168..11d60d5 100644 --- a/api/apps/chunk_app.py +++ b/api/apps/chunk_app.py @@ -47,17 +47,20 @@ def list(): tenant_id = DocumentService.get_tenant_id(req["doc_id"]) if not tenant_id: return get_data_error_result(retmsg="Tenant not found!") + e, doc = DocumentService.get_by_id(doc_id) + if not e: + return get_data_error_result(retmsg="Document not found!") query = { "doc_ids": [doc_id], "page": page, "size": size, "question": question } if "available_int" in req: query["available_int"] = int(req["available_int"]) sres = retrievaler.search(query, search.index_name(tenant_id)) - res = {"total": sres.total, "chunks": []} + res = {"total": sres.total, "chunks": [], "doc": doc.to_dict()} for id in sres.ids: d = { "chunk_id": id, - "content_with_weight": rmSpace(sres.highlight[id]) if question else sres.field[id]["content_with_weight"], + "content_with_weight": rmSpace(sres.highlight[id]) if question else sres.field[id].get("content_with_weight", ""), "doc_id": sres.field[id]["doc_id"], "docnm_kwd": sres.field[id]["docnm_kwd"], "important_kwd": sres.field[id].get("important_kwd", []), @@ -110,7 +113,7 @@ def get(): "important_kwd") def set(): req = request.json - d = {"id": req["chunk_id"]} + d = {"id": req["chunk_id"], "content_with_weight": req["content_with_weight"]} d["content_ltks"] = huqie.qie(req["content_with_weight"]) d["content_sm_ltks"] = huqie.qieqie(d["content_ltks"]) d["important_kwd"] = req["important_kwd"] @@ -181,11 +184,12 @@ def create(): md5 = hashlib.md5() md5.update((req["content_with_weight"] + req["doc_id"]).encode("utf-8")) chunck_id = md5.hexdigest() - d = {"id": chunck_id, "content_ltks": huqie.qie(req["content_with_weight"])} + d = {"id": chunck_id, "content_ltks": huqie.qie(req["content_with_weight"]), "content_with_weight": req["content_with_weight"]} d["content_sm_ltks"] = huqie.qieqie(d["content_ltks"]) d["important_kwd"] = req.get("important_kwd", []) d["important_tks"] = huqie.qie(" ".join(req.get("important_kwd", []))) d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19] + d["create_timestamp_flt"] = datetime.datetime.now().timestamp() try: e, doc = 
DocumentService.get_by_id(req["doc_id"])
diff --git a/api/apps/conversation_app.py b/api/apps/conversation_app.py
index c5c8466..c48b586 100644
--- a/api/apps/conversation_app.py
+++ b/api/apps/conversation_app.py
@@ -13,16 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import re
+
 from flask import request
 from flask_login import login_required
 from api.db.services.dialog_service import DialogService, ConversationService
 from api.db import LLMType
-from api.db.services.llm_service import LLMService, TenantLLMService, LLMBundle
+from api.db.services.knowledgebase_service import KnowledgebaseService
+from api.db.services.llm_service import LLMService, LLMBundle
+from api.settings import access_logger
 from api.utils.api_utils import server_error_response, get_data_error_result, validate_request
 from api.utils import get_uuid
 from api.utils.api_utils import get_json_result
 from rag.llm import ChatModel
 from rag.nlp import retrievaler
+from rag.nlp.search import index_name
 from rag.utils import num_tokens_from_string, encoder
@@ -163,6 +168,17 @@ def chat(dialog, messages, **kwargs):
     if not llm:
         raise LookupError("LLM(%s) not found"%dialog.llm_id)
     llm = llm[0]
+    question = messages[-1]["content"]
+    embd_mdl = LLMBundle(dialog.tenant_id, LLMType.EMBEDDING)
+    chat_mdl = LLMBundle(dialog.tenant_id, LLMType.CHAT, dialog.llm_id)
+
+    field_map = KnowledgebaseService.get_field_map(dialog.kb_ids)
+    ## try to use sql if field mapping is good to go
+    if field_map:
+        markdown_tbl,chunks = use_sql(question, field_map, dialog.tenant_id, chat_mdl)
+        if markdown_tbl:
+            return {"answer": markdown_tbl, "retrieval": {"chunks": chunks}}
+
     prompt_config = dialog.prompt_config
     for p in prompt_config["parameters"]:
         if p["key"] == "knowledge":continue
@@ -170,9 +186,6 @@ def chat(dialog, messages, **kwargs):
         if p["key"] not in kwargs:
             prompt_config["system"] = prompt_config["system"].replace("{%s}"%p["key"], " ")
 
-    question = messages[-1]["content"]
-    embd_mdl = LLMBundle(dialog.tenant_id, LLMType.EMBEDDING)
-    chat_mdl = LLMBundle(dialog.tenant_id, LLMType.CHAT, dialog.llm_id)
     kbinfos = retrievaler.retrieval(question, embd_mdl, dialog.tenant_id, dialog.kb_ids, 1, dialog.top_n,
                                     dialog.similarity_threshold, dialog.vector_similarity_weight, top=1024, aggs=False)
     knowledges = [ck["content_with_weight"] for ck in kbinfos["chunks"]]
@@ -196,4 +209,46 @@ def chat(dialog, messages, **kwargs):
                           vtweight=dialog.vector_similarity_weight)
     for c in kbinfos["chunks"]:
         if c.get("vector"):del c["vector"]
-    return {"answer": answer, "retrieval": kbinfos}
\ No newline at end of file
+    return {"answer": answer, "retrieval": kbinfos}
+
+
+def use_sql(question,field_map, tenant_id, chat_mdl):
+    sys_prompt = "你是一个DBA。你需要这对以下表的字段结构，根据我的问题写出sql。"
+    user_promt = """
+表名：{};
+数据库表字段说明如下:
+{}
+
+问题：{}
+请写出SQL。
+""".format(
+        index_name(tenant_id),
+        "\n".join([f"{k}: {v}" for k,v in field_map.items()]),
+        question
+    )
+    sql = chat_mdl.chat(sys_prompt, [{"role": "user", "content": user_promt}], {"temperature": 0.1})
+    sql = re.sub(r".*?select ", "select ", sql, flags=re.IGNORECASE)
+    sql = re.sub(r" +", " ", sql)
+    if sql[:len("select ")].lower() != "select ":
+        return None, None
+    if sql[:len("select *")].lower() != "select *":
+        sql = "select doc_id,docnm_kwd," + sql[6:]
+
+    tbl = retrievaler.sql_retrieval(sql)
+    if not tbl: return None, None
+
+    docid_idx = set([ii for ii, c in enumerate(tbl["columns"]) if 
c["name"] == "doc_id"]) + docnm_idx = set([ii for ii, c in enumerate(tbl["columns"]) if c["name"] == "docnm_kwd"]) + clmn_idx = [ii for ii in range(len(tbl["columns"])) if ii not in (docid_idx|docnm_idx)] + + clmns = "|".join([re.sub(r"/.*", "", field_map.get(tbl["columns"][i]["name"], f"C{i}")) for i in clmn_idx]) + "|原文" + line = "|".join(["------" for _ in range(len(clmn_idx))]) + "|------" + rows = ["|".join([str(r[i]) for i in clmn_idx])+"|" for r in tbl["rows"]] + if not docid_idx or not docnm_idx: + access_logger.error("SQL missing field: " + sql) + return "\n".join([clmns, line, "\n".join(rows)]), [] + + rows = "\n".join([r+f"##{ii}$$" for ii,r in enumerate(rows)]) + docid_idx = list(docid_idx)[0] + docnm_idx = list(docnm_idx)[0] + return "\n".join([clmns, line, rows]), [{"doc_id": r[docid_idx], "docnm_kwd": r[docnm_idx]} for r in tbl["rows"]] diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 32b8d12..65b480d 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -21,9 +21,6 @@ import flask from elasticsearch_dsl import Q from flask import request from flask_login import login_required, current_user - -from api.db.db_models import Task -from api.db.services.task_service import TaskService from rag.nlp import search from rag.utils import ELASTICSEARCH from api.db.services import duplicate_name @@ -35,7 +32,7 @@ from api.db.services.document_service import DocumentService from api.settings import RetCode from api.utils.api_utils import get_json_result from rag.utils.minio_conn import MINIO -from api.utils.file_utils import filename_type +from api.utils.file_utils import filename_type, thumbnail @manager.route('/upload', methods=['POST']) @@ -78,7 +75,8 @@ def upload(): "type": filename_type(filename), "name": filename, "location": location, - "size": len(blob) + "size": len(blob), + "thumbnail": thumbnail(filename, blob) }) return get_json_result(data=doc.to_json()) except Exception as e: diff --git a/api/db/db_models.py b/api/db/db_models.py index db641cf..09fe499 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -474,7 +474,7 @@ class Knowledgebase(DataBaseModel): vector_similarity_weight = FloatField(default=0.3) parser_id = CharField(max_length=32, null=False, help_text="default parser ID", default=ParserType.GENERAL.value) - parser_config = JSONField(null=False, default={"from_page":0, "to_page": 100000}) + parser_config = JSONField(null=False, default={"pages":[[0,1000000]]}) status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted,1: validate)", default="1") def __str__(self): @@ -489,7 +489,7 @@ class Document(DataBaseModel): thumbnail = TextField(null=True, help_text="thumbnail base64 string") kb_id = CharField(max_length=256, null=False, index=True) parser_id = CharField(max_length=32, null=False, help_text="default parser ID") - parser_config = JSONField(null=False, default={"from_page":0, "to_page": 100000}) + parser_config = JSONField(null=False, default={"pages":[[0,1000000]]}) source_type = CharField(max_length=128, null=False, default="local", help_text="where dose this document from") type = CharField(max_length=32, null=False, help_text="file extension") created_by = CharField(max_length=32, null=False, help_text="who created it") diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index bb770eb..2864e4f 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -21,5 +21,6 @@ class DialogService(CommonService): model = Dialog + 
class ConversationService(CommonService): model = Conversation diff --git a/api/db/services/knowledgebase_service.py b/api/db/services/knowledgebase_service.py index a99346a..0851e6b 100644 --- a/api/db/services/knowledgebase_service.py +++ b/api/db/services/knowledgebase_service.py @@ -63,3 +63,31 @@ class KnowledgebaseService(CommonService): d = kbs[0].to_dict() d["embd_id"] = kbs[0].tenant.embd_id return d + + @classmethod + @DB.connection_context() + def update_parser_config(cls, id, config): + e, m = cls.get_by_id(id) + if not e:raise LookupError(f"knowledgebase({id}) not found.") + def dfs_update(old, new): + for k,v in new.items(): + if k not in old: + old[k] = v + continue + if isinstance(v, dict): + assert isinstance(old[k], dict) + dfs_update(old[k], v) + else: old[k] = v + dfs_update(m.parser_config, config) + cls.update_by_id(id, m.parser_config) + + + @classmethod + @DB.connection_context() + def get_field_map(cls, ids): + conf = {} + for k in cls.get_by_ids(ids): + if k.parser_config and "field_map" in k.parser_config: + conf.update(k.parser_config) + return conf + diff --git a/api/utils/file_utils.py b/api/utils/file_utils.py index 92771f4..7c54302 100644 --- a/api/utils/file_utils.py +++ b/api/utils/file_utils.py @@ -13,11 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import base64 import json import os import re +from io import BytesIO +import fitz +from PIL import Image from cachetools import LRUCache, cached from ruamel.yaml import YAML @@ -150,4 +153,33 @@ def filename_type(filename): return FileType.AURAL.value if re.match(r".*\.(jpg|jpeg|png|tif|gif|pcx|tga|exif|fpx|svg|psd|cdr|pcd|dxf|ufo|eps|ai|raw|WMF|webp|avif|apng|icon|ico|mpg|mpeg|avi|rm|rmvb|mov|wmv|asf|dat|asx|wvx|mpe|mpa|mp4)$", filename): - return FileType.VISUAL \ No newline at end of file + return FileType.VISUAL + + +def thumbnail(filename, blob): + filename = filename.lower() + if re.match(r".*\.pdf$", filename): + pdf = fitz.open(stream=blob, filetype="pdf") + pix = pdf[0].get_pixmap(matrix=fitz.Matrix(0.03, 0.03)) + buffered = BytesIO() + Image.frombytes("RGB", [pix.width, pix.height], + pix.samples).save(buffered, format="png") + return "data:image/png;base64," + base64.b64encode(buffered.getvalue()) + + if re.match(r".*\.(jpg|jpeg|png|tif|gif|icon|ico|webp)$", filename): + return ("data:image/%s;base64,"%filename.split(".")[-1]) + base64.b64encode(Image.open(BytesIO(blob)).thumbnail((30, 30)).tobytes()) + + if re.match(r".*\.(ppt|pptx)$", filename): + import aspose.slides as slides + import aspose.pydrawing as drawing + try: + with slides.Presentation(BytesIO(blob)) as presentation: + buffered = BytesIO() + presentation.slides[0].get_thumbnail(0.03, 0.03).save(buffered, drawing.imaging.ImageFormat.png) + return "data:image/png;base64," + base64.b64encode(buffered.getvalue()) + except Exception as e: + pass + + + + diff --git a/rag/app/paper.py b/rag/app/paper.py index 3fca9a9..52c7a62 100644 --- a/rag/app/paper.py +++ b/rag/app/paper.py @@ -3,7 +3,6 @@ import re from collections import Counter from api.db import ParserType -from rag.cv.ppdetection import PPDet from rag.parser import tokenize from rag.nlp import huqie from rag.parser.pdf_parser import HuParser diff --git a/rag/app/resume.py b/rag/app/resume.py new file mode 100644 index 0000000..f62d2c5 --- /dev/null +++ b/rag/app/resume.py @@ -0,0 +1,102 @@ +import copy +import json +import os +import re +import requests +from api.db.services.knowledgebase_service import 
KnowledgebaseService
+from rag.nlp import huqie
+
+from rag.settings import cron_logger
+from rag.utils import rmSpace
+
+
+def chunk(filename, binary=None, callback=None, **kwargs):
+    if not re.search(r"\.(pdf|doc|docx|txt)$", filename, flags=re.IGNORECASE): raise NotImplementedError("file type not supported yet(pdf supported)")
+
+    url = os.environ.get("INFINIFLOW_SERVER")
+    if not url:raise EnvironmentError("Please set environment variable: 'INFINIFLOW_SERVER'")
+    token = os.environ.get("INFINIFLOW_TOKEN")
+    if not token:raise EnvironmentError("Please set environment variable: 'INFINIFLOW_TOKEN'")
+
+    if not binary:
+        with open(filename, "rb") as f: binary = f.read()
+    def remote_call():
+        nonlocal filename, binary
+        for _ in range(3):
+            try:
+                res = requests.post(url + "/v1/layout/resume/", files=[(filename, binary)],
+                                    headers={"Authorization": token}, timeout=180)
+                res = res.json()
+                if res["retcode"] != 0: raise RuntimeError(res["retmsg"])
+                return res["data"]
+            except RuntimeError as e:
+                raise e
+            except Exception as e:
+                cron_logger.error("resume parsing:" + str(e))
+
+    resume = remote_call()
+    print(json.dumps(resume, ensure_ascii=False, indent=2))
+
+    field_map = {
+        "name_kwd": "姓名/名字",
+        "gender_kwd": "性别（男，女）",
+        "age_int": "年龄/岁/年纪",
+        "phone_kwd": "电话/手机/微信",
+        "email_tks": "email/e-mail/邮箱",
+        "position_name_tks": "职位/职能/岗位/职责",
+        "expect_position_name_tks": "期望职位/期望职能/期望岗位",
+
+        "hightest_degree_kwd": "最高学历（高中，职高，硕士，本科,博士,初中，中技，中专,专科,专升本,MPA,MBA,EMBA)",
+        "first_degree_kwd": "第一学历（高中，职高，硕士，本科,博士,初中，中技，中专,专科,专升本,MPA,MBA,EMBA)",
+        "first_major_tks": "第一学历专业",
+        "first_school_name_tks": "第一学历毕业学校",
+        "edu_first_fea_kwd": "第一学历标签（211,留学，双一流，985,海外知名，重点大学，中专,专升本,专科,本科,大专)",
+
+        "degree_kwd": "过往学历（高中，职高，硕士，本科,博士,初中，中技，中专,专科,专升本,MPA,MBA,EMBA)",
+        "major_tks": "学过的专业/过往专业",
+        "school_name_tks": "学校/毕业院校",
+        "sch_rank_kwd": "学校标签（顶尖学校，精英学校，优质学校，一般学校）",
+        "edu_fea_kwd": "教育标签（211,留学，双一流，985,海外知名，重点大学，中专,专升本,专科,本科,大专)",
+
+        "work_exp_flt": "工作年限/工作年份/N年经验/毕业了多少年",
+        "birth_dt": "生日/出生年份",
+        "corp_nm_tks": "就职过的公司/之前的公司/上过班的公司",
+        "corporation_name_tks": "最近就职(上班)的公司/上一家公司",
+        "edu_end_int": "毕业年份",
+        "expect_city_names_tks": "期望城市",
+        "industry_name_tks": "所在行业"
+    }
+    titles = []
+    for n in ["name_kwd", "gender_kwd", "position_name_tks", "age_int"]:
+        v = resume.get(n, "")
+        if isinstance(v, list):v = v[0]
+        if n.find("tks") > 0: v = rmSpace(v)
+        titles.append(str(v))
+    doc = {
+        "docnm_kwd": filename,
+        "title_tks": huqie.qie("-".join(titles)+"-简历")
+    }
+    doc["title_sm_tks"] = huqie.qieqie(doc["title_tks"])
+    pairs = []
+    for n,m in field_map.items():
+        if not resume.get(n):continue
+        v = resume[n]
+        if isinstance(v, list):v = " ".join(v)
+        if n.find("tks") > 0: v = rmSpace(v)
+        pairs.append((m, str(v)))
+
+    doc["content_with_weight"] = "\n".join(["{}: {}".format(re.sub(r"（[^（）]+）", "", k), v) for k,v in pairs])
+    doc["content_ltks"] = huqie.qie(doc["content_with_weight"])
+    doc["content_sm_ltks"] = huqie.qieqie(doc["content_ltks"])
+    for n, _ in field_map.items(): doc[n] = resume[n]
+
+    print(doc)
+    KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": field_map})
+    return [doc]
+
+
+if __name__ == "__main__":
+    import sys
+    def dummy(a, b):
+        pass
+    chunk(sys.argv[1], 
callback=dummy) diff --git a/rag/app/table.py b/rag/app/table.py index aba795f..ee66356 100644 --- a/rag/app/table.py +++ b/rag/app/table.py @@ -1,13 +1,13 @@ import copy -import random import re from io import BytesIO from xpinyin import Pinyin import numpy as np import pandas as pd -from nltk import word_tokenize from openpyxl import load_workbook from dateutil.parser import parse as datetime_parse + +from api.db.services.knowledgebase_service import KnowledgebaseService from rag.parser import is_english, tokenize from rag.nlp import huqie, stemmer @@ -27,18 +27,19 @@ class Excel(object): ws = wb[sheetname] rows = list(ws.rows) headers = [cell.value for cell in rows[0]] - missed = set([i for i,h in enumerate(headers) if h is None]) - headers = [cell.value for i,cell in enumerate(rows[0]) if i not in missed] + missed = set([i for i, h in enumerate(headers) if h is None]) + headers = [cell.value for i, cell in enumerate(rows[0]) if i not in missed] data = [] for i, r in enumerate(rows[1:]): - row = [cell.value for ii,cell in enumerate(r) if ii not in missed] + row = [cell.value for ii, cell in enumerate(r) if ii not in missed] if len(row) != len(headers): fails.append(str(i)) continue data.append(row) done += 1 if done % 999 == 0: - callback(done * 0.6/total, ("Extract records: {}".format(len(res)) + (f"{len(fails)} failure({sheetname}), line: %s..."%(",".join(fails[:3])) if fails else ""))) + callback(done * 0.6 / total, ("Extract records: {}".format(len(res)) + ( + f"{len(fails)} failure({sheetname}), line: %s..." % (",".join(fails[:3])) if fails else ""))) res.append(pd.DataFrame(np.array(data), columns=headers)) callback(0.6, ("Extract records: {}. ".format(done) + ( @@ -61,9 +62,10 @@ def trans_bool(s): def column_data_type(arr): uni = len(set([a for a in arr if a is not None])) counts = {"int": 0, "float": 0, "text": 0, "datetime": 0, "bool": 0} - trans = {t:f for f,t in [(int, "int"), (float, "float"), (trans_datatime, "datetime"), (trans_bool, "bool"), (str, "text")]} + trans = {t: f for f, t in + [(int, "int"), (float, "float"), (trans_datatime, "datetime"), (trans_bool, "bool"), (str, "text")]} for a in arr: - if a is None:continue + if a is None: continue if re.match(r"[+-]?[0-9]+(\.0+)?$", str(a).replace("%%", "")): counts["int"] += 1 elif re.match(r"[+-]?[0-9.]+$", str(a).replace("%%", "")): @@ -72,17 +74,18 @@ def column_data_type(arr): counts["bool"] += 1 elif trans_datatime(str(a)): counts["datetime"] += 1 - else: counts["text"] += 1 - counts = sorted(counts.items(), key=lambda x: x[1]*-1) + else: + counts["text"] += 1 + counts = sorted(counts.items(), key=lambda x: x[1] * -1) ty = counts[0][0] for i in range(len(arr)): - if arr[i] is None:continue + if arr[i] is None: continue try: arr[i] = trans[ty](str(arr[i])) except Exception as e: arr[i] = None if ty == "text": - if len(arr) > 128 and uni/len(arr) < 0.1: + if len(arr) > 128 and uni / len(arr) < 0.1: ty = "keyword" return arr, ty @@ -123,48 +126,51 @@ def chunk(filename, binary=None, callback=None, **kwargs): dfs = [pd.DataFrame(np.array(rows), columns=headers)] - else: raise NotImplementedError("file type not supported yet(excel, text, csv supported)") + else: + raise NotImplementedError("file type not supported yet(excel, text, csv supported)") res = [] PY = Pinyin() fieds_map = {"text": "_tks", "int": "_int", "keyword": "_kwd", "float": "_flt", "datetime": "_dt", "bool": "_kwd"} for df in dfs: for n in ["id", "_id", "index", "idx"]: - if n in df.columns:del df[n] + if n in df.columns: del df[n] clmns = 
df.columns.values txts = list(copy.deepcopy(clmns)) py_clmns = [PY.get_pinyins(n)[0].replace("-", "_") for n in clmns] clmn_tys = [] for j in range(len(clmns)): - cln,ty = column_data_type(df[clmns[j]]) + cln, ty = column_data_type(df[clmns[j]]) clmn_tys.append(ty) df[clmns[j]] = cln if ty == "text": txts.extend([str(c) for c in cln if c]) clmns_map = [(py_clmns[j] + fieds_map[clmn_tys[j]], clmns[j]) for i in range(len(clmns))] - # TODO: set this column map to KB parser configuration eng = is_english(txts) - for ii,row in df.iterrows(): + for ii, row in df.iterrows(): d = {} row_txt = [] for j in range(len(clmns)): - if row[clmns[j]] is None:continue + if row[clmns[j]] is None: continue fld = clmns_map[j][0] d[fld] = row[clmns[j]] if clmn_tys[j] != "text" else huqie.qie(row[clmns[j]]) row_txt.append("{}:{}".format(clmns[j], row[clmns[j]])) - if not row_txt:continue + if not row_txt: continue tokenize(d, "; ".join(row_txt), eng) - print(d) res.append(d) + + KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": {k: v for k, v in clmns_map}}) callback(0.6, "") return res - -if __name__== "__main__": +if __name__ == "__main__": import sys + + def dummy(a, b): pass - chunk(sys.argv[1], callback=dummy) + + chunk(sys.argv[1], callback=dummy) diff --git a/rag/nlp/search.py b/rag/nlp/search.py index 4c8c215..9a23781 100644 --- a/rag/nlp/search.py +++ b/rag/nlp/search.py @@ -74,7 +74,9 @@ class Dealer: s = s.highlight("title_ltks") if not qst: s = s.sort( - {"create_time": {"order": "desc", "unmapped_type": "date"}}) + {"create_time": {"order": "desc", "unmapped_type": "date"}}, + {"create_timestamp_flt": {"order": "desc", "unmapped_type": "float"}} + ) if qst: s = s.highlight_options( @@ -298,3 +300,22 @@ class Dealer: ranks["doc_aggs"][dnm] += 1 return ranks + + def sql_retrieval(self, sql, fetch_size=128): + sql = re.sub(r"[ ]+", " ", sql) + replaces = [] + for r in re.finditer(r" ([a-z_]+_l?tks like |[a-z_]+_l?tks ?= ?)'([^']+)'", sql): + fld, v = r.group(1), r.group(2) + fld = re.sub(r" ?(like|=)$", "", fld).lower() + if v[0] == "%%": v = v[1:-1] + match = " MATCH({}, '{}', 'operator=OR;fuzziness=AUTO:1,3;minimum_should_match=30%') ".format(fld, huqie.qie(v)) + replaces.append((r.group(1)+r.group(2), match)) + + for p, r in replaces: sql.replace(p, r) + + try: + tbl = self.es.sql(sql, fetch_size) + return tbl + except Exception as e: + es_logger(f"SQL failure: {sql} =>" + str(e)) + diff --git a/rag/nlp/surname.py b/rag/nlp/surname.py new file mode 100644 index 0000000..e7098ce --- /dev/null +++ b/rag/nlp/surname.py @@ -0,0 +1,127 @@ +#-*- coding: utf-8 -*- +m = set(["čµµ","é’±","ĺ™","李", +"周","ĺ´","é‘","王", +"冯","é™","褚","卫", +"č’‹","ć˛","éź©","杨", +"ćś±","秦","ĺ°¤","许", +"何","ĺ•","ć–˝","ĺĽ ", +"ĺ”","曹","严","华", +"金","éŹ","陶","姜", +"ćš","č°˘","é‚ą","ĺ–»", +"柏","ć°´","窦","ç« ", +"äş‘","č‹Ź","ć˝","č‘›", +"奚","čŚ","ĺ˝","éŽ", +"é˛","韦","ćŚ","马", +"č‹—","凤","花","ć–ą", +"äżž","ä»»","č˘","ćźł", +"é…†","鲍","史","ĺ”", +"č´ą","廉","岑","č–›", +"é›·","č´ş","倪","汤", +"滕","ć®·","ç˝—","毕", +"éť","邬","安","常", +"äą","于","ć—¶","ĺ‚…", +"çš®","卞","é˝","ĺş·", +"伍","ä˝™","ĺ…","卜", +"顾","ĺź","ĺął","黄", +"ĺ’Ś","穆","č§","ĺ°ą", +"姚","邵","ćą›","汪", +"çĄ","毛","禹","ç‹„", +"米","č´ť","ćŽ","臧", +"计","伏","ć","ć´", +"č°","宋","茅","ĺşž", +"熊","纪","č’","ĺ±", +"项","祝","č‘Ł","ć˘", +"ćťś","é®","č“ť","é—µ", +"ĺ¸","ĺŁ","éş»","强", +"č´ľ","č·Ż","娄","危", +"江","ç«Ą","颜","é", +"梅","ç››","ćž—","ĺ", +"é’ź","ĺľ","邱","骆", +"é«","夏","蔡","ç”°", +"樊","čˇ","凌","霍", +"虞","万","支","柯", +"ćť","管","卢","莫", +"经","ćż","čŁ","缪", 
+"干","解","ĺş”","ĺ®—", +"ä¸","宣","č´˛","é‚“", +"é","单","ćť","ć´Ş", +"包","诸","ĺ·¦","çźł", +"ĺ´”","ĺ‰","é’®","éľš", +"程","嵇","邢","滑", +"裴","陆","荣","çż", +"荀","羊","ć–Ľ","ć ", +"甄","曲","家","ĺ°", +"芮","çľż","储","éťł", +"汲","é‚´","çłś","ćťľ", +"äş•","段","富","ĺ·«", +"乌","焦","ĺ·´","弓", +"牧","éš—","ĺ±±","č°·", +"车","侯","宓","蓬", +"ĺ…¨","é—","çŹ","ä»°", +"秋","仲","伊","宫", +"ĺ®","仇","ć ľ","ćš´", +"ç”","é’","厉","ćŽ", +"祖","ć¦","符","ĺ", +"景","č©ą","ćťź","éľ™", +"叶","幸","司","韶", +"éś","黎","č“ź","č–„", +"印","宿","ç™˝","怀", +"č’˛","é‚°","从","é„‚", +"ç´˘","ĺ’¸","籍","čµ–", +"卓","蔺","ĺ± ","č’™", +"ć± ","äą”","é´","鬱", +"čĄ","č˝","č‹Ť","双", +"é—»","čŽ","ĺ…š","çżź", +"č°","č´ˇ","劳","逄", +"姬","申","扶","ĺ µ", +"冉","ĺ®°","é¦","雍", +"é¤","ç’©","桑","桂", +"ćż®","牛","寿","通", +"čľą","ć‰","燕","冀", +"éŹ","浦","ĺ°š","农", +"温","ĺ«","ĺş„","晏", +"ćź´","çžż","éŽ","ĺ……", +"ć…•","čżž","茹","äą ", +"宦","艾","鱼","容", +"ĺ‘","古","ć“","ć…Ž", +"ć","ĺ»–","ĺşľ","ç»", +"暨","ĺ±…","衡","ćĄ", +"é˝","耿","满","ĺĽ", +"匡","ĺ›˝","ć–‡","寇", +"ĺąż","禄","é™","东", +"欧","殳","ć˛","ĺ©", +"蔚","越","夔","隆", +"ĺ¸","ĺ·©","厍","č‚", +"ć™","ĺ‹ľ","ć•–","融", +"冷","訾","čľ›","éš", +"é‚Ł","简","饶","ç©ş", +"曾","母","沙","äąś", +"ĺ…»","éž ","须","丰", +"ĺ·˘","ĺ…ł","č’Ż","相", +"查","ĺŽ","荆","红", +"游","ç«ş","ćť","逯", +"ç›–","益","桓","ĺ…¬", +"ĺ…°","原","äąž","西","éż","č‚–","丑","位","ć›˝","ĺ·¨","ĺľ·","代","圆","ĺ°‰","仵","çşł","仝","脱","ä¸","但","展","迪","ä»","č¦","ć™—","特","éš‹","č‹‘","奥","漆","č°Ś","é„","ç»","扎","é‚ť","ć¸ ","信","é—¨","陳","化","原","密","ćł®","éąż","赫", +"万俟","司马","上ĺ®","欧éł", +"夏侯","诸葛","闻人","东方", +"赫连","皇甫","尉迟","公羊", +"澹台","公冶","宗政","ćż®éł", +"淳于","单于","太叔","ç”łĺ± ", +"ĺ…¬ĺ™","仲ĺ™","轩辕","令ç‹", +"钟离","宇文","é•żĺ™","慕容", +"鲜于","é—ľä¸","司徒","司空", +"äş“ĺ®","司寇","仉督","ĺ车", +"颛ĺ™","端木","巫马","公西", +"漆雕","äąćŁ","壤驷","公良", +"ć‹“č·‹","夹谷","ĺ®°ç¶","榖ć˘", +"晋","楚","é—«","ćł•","汝","鄢","涂","é’¦", +"段干","百里","东é","南门", +"呼延","ĺ˝’","ćµ·","羊čŚ","ĺľ®","生", +"岳","帅","缑","亢","况","ĺŽ","有","ç´", +"ć˘ä¸","ĺ·¦ä¸","东门","西门", +"商","牟","ä˝","ä˝´","伯","赏","南宫", +"墨","ĺ“","č°Ż","笪","ĺą´","ç±","éł","佟", +"第五","言","福"]) + +def isit(n):return n.strip() in m + diff --git a/rag/svr/task_broker.py b/rag/svr/task_broker.py index 76aa7f9..e501042 100644 --- a/rag/svr/task_broker.py +++ b/rag/svr/task_broker.py @@ -81,11 +81,13 @@ def dispatch(): tsks = [] if r["type"] == FileType.PDF.value: pages = HuParser.total_page_number(r["name"], MINIO.get(r["kb_id"], r["location"])) - for p in range(0, pages, 10): - task = new_task() - task["from_page"] = p - task["to_page"] = min(p + 10, pages) - tsks.append(task) + for s,e in r["parser_config"].get("pages", [(0,100000)]): + e = min(e, pages) + for p in range(s, e, 10): + task = new_task() + task["from_page"] = p + task["to_page"] = min(p + 10, e) + tsks.append(task) else: tsks.append(new_task()) print(tsks) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index e945dad..def9889 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -58,7 +58,7 @@ FACTORY = { } -def set_progress(task_id, from_page, to_page, prog=None, msg="Processing..."): +def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."): cancel = TaskService.do_cancel(task_id) if cancel: msg += " [Canceled]" @@ -110,7 +110,7 @@ def collect(comm, mod, tm): def build(row, cvmdl): if row["size"] > DOC_MAXIMUM_SIZE: - set_progress(row["id"], -1, "File size exceeds( <= %dMb )" % + set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" % (int(DOC_MAXIMUM_SIZE / 1024 / 1024))) return [] @@ -119,7 +119,7 @@ def build(row, cvmdl): try: 
cron_logger.info("Chunkking {}/{}".format(row["location"], row["name"])) cks = chunker.chunk(row["name"], MINIO.get(row["kb_id"], row["location"]), row["from_page"], row["to_page"], - callback) + callback, kb_id=row["kb_id"]) except Exception as e: if re.search("(No such file|not found)", str(e)): callback(-1, "Can not find file <%s>" % row["doc_name"]) @@ -144,6 +144,7 @@ def build(row, cvmdl): md5.update((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8")) d["_id"] = md5.hexdigest() d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19] + d["create_timestamp_flt"] = datetime.datetime.now().timestamp() if not d.get("image"): docs.append(d) continue @@ -197,15 +198,15 @@ def main(comm, mod): tmf = open(tm_fnm, "a+") for _, r in rows.iterrows(): + callback = partial(set_progress, r["id"], r["from_page"], r["to_page"]) try: embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING) cv_mdl = LLMBundle(r["tenant_id"], LLMType.IMAGE2TEXT) # TODO: sequence2text model except Exception as e: - set_progress(r["id"], -1, str(e)) + callback(prog=-1, msg=str(e)) continue - callback = partial(set_progress, r["id"], r["from_page"], r["to_page"]) st_tm = timer() cks = build(r, cv_mdl) if not cks: diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 0c7f2da..fb24e8e 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -3,13 +3,14 @@ import json import time import copy import elasticsearch +from elastic_transport import ConnectionTimeout from elasticsearch import Elasticsearch from elasticsearch_dsl import UpdateByQuery, Search, Index from rag.settings import es_logger from rag import settings from rag.utils import singleton -es_logger.info("Elasticsearch version: "+ str(elasticsearch.__version__)) +es_logger.info("Elasticsearch version: "+str(elasticsearch.__version__)) @singleton @@ -57,7 +58,7 @@ class HuEs: body=d, id=id, doc_type="doc", - refresh=False, + refresh=True, retry_on_conflict=100) else: r = self.es.update( @@ -65,7 +66,7 @@ class HuEs: self.idxnm if not idxnm else idxnm), body=d, id=id, - refresh=False, + refresh=True, retry_on_conflict=100) es_logger.info("Successfully upsert: %s" % id) T = True @@ -240,6 +241,18 @@ class HuEs: es_logger.error("ES search timeout for 3 times!") raise Exception("ES search timeout.") + def sql(self, sql, fetch_size=128, format="json", timeout=2): + for i in range(3): + try: + res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format, request_timeout=timeout) + return res + except ConnectionTimeout as e: + es_logger.error("Timeoută€Q】:" + sql) + continue + es_logger.error("ES search timeout for 3 times!") + raise ConnectionTimeout() + + def get(self, doc_id, idxnm=None): for i in range(3): try: @@ -308,7 +321,8 @@ class HuEs: try: r = self.es.delete_by_query( index=idxnm if idxnm else self.idxnm, - body=Search().query(query).to_dict()) + refresh = True, + body=Search().query(query).to_dict()) return True except Exception as e: es_logger.error("ES updateByQuery deleteByQuery: " + -- GitLab
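
Note on the merge behaviour of the new KnowledgebaseService.update_parser_config: the table and resume parsers in this patch persist a "field_map" into the knowledgebase parser_config via a recursive dict merge (dfs_update). The following is a minimal standalone sketch of that same merge logic, lifted out of the service layer; the sample config values are hypothetical, not taken from a real knowledgebase.

# Standalone sketch of the dfs_update merge used by update_parser_config.
# Sample values are hypothetical.
def dfs_update(old: dict, new: dict) -> None:
    """Recursively merge `new` into `old`: new keys are added, nested dicts
    are merged in place, and scalar values are overwritten."""
    for k, v in new.items():
        if k not in old:
            old[k] = v
            continue
        if isinstance(v, dict):
            assert isinstance(old[k], dict)
            dfs_update(old[k], v)
        else:
            old[k] = v

if __name__ == "__main__":
    # Default parser config introduced by this patch.
    parser_config = {"pages": [[0, 1000000]]}
    # Fragment that rag/app/resume.py asks the service to persist.
    dfs_update(parser_config, {"field_map": {"name_kwd": "姓名/名字"}})
    print(parser_config)
    # {'pages': [[0, 1000000]], 'field_map': {'name_kwd': '姓名/名字'}}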
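Note on the new PDF task splitting in rag/svr/task_broker.py: each page range from parser_config["pages"] is clamped to the document length and cut into 10-page tasks. A small sketch of that loop under assumed inputs (split_tasks, total_pages and the example page count are illustrative stand-ins, not project APIs):

# Sketch of the per-range task splitting added to dispatch().
def split_tasks(parser_config: dict, total_pages: int, step: int = 10):
    tasks = []
    for start, end in parser_config.get("pages", [(0, 100000)]):
        end = min(end, total_pages)            # clamp the range to the document
        for page in range(start, end, step):   # one task per `step` pages
            tasks.append({"from_page": page, "to_page": min(page + step, end)})
    return tasks

print(split_tasks({"pages": [[0, 1000000]]}, total_pages=23))
# [{'from_page': 0, 'to_page': 10}, {'from_page': 10, 'to_page': 20}, {'from_page': 20, 'to_page': 23}]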