Skip to content
Snippets Groups Projects
Unverified Commit 923cbe48 authored by KevinHuSh's avatar KevinHuSh Committed by GitHub
Browse files

fix #258: task_executor occupies too much CPU (#288)

### What problem does this PR solve?

Issue link: #285

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
parent 28d29949
No related branches found
No related tags found
No related merge requests found
......@@ -62,7 +62,7 @@ class EsQueryer:
return Q("bool",
must=Q("query_string", fields=self.flds,
type="best_fields", query=" ".join(q),
boost=1, minimum_should_match=min_match)
boost=1)#, minimum_should_match=min_match)
), tks
def needQieqie(tk):
......
......@@ -21,16 +21,15 @@ import hashlib
import copy
import re
import sys
import time
import traceback
from functools import partial
import signal
from contextlib import contextmanager
from rag.settings import database_logger
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
from multiprocessing import Pool
import numpy as np
from elasticsearch_dsl import Q
from multiprocessing.context import TimeoutError
from api.db.services.task_service import TaskService
from rag.utils import ELASTICSEARCH
from rag.utils import MINIO
......@@ -92,23 +91,17 @@ def set_progress(task_id, from_page=0, to_page=-1,
def collect(comm, mod, tm):
    """Poll the task store for tasks newer than *tm* for this consumer/module.

    Returns a pandas DataFrame of pending tasks. When nothing is queued,
    sleeps briefly (so an idle worker does not busy-spin the CPU) and
    returns an empty DataFrame.
    """
    pending = TaskService.get_tasks(tm, mod, comm)
    if len(pending) == 0:
        # Back off for a second before the caller polls again.
        time.sleep(1)
        return pd.DataFrame()
    frame = pd.DataFrame(pending)
    latest = frame["update_time"].max()
    cron_logger.info("TOTAL:{}, To:{}".format(len(frame), latest))
    return frame
@contextmanager
def timeout(time):
    """Context manager raising TimeoutError if its body runs longer than ``time`` seconds.

    Implemented with SIGALRM, so it only works on Unix and only in the
    main thread. NOTE(review): the parameter name shadows the ``time``
    module inside this function; kept for interface compatibility.
    """
    # Register a handler that raises TimeoutError when SIGALRM fires.
    signal.signal(signal.SIGALRM, raise_timeout)
    # Schedule the signal to be sent after ``time`` seconds.
    signal.alarm(time)
    try:
        yield
    finally:
        # Bug fix: cancel any pending alarm on exit. Without this the
        # alarm keeps ticking after the ``with`` block and can raise
        # TimeoutError in unrelated code later.
        signal.alarm(0)
def raise_timeout(signum, frame):
    """Signal handler for SIGALRM: abort the interrupted code with TimeoutError.

    The ``signum``/``frame`` arguments are supplied by the signal machinery
    and are not used.
    """
    raise TimeoutError()
def get_minio_binary(bucket, name):
    """Fetch object *name* from *bucket* via the shared MINIO client.

    Returns whatever the client's ``get`` yields — presumably the raw
    object bytes; confirm against the MINIO wrapper.
    """
    global MINIO
    payload = MINIO.get(bucket, name)
    return payload
def build(row):
......@@ -124,24 +117,34 @@ def build(row):
row["from_page"],
row["to_page"])
chunker = FACTORY[row["parser_id"].lower()]
pool = Pool(processes=1)
try:
st = timer()
with timeout(30):
binary = MINIO.get(row["kb_id"], row["location"])
thr = pool.apply_async(get_minio_binary, args=(row["kb_id"], row["location"]))
binary = thr.get(timeout=90)
pool.terminate()
cron_logger.info(
"From minio({}) {}/{}".format(timer()-st, row["location"], row["name"]))
cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
to_page=row["to_page"], lang=row["language"], callback=callback,
kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
cron_logger.info(
"Chunkking({}) {}/{}".format(timer()-st, row["location"], row["name"]))
except TimeoutError as e:
callback(-1, f"Internal server error: Fetch file timeout. Could you try it again.")
cron_logger.error(
"Chunkking {}/{}: Fetch file timeout.".format(row["location"], row["name"]))
return
except Exception as e:
if re.search("(No such file|not found)", str(e)):
callback(-1, "Can not find file <%s>" % row["name"])
else:
callback(-1, f"Internal server error: %s" %
str(e).replace("'", ""))
pool.terminate()
traceback.print_exc()
cron_logger.warn(
cron_logger.error(
"Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
return
......
......@@ -58,7 +58,7 @@ class HuMinio(object):
def get(self, bucket, fnm):
for _ in range(10):
for _ in range(1):
try:
r = self.conn.get_object(bucket, fnm)
return r.read()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment