From 923cbe488eaae8c519552d5efe862c478560fb47 Mon Sep 17 00:00:00 2001
From: KevinHuSh <kevinhu.sh@gmail.com>
Date: Wed, 10 Apr 2024 10:11:22 +0800
Subject: [PATCH] fix #258 task_executor occupy cpu too much (#288)

### What problem does this PR solve?

Issue link: #285

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
---
 rag/nlp/query.py         |  2 +-
 rag/svr/task_executor.py | 37 ++++++++++++++++++++-----------------
 rag/utils/minio_conn.py  |  2 +-
 3 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/rag/nlp/query.py b/rag/nlp/query.py
index e62fb9f..9d9c855 100644
--- a/rag/nlp/query.py
+++ b/rag/nlp/query.py
@@ -62,7 +62,7 @@ class EsQueryer:
             return Q("bool",
                      must=Q("query_string", fields=self.flds,
                             type="best_fields", query=" ".join(q),
-                            boost=1, minimum_should_match=min_match)
+                            boost=1)#, minimum_should_match=min_match)
                      ), tks
 
         def needQieqie(tk):
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index 799d252..1f5e37a 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -21,16 +21,15 @@ import hashlib
 import copy
 import re
 import sys
+import time
 import traceback
 from functools import partial
-import signal
-from contextlib import contextmanager
 from rag.settings import database_logger
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
-
+from multiprocessing import Pool
 import numpy as np
 from elasticsearch_dsl import Q
-
+from multiprocessing.context import TimeoutError
 from api.db.services.task_service import TaskService
 from rag.utils import ELASTICSEARCH
 from rag.utils import MINIO
@@ -92,23 +91,17 @@ def set_progress(task_id, from_page=0, to_page=-1,
 def collect(comm, mod, tm):
     tasks = TaskService.get_tasks(tm, mod, comm)
     if len(tasks) == 0:
+        time.sleep(1)
         return pd.DataFrame()
     tasks = pd.DataFrame(tasks)
     mtm = tasks["update_time"].max()
     cron_logger.info("TOTAL:{}, To:{}".format(len(tasks), mtm))
     return tasks
 
-@contextmanager
-def timeout(time):
-    # Register a function to raise a TimeoutError on the signal.
-    signal.signal(signal.SIGALRM, raise_timeout)
-    # Schedule the signal to be sent after ``time``.
-    signal.alarm(time)
-    yield
-
 
-def raise_timeout(signum, frame):
-    raise TimeoutError
+def get_minio_binary(bucket, name):
+    global MINIO
+    return MINIO.get(bucket, name)
 
 
 def build(row):
@@ -124,24 +117,34 @@ def build(row):
         row["from_page"],
         row["to_page"])
     chunker = FACTORY[row["parser_id"].lower()]
+    pool = Pool(processes=1)
     try:
         st = timer()
-        with timeout(30):
-            binary = MINIO.get(row["kb_id"], row["location"])
+        thr = pool.apply_async(get_minio_binary, args=(row["kb_id"], row["location"]))
+        binary = thr.get(timeout=90)
+        pool.terminate()
+        cron_logger.info(
+            "From minio({}) {}/{}".format(timer()-st, row["location"], row["name"]))
         cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
                             to_page=row["to_page"], lang=row["language"], callback=callback,
                             kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
         cron_logger.info(
             "Chunkking({}) {}/{}".format(timer()-st, row["location"], row["name"]))
+    except TimeoutError as e:
+        callback(-1, f"Internal server error: Fetch file timeout. Could you try it again.")
+        cron_logger.error(
+            "Chunkking {}/{}: Fetch file timeout.".format(row["location"], row["name"]))
+        return
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
             callback(-1, "Can not find file <%s>" % row["name"])
         else:
             callback(-1, f"Internal server error: %s" %
                      str(e).replace("'", ""))
+        pool.terminate()
         traceback.print_exc()
 
-        cron_logger.warn(
+        cron_logger.error(
             "Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
 
         return
diff --git a/rag/utils/minio_conn.py b/rag/utils/minio_conn.py
index 18d3d3b..d5b0927 100644
--- a/rag/utils/minio_conn.py
+++ b/rag/utils/minio_conn.py
@@ -58,7 +58,7 @@ class HuMinio(object):
 
 
     def get(self, bucket, fnm):
-        for _ in range(10):
+        for _ in range(1):
             try:
                 r = self.conn.get_object(bucket, fnm)
                 return r.read()
-- 
GitLab