diff --git a/apps/llm/patterns/core.py b/apps/llm/patterns/core.py index 4ef8133a9fed1b1e62f1ceb578c6bdb5a93b12a5..dec94f5d2ac4282d46a6ef349f3c72a6c0e98713 100644 --- a/apps/llm/patterns/core.py +++ b/apps/llm/patterns/core.py @@ -4,39 +4,41 @@ from abc import ABC, abstractmethod from textwrap import dedent +from apps.schemas.enum_var import LanguageType + class CorePattern(ABC): """基础大模型范式抽象类""" - system_prompt: str = "" - """系统提示词""" - user_prompt: str = "" - """用户提示词""" - input_tokens: int = 0 - """输入Token数量""" - output_tokens: int = 0 - """输出Token数量""" - + @staticmethod + def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: + """默认的Prompt内容;分别返回系统提示词和用户提示词""" + return {}, {} - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """ 检查是否已经自定义了Prompt;有的话就用自定义的;同时对Prompt进行空格清除 :param system_prompt: 系统提示词,f-string格式 :param user_prompt: 用户提示词,f-string格式 """ + self.input_tokens = 0 + self.output_tokens = 0 + + self.system_prompt, self.user_prompt = self._default() + if system_prompt is not None: self.system_prompt = system_prompt if user_prompt is not None: self.user_prompt = user_prompt - if not self.user_prompt: - err = "必须设置用户提示词!" - raise ValueError(err) - - self.system_prompt = dedent(self.system_prompt).strip("\n") - self.user_prompt = dedent(self.user_prompt).strip("\n") + self.system_prompt = {lang: dedent(prompt).strip("\n") for lang, prompt in self.system_prompt.items()} + self.user_prompt = {lang: dedent(prompt).strip("\n") for lang, prompt in self.user_prompt.items()} @abstractmethod async def generate(self, **kwargs): # noqa: ANN003, ANN201 diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py index 94d66575083f8c22de68f12bfe95f98137a58153..ecb514a6ea0770cb05a81d7a7e2d153c0721949e 100644 --- a/apps/llm/patterns/executor.py +++ b/apps/llm/patterns/executor.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import convert_context_to_prompt, facts_to_prompt +from apps.schemas.enum_var import LanguageType from .core import CorePattern @@ -15,40 +16,84 @@ if TYPE_CHECKING: class ExecutorThought(CorePattern): """通过大模型生成Executor的思考内容""" - user_prompt: str = r""" - - - 你是一个可以使用工具的智能助手。 - 在回答用户的问题时,你为了获取更多的信息,使用了一个工具。 - 请简明扼要地总结工具的使用过程,提供你的见解,并给出下一步的行动。 - - 注意: - 工具的相关信息在标签中给出。 - 为了使你更好的理解发生了什么,你之前的思考过程在标签中给出。 - 输出时请不要包含XML标签,输出时请保持简明和清晰。 - - - - - {tool_name} - {tool_description} - {tool_output} - - - - {last_thought} - - - - 你当前需要解决的问题是: - {user_question} - - - 请综合以上信息,再次一步一步地进行思考,并给出见解和行动: - """ - """用户提示词""" - - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + @staticmethod + def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: + """默认的Prompt内容""" + return { + LanguageType.CHINESE: r"You are a helpful assistant.", + LanguageType.ENGLISH: r"You are a helpful assistant.", + }, { + LanguageType.CHINESE: r""" + + + 你是一个可以使用工具的智能助手。 + 在回答用户的问题时,你为了获取更多的信息,使用了一个工具。 + 请简明扼要地总结工具的使用过程,提供你的见解,并给出下一步的行动。 + + 注意: + 工具的相关信息在标签中给出。 + 为了使你更好的理解发生了什么,你之前的思考过程在标签中给出。 + 输出时请不要包含XML标签,输出时请保持简明和清晰。 + + + + + {tool_name} + {tool_description} + {tool_output} + + + + {last_thought} + + + + 你当前需要解决的问题是: + {user_question} + + + 请综合以上信息,再次一步一步地进行思考,并给出见解和行动: + """, + LanguageType.ENGLISH: r""" + + + You are an intelligent assistant who can use tools. + When answering user questions, you use a tool to get more information. + Please summarize the process of using the tool briefly, provide your insights, \ +and give the next action. + + Note: + The information about the tool is given in the tag. + To help you better understand what happened, your previous thought process is given in the \ + tag. + Do not include XML tags in the output, and keep the output brief and clear. + + + + + {tool_name} + {tool_description} + {tool_output} + + + + {last_thought} + + + + The question you need to solve is: + {user_question} + + + Please integrate the above information, think step by step again, provide insights, and give actions: + """, + } + + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """处理Prompt""" super().__init__(system_prompt, user_prompt) @@ -57,16 +102,20 @@ class ExecutorThought(CorePattern): last_thought: str = kwargs["last_thought"] user_question: str = kwargs["user_question"] tool_info: dict[str, Any] = kwargs["tool_info"] + language: LanguageType = kwargs.get("language", LanguageType.CHINESE) messages = [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": self.user_prompt.format( - last_thought=last_thought, - user_question=user_question, - tool_name=tool_info["name"], - tool_description=tool_info["description"], - tool_output=tool_info["output"], - )}, + { + "role": "user", + "content": self.user_prompt[language].format( + last_thought=last_thought, + user_question=user_question, + tool_name=tool_info["name"], + tool_description=tool_info["description"], + tool_output=tool_info["output"], + ), + }, ] llm = ReasoningLLM() @@ -82,45 +131,76 @@ class ExecutorThought(CorePattern): class ExecutorSummary(CorePattern): """使用大模型进行生成Executor初始背景""" - user_prompt: str = r""" - - 根据给定的对话记录和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。 - - 生成总结的要求如下: - 1. 突出重要信息点,例如时间、地点、人物、事件等。 - 2. “关键事实”中的内容可在生成总结时作为已知信息。 - 3. 输出时请不要包含XML标签,确保信息准确性,不得编造信息。 - 4. 总结应少于3句话,应少于300个字。 - - 对话记录将在标签中给出,关键事实将在标签中给出。 - - - {conversation} - - - {facts} - - - 现在,请开始生成背景总结: - """ - """用户提示词""" - - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: - """初始化Background模式""" - super().__init__(system_prompt, user_prompt) + @staticmethod + def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: + """默认的Prompt内容""" + return { + LanguageType.CHINESE: r"You are a helpful assistant.", + LanguageType.ENGLISH: r"You are a helpful assistant.", + }, { + LanguageType.CHINESE: r""" + + 根据给定的对话记录和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。 + + 生成总结的要求如下: + 1. 突出重要信息点,例如时间、地点、人物、事件等。 + 2. “关键事实”中的内容可在生成总结时作为已知信息。 + 3. 输出时请不要包含XML标签,确保信息准确性,不得编造信息。 + 4. 总结应少于3句话,应少于300个字。 + + 对话记录将在标签中给出,关键事实将在标签中给出。 + + + {conversation} + + + {facts} + + + 现在,请开始生成背景总结: + """, + LanguageType.ENGLISH: r""" + + Based on the given conversation records and key facts, generate a three-sentence background \ +summary.This summary will be used for context understanding in subsequent conversations. + + The requirements for generating the summary are as follows: + 1. Highlight important information points, such as time, location, people, events, etc. + 2. The content in the "key facts" can be used as known information when generating the summary. + 3. Do not include XML tags in the output, ensure the accuracy of the information, and do not \ +make up information. + 4. The summary should be less than 3 sentences and less than 300 words. + + The conversation records will be given in the tag, and the key facts will be given \ +in the tag. + + + {conversation} + + + {facts} + + + Now, please start generating the background summary: + """, + } async def generate(self, **kwargs) -> str: # noqa: ANN003 """进行初始背景生成""" background: ExecutorBackground = kwargs["background"] conversation_str = convert_context_to_prompt(background.conversation) facts_str = facts_to_prompt(background.facts) + language = kwargs.get("language", LanguageType.CHINESE) messages = [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": self.user_prompt.format( - facts=facts_str, - conversation=conversation_str, - )}, + { + "role": "user", + "content": self.user_prompt[language].format( + facts=facts_str, + conversation=conversation_str, + ), + }, ] result = "" diff --git a/apps/llm/patterns/facts.py b/apps/llm/patterns/facts.py index 0cafbc7752aa64c724c55770df640f8073038518..c8510b271491bb04be21f894e2956f9aa819e47e 100644 --- a/apps/llm/patterns/facts.py +++ b/apps/llm/patterns/facts.py @@ -8,6 +8,7 @@ from pydantic import BaseModel, Field from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import convert_context_to_prompt +from apps.schemas.enum_var import LanguageType from .core import CorePattern @@ -23,62 +24,112 @@ class FactsResult(BaseModel): class Facts(CorePattern): """事实提取""" - system_prompt: str = "You are a helpful assistant." - """系统提示词(暂不使用)""" - - user_prompt: str = r""" - - - 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 - 以下是需要关注的信息类型以及有关如何处理输入数据的详细说明。 - - **你需要关注的信息类型** - 1. 实体:对话中涉及到的实体。例如:姓名、地点、组织、事件等。 - 2. 偏好:对待实体的态度。例如喜欢、讨厌等。 - 3. 关系:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等。 - 4. 动作:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等。 - - **要求** - 1. 事实必须准确,只能从对话中提取。不要将样例中的信息体现在输出中。 - 2. 事实必须清晰、简洁、易于理解。必须少于30个字。 - 3. 必须按照以下JSON格式输出: - - {{ - "facts": ["事实1", "事实2", "事实3"] - }} - - - - - 杭州西湖有哪些景点? - 杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。西湖周围有许多著名的景点,包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。 - - + @staticmethod + def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: + """默认的Prompt内容""" + return { + LanguageType.CHINESE: r"You are a helpful assistant.", + LanguageType.ENGLISH: r"You are a helpful assistant.", + }, { + LanguageType.CHINESE: r""" + + + 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 + 以下是需要关注的信息类型以及有关如何处理输入数据的详细说明。 + + **你需要关注的信息类型** + 1. 实体:对话中涉及到的实体。例如:姓名、地点、组织、事件等。 + 2. 偏好:对待实体的态度。例如喜欢、讨厌等。 + 3. 关系:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等。 + 4. 动作:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等。 + + **要求** + 1. 事实必须准确,只能从对话中提取。不要将样例中的信息体现在输出中。 + 2. 事实必须清晰、简洁、易于理解。必须少于30个字。 + 3. 必须按照以下JSON格式输出: + + {{ + "facts": ["事实1", "事实2", "事实3"] + }} + + + + + 杭州西湖有哪些景点? + 杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。\ +西湖周围有许多著名的景点,包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。 + + + + {{ + "facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] + }} + + + + + {conversation} - {{ - "facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] - }} - - - - - {conversation} - - """ - """用户提示词""" - - - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: - """初始化Prompt""" - super().__init__(system_prompt, user_prompt) + """, + LanguageType.ENGLISH: r""" + + + Extract key information from the conversation and organize it into unique, easily \ +understandable facts that include user preferences, relationships, entities, etc. + The following are the types of information to be paid attention to and detailed instructions \ +on how to handle the input data. + + **Types of information to be paid attention to** + 1. Entities: Entities involved in the conversation. For example: names, locations, \ +organizations, events, etc. + 2. Preferences: Attitudes towards entities. For example: like, dislike, etc. + 3. Relationships: Relationships between the user and entities, or between two entities. \ +For example: include, parallel, exclusive, etc. + 4. Actions: Specific actions that affect entities. For example: query, search, browse, \ +click, etc. + + **Requirements** + 1. Facts must be accurate and can only be extracted from the conversation. Do not include \ +information from the sample in the output. + 2. Facts must be clear, concise, and easy to understand. Must be less than 30 words. + 3. Output in the following JSON format: + + {{ + "facts": ["fact1", "fact2", "fact3"] + }} + + + + + What are the attractions in West Lake, Hangzhou? + West Lake in Hangzhou is a famous scenic spot in Hangzhou, Zhejiang Province, \ +China, famous for its beautiful natural scenery and rich cultural heritage. There are many famous attractions around \ +West Lake, including the famous Su Causeway, Bai Causeway, Broken Bridge, Three Pools Mirroring the Moon, etc. West \ +Lake is famous for its clear water and surrounding mountains, and is one of the most famous lakes in China. + + + + {{ + "facts": ["West Lake has the famous attractions of Suzhou Embankment, Bai Embankment, \ +Qiantang Bridge, San Tang Yin Yue, etc."] + }} + + + + + {conversation} + + """, + } async def generate(self, **kwargs) -> list[str]: # noqa: ANN003 """事实提取""" conversation = convert_context_to_prompt(kwargs["conversation"]) + language = kwargs.get("language", LanguageType.CHINESE) messages = [ {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format(conversation=conversation)}, + {"role": "user", "content": self.user_prompt[language].format(conversation=conversation)}, ] result = "" llm = ReasoningLLM() diff --git a/apps/llm/patterns/rewrite.py b/apps/llm/patterns/rewrite.py index 46d89e4ad50810bc932a453c2595a1c831b6b7fa..23d20c7a62f9d17612390b4400d43fe0f8882f20 100644 --- a/apps/llm/patterns/rewrite.py +++ b/apps/llm/patterns/rewrite.py @@ -8,6 +8,7 @@ from pydantic import BaseModel, Field from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator +from apps.schemas.enum_var import LanguageType from .core import CorePattern @@ -23,67 +24,131 @@ class QuestionRewriteResult(BaseModel): class QuestionRewrite(CorePattern): """问题补全与重写""" - system_prompt: str = "You are a helpful assistant." - """系统提示词""" - - user_prompt: str = r""" - - - 根据历史对话,推断用户的实际意图并补全用户的提问内容,历史对话被包含在标签中,用户意图被包含在标签中。 - 要求: - 1. 请使用JSON格式输出,参考下面给出的样例;不要包含任何XML标签,不要包含任何解释说明; - 2. 若用户当前提问内容与对话上文不相关,或你认为用户的提问内容已足够完整,请直接输出用户的提问内容。 - 3. 补全内容必须精准、恰当,不要编造任何内容。 - 4. 请输出补全后的问题,不要输出其他内容。 - 输出格式样例: - {{ - "question": "补全后的问题" - }} - - - - - + @staticmethod + def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: + """默认的Prompt内容""" + return { + LanguageType.CHINESE: r"You are a helpful assistant.", + LanguageType.ENGLISH: r"You are a helpful assistant.", + }, { + LanguageType.CHINESE: r""" + + + 根据历史对话,推断用户的实际意图并补全用户的提问内容,历史对话被包含在标签中,用户意图被包含在标签中。 + 要求: + 1. 请使用JSON格式输出,参考下面给出的样例;不要包含任何XML标签,不要包含任何解释说明; + 2. 若用户当前提问内容与对话上文不相关,或你认为用户的提问内容已足够完整,请直接输出用户的提问内容。 + 3. 补全内容必须精准、恰当,不要编造任何内容。 + 4. 请输出补全后的问题,不要输出其他内容。 + 输出格式样例: + {{ + "question": "补全后的问题" + }} + + + + + + + openEuler的优势有哪些? + + + openEuler的优势包括开源、社区支持、以及对云计算和边缘计算的优化。 + + + + + + 详细点? + + + {{ + "question": "详细说明openEuler操作系统的优势和应用场景" + }} + + + + + + {history} + - openEuler的优势有哪些? + {question} - - openEuler的优势包括开源、社区支持、以及对云计算和边缘计算的优化。 - - - - - - 详细点? - - - {{ - "question": "详细说明openEuler操作系统的优势和应用场景" - }} - - - - - - {history} - - - {question} - - - 现在,请输出补全后的问题: - - """ - """用户提示词""" + + 现在,请输出补全后的问题: + + """, + LanguageType.ENGLISH: r""" + + + Based on the historical dialogue, infer the user's actual intent and complete the user's question. \ +The historical dialogue is contained within the tags, and the user's intent is contained within the \ + tags. + Requirements: + 1. Please output in JSON format, referring to the example provided below; do not include any XML \ +tags or any explanatory notes; + 2. If the user's current question is unrelated to the previous dialogue or you believe the \ +user's question is already complete enough, directly output the user's question. + 3. The completed content must be precise and appropriate; do not fabricate any content. + 4. Output only the completed question; do not include any other content. + Example output format: + {{ + "question": "The completed question" + }} + + + + + + + What are the features of openEuler? + + + Compared to other operating systems, openEuler's features include support for multiple \ +hardware architectures and providing a stable, secure, and efficient operating system platform. + + + + + What are the advantages of openEuler? + + + The advantages of openEuler include being open-source, having community support, \ +and optimizations for cloud and edge computing. + + + + + + More details? + + + {{ + "question": "What are the features of openEuler? Please elaborate on its advantages and \ +application scenarios." + }} + + + + + {history} + + + {question} + + """, + } async def generate(self, **kwargs) -> str: # noqa: ANN003 """问题补全与重写""" history = kwargs.get("history", []) question = kwargs["question"] + language = kwargs.get("language", LanguageType.CHINESE) messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format(history="", question=question)}, + {"role": "system", "content": self.system_prompt[language]}, + {"role": "user", "content": self.user_prompt[language].format(history="", question=question)}, ] llm = kwargs.get("llm") if not llm: @@ -95,8 +160,8 @@ class QuestionRewrite(CorePattern): return question index = 0 qa = "" - while index < len(history)-1 and leave_tokens > 0: - q = history[index-1].get("content", "") + while index < len(history) - 1 and leave_tokens > 0: + q = history[index - 1].get("content", "") a = history[index].get("content", "") sub_qa = f"\n\n{q}\n\n\n{a}\n\n" leave_tokens -= TokenCalculator().calculate_token_length( @@ -109,7 +174,7 @@ class QuestionRewrite(CorePattern): qa = sub_qa + qa index += 2 - messages[1]["content"] = self.user_prompt.format(history=qa, question=question) + messages[1]["content"] = self.user_prompt[language].format(history=qa, question=question) result = "" async for chunk in llm.call(messages, streaming=False): result += chunk diff --git a/apps/llm/patterns/select.py b/apps/llm/patterns/select.py index a0b950fc85370d2c56c1f55f5170e1ce460ffdc4..d28a1c84b8c01a5ab4a7303eed405e61f0a67ade 100644 --- a/apps/llm/patterns/select.py +++ b/apps/llm/patterns/select.py @@ -5,11 +5,12 @@ import asyncio import json import logging from collections import Counter -from typing import Any, ClassVar +from typing import Any from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import choices_to_prompt +from apps.schemas.enum_var import LanguageType from .core import CorePattern @@ -19,78 +20,140 @@ logger = logging.getLogger(__name__) class Select(CorePattern): """通过投票选择最佳答案""" - system_prompt: str = "You are a helpful assistant." - """系统提示词""" + @staticmethod + def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: + """默认的Prompt内容""" + return { + LanguageType.CHINESE: r"You are a helpful assistant.", + LanguageType.ENGLISH: r"You are a helpful assistant.", + }, { + LanguageType.CHINESE: r""" + + + 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 + 在输出之前,请先思考,并使用“”标签给出思考过程。 + 结果需要使用JSON格式输出,输出格式为:{{ "choice": "选项名称" }} + + + + + 使用天气API,查询明天杭州的天气信息 + + + + API + HTTP请求,获得返回的JSON数据 + + + SQL + 查询数据库,获得数据库表中的数据 + + + + + + API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确 \ +提到了天气 API 的使用,因此应该优先使用 API 工具。SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性\ +,不太可能存储在数据库中,因此 SQL 工具的优先级相对较低,最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。 + + + + {{ "choice": "API" }} + + + - user_prompt: str = r""" - - - 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 - 在输出之前,请先思考,并使用“”标签给出思考过程。 - 结果需要使用JSON格式输出,输出格式为:{{ "choice": "选项名称" }} - - - - 使用天气API,查询明天杭州的天气信息 + + {question} + - - API - HTTP请求,获得返回的JSON数据 - - - SQL - 查询数据库,获得数据库表中的数据 - + {choice_list} - API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确提到了 \ - 天气 API 的使用,因此应该优先使用 API 工具。\ - SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性,不太可能存储在数据库中,因此 \ - SQL 工具的优先级相对较低,\ - 最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。 - - - - {{ "choice": "API" }} - - - - - - - {question} - - - - {choice_list} - - - - - 让我们一步一步思考。 - """ - """用户提示词""" - - slot_schema: ClassVar[dict[str, Any]] = { - "type": "object", - "properties": { - "choice": { - "type": "string", - "description": "The choice of the option.", - }, - }, - "required": ["choice"], - } - """最终输出的JSON Schema""" - - - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + 让我们一步一步思考。 + """, + LanguageType.ENGLISH: r""" + + + Based on the historical dialogue (including tool call results) and user question, select the \ +most suitable option from the given option list. + Before outputting, please think carefully and use the "" tag to give the thinking \ +process. + The output needs to be in JSON format, the output format is: {{ "choice": "option name" }} + + + + + Use the weather API to query the weather information of Hangzhou \ +tomorrow + + + + API + HTTP request, get the returned JSON data + + + SQL + Query the database, get the data in the database table + + + + + + The API tool can get external data through API, and the weather information may be stored \ +in external data. Since the user clearly mentioned the use of weather API, it should be given priority to the API \ +tool. The SQL tool is used to get information from the database, considering the variability and dynamism of weather \ +data, it is unlikely to be stored in the database, so the priority of the SQL tool is relatively low, \ +The best choice seems to be "API: request a specific API, get the returned JSON data". + + + + {{ "choice": "API" }} + + + + + + {question} + + + + {choice_list} + + + + + Let's think step by step. + + + """, + } + + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + slot_schema: dict[str, Any] | None = None, + ) -> None: """初始化Prompt""" super().__init__(system_prompt, user_prompt) + if slot_schema is not None: + self.slot_schema = slot_schema + else: + self.slot_schema = { + "type": "object", + "properties": { + "choice": { + "type": "string", + "description": "The choice of the option.", + }, + }, + "required": ["choice"], + } async def _generate_single_attempt(self, user_input: str, choice_list: list[str]) -> str: @@ -130,6 +193,7 @@ class Select(CorePattern): background = kwargs.get("background", "无背景信息。") data_str = json.dumps(kwargs.get("data", {}), ensure_ascii=False) + language = kwargs.get("language", LanguageType.CHINESE) choice_prompt, choices_list = choices_to_prompt(kwargs["choices"]) @@ -142,7 +206,7 @@ class Select(CorePattern): return choices_list[0] logger.info("[Select] 选项列表: %s", choice_prompt) - user_input = self.user_prompt.format( + user_input = self.user_prompt[language].format( question=kwargs["question"], background=background, data=data_str, diff --git a/apps/models/task.py b/apps/models/task.py index ad5c96f6ad75e4774f986e2fe22e9c46053ff455..36f82152cbeb06cc5b995f6939aa720380870018 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -8,7 +8,7 @@ from sqlalchemy import DateTime, Enum, Float, ForeignKey, Integer, String, Text from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column -from apps.schemas.enum_var import ExecutorStatus, Language, StepStatus +from apps.schemas.enum_var import ExecutorStatus, LanguageType, StepStatus from .base import Base @@ -66,7 +66,7 @@ class TaskRuntime(Base): """计划""" document: Mapped[list[dict[uuid.UUID, Any]]] = mapped_column(JSONB, nullable=False, default={}) """关联文档""" - language: Mapped[Language] = mapped_column(Enum(Language), nullable=False, default=Language.ZH) + language: Mapped[LanguageType] = mapped_column(Enum(LanguageType), nullable=False, default=LanguageType.CHINESE) """语言""" diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 70491d7281c4339d00699620f400b42ed9078325..7bba12989b8aa115ea0f468d70f7e161408f62ea 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -15,10 +15,9 @@ from apps.common.wordscheck import WordsCheck from apps.dependency import verify_personal_token, verify_session from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data -from apps.schemas.enum_var import ExecutorStatus +from apps.schemas.enum_var import ExecutorStatus, LanguageType from apps.schemas.request_data import RequestData, RequestDataApp from apps.schemas.response_data import ResponseData -from apps.schemas.task import Task from apps.services.activity import Activity from apps.services.blacklist import QuestionBlacklistManager, UserBlacklistManager from apps.services.conversation import ConversationManager diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index 8c43f4824758b2a29fbb588118fc18b8be0d1bff..008325af36a24cfc953763edf37cc4f82c289a68 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -14,7 +14,7 @@ from pydantic.json_schema import SkipJsonSchema from apps.common.oidc import oidc_provider from apps.scheduler.call.core import CoreCall -from apps.schemas.enum_var import CallOutputType, ContentType, HTTPMethod +from apps.schemas.enum_var import CallOutputType, ContentType, HTTPMethod, LanguageType from apps.schemas.scheduler import ( CallError, CallInfo, diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 127498dc7626e631df3e14c47a9f2fd4a8541b2f..ca812693a5fc46cddeb072bde5ead403ab875d5c 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -17,7 +17,7 @@ from apps.llm.reasoning import ReasoningLLM from apps.models.llm import LLMData from apps.models.node import NodeInfo from apps.models.task import ExecutorHistory -from apps.schemas.enum_var import CallOutputType +from apps.schemas.enum_var import CallOutputType, LanguageType from apps.schemas.scheduler import ( CallError, CallIds, @@ -86,7 +86,7 @@ class CoreCall(BaseModel): @classmethod - def info(cls) -> CallInfo: + def info(cls, language: LanguageType = LanguageType.CHINESE) -> CallInfo: """返回Call的名称和描述""" err = "[CoreCall] 必须手动实现info方法" raise NotImplementedError(err) @@ -108,6 +108,7 @@ class CoreCall(BaseModel): history_order.append(item_obj.step_id) return CallVars( + language=executor.task.language, ids=CallIds( task_id=executor.task.id, flow_id=executor.task.state.flow_id, diff --git a/apps/scheduler/call/empty.py b/apps/scheduler/call/empty.py index 3c743f3bd6231b38b5f4a554e5a97270a31e9ac8..b7138206085289c9d8179d50e938dd8cf30b7982 100644 --- a/apps/scheduler/call/empty.py +++ b/apps/scheduler/call/empty.py @@ -4,7 +4,7 @@ from collections.abc import AsyncGenerator from typing import Any -from apps.schemas.enum_var import CallOutputType +from apps.schemas.enum_var import CallOutputType, LanguageType from apps.schemas.scheduler import CallInfo, CallOutputChunk, CallVars from .core import CoreCall, DataBase @@ -14,14 +14,18 @@ class Empty(CoreCall, input_model=DataBase, output_model=DataBase): """空Call""" @classmethod - def info(cls) -> CallInfo: + def info(cls, language: LanguageType = LanguageType.CHINESE) -> CallInfo: """ 返回Call的名称和描述 :return: Call的名称和描述 :rtype: CallInfo """ - return CallInfo(name="空白", description="空白节点,用于占位") + i18n_info = { + LanguageType.CHINESE: CallInfo(name="空白", description="空白节点,用于占位"), + LanguageType.ENGLISH: CallInfo(name="Empty", description="Empty node, used for placeholder"), + } + return i18n_info[language] async def _init(self, call_vars: CallVars) -> DataBase: diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index cfe9276d8ea3625bc8fa30836b91b7b84dc96b8b..ee95b495f294bfe99cbabd24b095378dab8b7578 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -10,7 +10,7 @@ from pydantic import Field from apps.models.node import NodeInfo from apps.scheduler.call.core import CoreCall -from apps.schemas.enum_var import CallOutputType +from apps.schemas.enum_var import CallOutputType, LanguageType from apps.schemas.scheduler import CallInfo, CallOutputChunk, CallVars from apps.services.user_tag import UserTagManager @@ -33,9 +33,16 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): @classmethod - def info(cls) -> CallInfo: + def info(cls, language: LanguageType = LanguageType.CHINESE) -> CallInfo: """返回Call的名称和描述""" - return CallInfo(name="提取事实", description="从对话上下文和文档片段中提取事实。") + i18n_info = { + LanguageType.CHINESE: CallInfo(name="提取事实", description="从对话上下文和文档片段中提取事实。"), + LanguageType.ENGLISH: CallInfo( + name="Fact Extraction", + description="Extract facts from the conversation context and document snippets.", + ), + } + return i18n_info[language] @classmethod @@ -79,7 +86,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): ) # 提取事实信息 - facts_tpl = env.from_string(FACTS_PROMPT) + facts_tpl = env.from_string(FACTS_PROMPT[self._sys_vars.language]) facts_prompt = facts_tpl.render(conversation=data.message) facts_obj: FactsGen = await self._json([ {"role": "system", "content": "You are a helpful assistant."}, @@ -87,7 +94,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): ], FactsGen) # type: ignore[arg-type] # 更新用户画像 - domain_tpl = env.from_string(DOMAIN_PROMPT) + domain_tpl = env.from_string(DOMAIN_PROMPT[self._sys_vars.language]) domain_prompt = domain_tpl.render(conversation=data.message) domain_list: DomainGen = await self._json([ {"role": "system", "content": "You are a helpful assistant."}, diff --git a/apps/scheduler/call/facts/prompt.py b/apps/scheduler/call/facts/prompt.py index b2b2513f2c28feb4d19f5fb4ee3eba1bd616a4dc..39b3e7fc8fde178456b3c54288abfe387105b0ca 100644 --- a/apps/scheduler/call/facts/prompt.py +++ b/apps/scheduler/call/facts/prompt.py @@ -3,81 +3,184 @@ from textwrap import dedent -DOMAIN_PROMPT: str = dedent(r""" - - - 根据对话上文,提取推荐系统所需的关键词标签,要求: - 1. 实体名词、技术术语、时间范围、地点、产品等关键信息均可作为关键词标签 - 2. 至少一个关键词与对话的话题有关 - 3. 标签需精简,不得重复,不得超过10个字 - 4. 使用JSON格式输出,不要包含XML标签,不要包含任何解释说明 - - - - - 北京天气如何? - 北京今天晴。 - - - - { - "keywords": ["北京", "天气"] - } - - - - - - {% for item in conversation %} - <{{item['role']}}> - {{item['content']}} - - {% endfor %} - - -""") -FACTS_PROMPT: str = dedent(r""" - - - 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 - 以下是需要关注的信息类型以及有关如何处理输入数据的详细说明。 - - **你需要关注的信息类型** - 1. 实体:对话中涉及到的实体。例如:姓名、地点、组织、事件等。 - 2. 偏好:对待实体的态度。例如喜欢、讨厌等。 - 3. 关系:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等。 - 4. 动作:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等。 - - **要求** - 1. 事实必须准确,只能从对话中提取。不要将样例中的信息体现在输出中。 - 2. 事实必须清晰、简洁、易于理解。必须少于30个字。 - 3. 必须按照以下JSON格式输出: - - { - "facts": ["事实1", "事实2", "事实3"] - } - - - +from apps.schemas.enum_var import LanguageType + +DOMAIN_PROMPT: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent( + r""" + + + 根据对话上文,提取推荐系统所需的关键词标签,要求: + 1. 实体名词、技术术语、时间范围、地点、产品等关键信息均可作为关键词标签 + 2. 至少一个关键词与对话的话题有关 + 3. 标签需精简,不得重复,不得超过10个字 + 4. 使用JSON格式输出,不要包含XML标签,不要包含任何解释说明 + + + + + 北京天气如何? + 北京今天晴。 + + + + { + "keywords": ["北京", "天气"] + } + + + + + + {% for item in conversation %} + <{{item['role']}}> + {{item['content']}} + + {% endfor %} + + + """, + ), + LanguageType.ENGLISH: dedent( + r""" + + + Extract keywords for recommendation system based on the previous conversation, requirements: + 1. Entity nouns, technical terms, time range, location, product, etc. can be keyword tags + 2. At least one keyword is related to the topic of the conversation + 3. Tags should be concise and not repeated, not exceeding 10 characters + 4. Output in JSON format, do not include XML tags, do not include any explanatory notes + + + + + What's the weather like in Beijing? + Beijing is sunny today. + + + + { + "keywords": ["Beijing", "weather"] + } + + + + + + {% for item in conversation %} + <{{item['role']}}> + {{item['content']}} + + {% endfor %} + + + """, + ), +} + +FACTS_PROMPT: dict[str, str] = { + LanguageType.CHINESE: dedent( + r""" + + + 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 + 以下是需要关注的信息类型以及有关如何处理输入数据的详细说明。 + + **你需要关注的信息类型** + 1. 实体:对话中涉及到的实体。例如:姓名、地点、组织、事件等。 + 2. 偏好:对待实体的态度。例如喜欢、讨厌等。 + 3. 关系:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等。 + 4. 动作:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等。 + + **要求** + 1. 事实必须准确,只能从对话中提取。不要将样例中的信息体现在输出中。 + 2. 事实必须清晰、简洁、易于理解。必须少于30个字。 + 3. 必须按照以下JSON格式输出: + + { + "facts": ["事实1", "事实2", "事实3"] + } + + + + + 杭州西湖有哪些景点? + 杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。西湖周围有许多著名的景点,\ +包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。 + + + + { + "facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] + } + + + + - 杭州西湖有哪些景点? - 杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。西湖周围有许多著名的景点,包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。 + {% for item in conversation %} + <{{item['role']}}> + {{item['content']}} + + {% endfor %} + + """, + ), + LanguageType.ENGLISH: dedent( + r""" + + + Extract key information from the conversation and organize it into unique, easily understandable \ +facts, including user preferences, relationships, entities, etc. + The following are the types of information you need to pay attention to and detailed instructions \ +on how to handle input data. + **Types of information you need to pay attention to** + 1. Entities: Entities involved in the conversation. For example: names, locations, organizations, \ +events, etc. + 2. Preferences: Attitudes towards entities. For example: like, dislike, etc. + 3. Relationships: Relationships between users and entities, or between two entities. For example: \ +include, parallel, mutually exclusive, etc. + 4. Actions: Specific actions that affect entities. For example: query, search, browse, click, etc. + + **Requirements** + 1. Facts must be accurate and can only be extracted from the conversation. Do not include the \ +information in the example in the output. + 2. Facts must be clear, concise, and easy to understand. Must be less than 30 words. + 3. Output in the following JSON format: + + { + "facts": ["Fact 1", "Fact 2", "Fact 3"] + } + + + + + What are the attractions in Hangzhou West Lake ? + West Lake in Hangzhou, Zhejiang Province, China, is a famous scenic spot known for \ +its beautiful natural scenery and rich cultural heritage. Many notable attractions surround West Lake, including the \ +renowned Su Causeway, Bai Causeway, Broken Bridge, and the Three Pools Mirroring the Moon. Famous for its \ +crystal-clear waters and the surrounding mountains, West Lake is one of China's most famous lakes. + + + + { + "facts": ["Hangzhou West Lake has famous attractions such as Suzhou Embankment, Bai Budi, \ +Qiantang Bridge, San Tang Yue, etc."] + } + + + + + + {% for item in conversation %} + <{{item['role']}}> + {{item['content']}} + + {% endfor %} + - { - "facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] - } - - - - - - {% for item in conversation %} - <{{item['role']}}> - {{item['content']}} - - {% endfor %} - - -""") + """, + ), +} diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index aadb0f26c5e6629134342c941c62c41641efcc63..083625c68ee4183861d912e1d1a5dfc82fa160d7 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -10,7 +10,7 @@ from pydantic import Field from apps.models.task import ExecutorHistory, Task from apps.scheduler.call.llm.prompt import LLM_ERROR_PROMPT -from apps.schemas.enum_var import EventType, ExecutorStatus, SpecialCallType, StepStatus +from apps.schemas.enum_var import EventType, ExecutorStatus, LanguageType, SpecialCallType, StepStatus from apps.schemas.flow import Flow, Step from apps.schemas.request_data import RequestDataApp from apps.schemas.task import StepQueueItem @@ -22,21 +22,37 @@ from .step import StepExecutor logger = logging.getLogger(__name__) # 开始前的固定步骤 FIXED_STEPS_BEFORE_START = [ - Step( - name="理解上下文", - description="使用大模型,理解对话上下文", - node=SpecialCallType.SUMMARY.value, - type=SpecialCallType.SUMMARY.value, - ), + { + LanguageType.CHINESE: Step( + name="理解上下文", + description="使用大模型,理解对话上下文", + node=SpecialCallType.SUMMARY.value, + type=SpecialCallType.SUMMARY.value, + ), + LanguageType.ENGLISH: Step( + name="Understand context", + description="Use large model to understand the context of the dialogue", + node=SpecialCallType.SUMMARY.value, + type=SpecialCallType.SUMMARY.value, + ), + }, ] # 结束后的固定步骤 FIXED_STEPS_AFTER_END = [ - Step( - name="记忆存储", - description="理解对话答案,并存储到记忆中", - node=SpecialCallType.FACTS.value, - type=SpecialCallType.FACTS.value, - ), + { + LanguageType.CHINESE: Step( + name="记忆存储", + description="理解对话答案,并存储到记忆中", + node=SpecialCallType.FACTS.value, + type=SpecialCallType.FACTS.value, + ), + LanguageType.ENGLISH: Step( + name="Memory storage", + description="Understand the answer of the dialogue and store it in the memory", + node=SpecialCallType.FACTS.value, + type=SpecialCallType.FACTS.value, + ), + }, ] @@ -59,7 +75,11 @@ class FlowExecutor(BaseExecutor): """初始化FlowExecutor""" logger.info("[FlowExecutor] 加载Executor状态") # 尝试恢复State - if self.task.state and self.task.state.flow_status != ExecutorStatus.INIT: + if ( + self.task.state + and self.task.state.flow_status != FlowStatus.INIT + and self.task.state.flow_status != FlowStatus.UNKNOWN + ): self.task.context = await TaskManager.get_context_by_task_id(self.task.id) else: # 创建ExecutorState @@ -71,7 +91,7 @@ class FlowExecutor(BaseExecutor): step_status=StepStatus.RUNNING, app_id=str(self.post_body_app.app_id), step_id="start", - step_name="开始", + step_name="开始" if self.task.language == LanguageType.CHINESE else "Start", ) self.validate_flow_state(self.task) # 是否到达Flow结束终点(变量) @@ -175,12 +195,14 @@ class FlowExecutor(BaseExecutor): # 头插开始前的系统步骤,并执行 for step in FIXED_STEPS_BEFORE_START: - self.step_queue.append(StepQueueItem( - step_id=uuid.uuid4(), - step=step, - enable_filling=False, - to_user=False, - )) + self.step_queue.append( + StepQueueItem( + step_id=uuid.uuid4(), + step=step.get(self.task.language, step[LanguageType.CHINESE]), + enable_filling=False, + to_user=False, + ), + ) await self._step_process() # 插入首个步骤 @@ -194,23 +216,29 @@ class FlowExecutor(BaseExecutor): if self.state.stepStatus == StepStatus.ERROR: logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤") self.step_queue.clear() - self.step_queue.appendleft(StepQueueItem( - step_id=str(uuid.uuid4()), - step=Step( - name="错误处理", - description="错误处理", - node=SpecialCallType.LLM.value, - type=SpecialCallType.LLM.value, - params={ - "user_prompt": LLM_ERROR_PROMPT.replace( - "{{ error_info }}", - self.task.state.error_info["err_msg"], + self.step_queue.appendleft( + StepQueueItem( + step_id=str(uuid.uuid4()), + step=Step( + name=( + "错误处理" if self.task.language == LanguageType.CHINESE else "Error Handling" + ), + description=( + "错误处理" if self.task.language == LanguageType.CHINESE else "Error Handling" ), - }, + node=SpecialCallType.LLM.value, + type=SpecialCallType.LLM.value, + params={ + "user_prompt": LLM_ERROR_PROMPT[self.task.language].replace( + "{{ error_info }}", + self.task.state.error_info["err_msg"], # type: ignore[arg-type] + ), + }, + ), + enable_filling=False, + to_user=False, ), - enable_filling=False, - to_user=False, - )) + ) is_error = True # 错误处理后结束 self._reached_end = True @@ -234,10 +262,12 @@ class FlowExecutor(BaseExecutor): # 尾插运行结束后的系统步骤 for step in FIXED_STEPS_AFTER_END: - self.step_queue.append(StepQueueItem( - step_id=uuid.uuid4(), - step=step, - )) + self.step_queue.append( + StepQueueItem( + step_id=uuid.uuid4(), + step=step.get(self.task.language, step[LanguageType.CHINESE]), + ), + ) await self._step_process() # FlowStop需要返回总时间,需要倒推最初的开始时间(当前时间减去当前已用总时间) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index dc83faf1cbb6a645c5af733c40c47e8e5020de3d..e1d912f4b31177ad92c8fc6885fd294bcdfa6946 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -18,7 +18,7 @@ from apps.models.app import App, AppHashes from apps.models.flow import Flow as FlowInfo from apps.models.vectors import FlowPoolVector from apps.scheduler.util import yaml_enum_presenter, yaml_str_presenter -from apps.schemas.enum_var import EdgeType +from apps.schemas.enum_var import EdgeType, NodeType from apps.schemas.flow import Flow from apps.services.node import NodeManager @@ -86,26 +86,18 @@ class FlowLoader: err = f"[FlowLoader] 步骤名称不能以下划线开头:{key}" logger.error(err) raise ValueError(err) - if key == "start": - step["name"] = "开始" - step["description"] = "开始节点" - step["type"] = "start" - elif key == "end": - step["name"] = "结束" - step["description"] = "结束节点" - step["type"] = "end" - else: - node_info = await NodeManager.get_node(step["node"]) - try: - step["type"] = node_info.callId - except ValueError as e: - logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", node_info.id, e) - step["type"] = "Empty" - step["name"] = ( - node_info.name - if "name" not in step or step["name"] == "" - else step["name"] - ) + if step["type"]==NodeType.START.value or step["type"]==NodeType.END.value: + continue + try: + step["type"] = await NodeManager.get_node_call_id(step["node"]) + except ValueError as e: + logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", step["node"], e) + step["type"] = "Empty" + step["name"] = ( + (await NodeManager.get_node_name(step["node"])) + if "name" not in step or step["name"] == "" + else step["name"] + ) return flow_yaml diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index 7a2aa374502051248c8db7f501e63b5e9e1cc81c..b203c54545f9210e73431da885853e5ab5b27841 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -8,7 +8,7 @@ from typing import Any import asyncer from anyio import Path -from sqlalchemy import and_, delete, select +from sqlalchemy import and_, delete, select, update from apps.common.postgres import postgres from apps.common.process_handler import ProcessHandler @@ -92,29 +92,28 @@ class MCPLoader(metaclass=SingletonMeta): elif isinstance(mcp_config, MCPServerStdioConfig): print(f"[Installer] Stdio方式的MCP模板,开始自动安装: {mcp_id}") # noqa: T201 if "uv" in mcp_config.command: - new_config = await install_uvx(mcp_id, mcp_config) + mcp_config = await install_uvx(mcp_id, mcp_config) elif "npx" in mcp_config.command: - new_config = await install_npx(mcp_id, mcp_config) + mcp_config = await install_npx(mcp_id, mcp_config) - if new_config is None: + if mcp_config is None: logger.error("[MCPLoader] MCP模板安装失败: %s", mcp_id) await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED) return - mcp_config = new_config - + item.mcpServers[mcp_id] = mcp_config # 重新保存config template_config = MCP_PATH / "template" / mcp_id / "config.json" f = await template_config.open("w+", encoding="utf-8") - config_data = config.model_dump(by_alias=True, exclude_none=True) + config_data = item.model_dump(by_alias=True, exclude_none=True) await f.write(json.dumps(config_data, indent=4, ensure_ascii=False)) await f.aclose() else: logger.info("[Installer] SSE/StreamableHTTP方式的MCP模板,无需安装: %s", mcp_id) - mcp_config.autoInstall = False + item.mcpServers[mcp_id].autoInstall = False - await MCPLoader._insert_template_tool(mcp_id, mcp_config) + await MCPLoader._insert_template_tool(mcp_id, item) await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.READY) logger.info("[Installer] MCP模板安装成功: %s", mcp_id) except Exception: @@ -126,20 +125,21 @@ class MCPLoader(metaclass=SingletonMeta): @staticmethod async def clear_ready_or_failed_mcp_installation() -> None: """清除状态为ready或failed的MCP安装任务""" - mcp_collection = MongoDB().get_collection("mcp") mcp_ids = ProcessHandler.get_all_task_ids() - # 检索_id在mcp_ids且状态为ready或者failed的MCP的内容 - db_service_list = await mcp_collection.find( - {"_id": {"$in": mcp_ids}, "status": {"$in": [MCPInstallStatus.READY, MCPInstallStatus.FAILED]}}, - ).to_list(None) - for db_service in db_service_list: - try: - item = MCPCollection.model_validate(db_service) - except Exception as e: - logger.error("[MCPLoader] MCP模板数据验证失败: %s, 错误: %s", db_service["_id"], e) - continue - ProcessHandler.remove_task(item.id) - logger.info("[MCPLoader] 删除已完成或失败的MCP安装进程: %s", item.id) + async with postgres.session() as session: + # 检索_id在mcp_ids且状态为ready或者failed的MCP的内容 + result = await session.scalars( + select(MCPInfo).where( + and_( + MCPInfo.id.in_(mcp_ids), + MCPInfo.status.in_([MCPInstallStatus.READY, MCPInstallStatus.FAILED]), + ), + ), + ) + db_service_list = list(result.all()) + for db_service in db_service_list: + ProcessHandler.remove_task(db_service.id) + logger.info("[MCPLoader] 删除已完成或失败的MCP安装进程: %s", db_service.id) @staticmethod @@ -179,8 +179,7 @@ class MCPLoader(metaclass=SingletonMeta): """取消正在安装的MCP模板任务""" template_path = MCP_PATH / "template" logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path) - mongo = MongoDB() - mcp_collection = mongo.get_collection("mcp") + # 遍历所有模板 mcp_ids = [] async for mcp_dir in template_path.iterdir(): @@ -188,13 +187,18 @@ class MCPLoader(metaclass=SingletonMeta): if not await mcp_dir.is_dir(): logger.warning("[MCPLoader] 跳过非目录: %s", mcp_dir.as_posix()) continue - mcp_ids.append(mcp_dir.name) - # 更新数据库状态 - await mcp_collection.update_many( - {"_id": {"$in": mcp_ids}, "status": MCPInstallStatus.INSTALLING}, - {"$set": {"status": MCPInstallStatus.CANCELLED}}, - ) + + async with postgres.session() as session: + await session.execute( + update(MCPInfo).where( + and_( + MCPInfo.status == MCPInstallStatus.INSTALLING, + MCPInfo.id.in_(mcp_ids), + ), + ).values(status=MCPInstallStatus.CANCELLED), + ) + await session.commit() @staticmethod @@ -408,22 +412,25 @@ class MCPLoader(metaclass=SingletonMeta): symlinks=True, ) + mcpsvc = mcp_config.mcpServers[mcp_id] if mcp_env is not None: mcp_config.mcpServers[mcp_id].env.update(mcp_env) - if mcp_config.mcpType == MCPType.STDIO: + if mcp_config.mcpType == MCPType.STDIO and isinstance(mcpsvc, MCPServerStdioConfig): index = None - for i in range(len(mcp_config.config.args)): - if mcp_config.config.args[i] == "--directory": + for i in range(len(mcpsvc.args)): + if mcpsvc.args[i] == "--directory": index = i + 1 break if index is not None: - if index < len(mcp_config.config.args): - mcp_config.config.args[index] = str(user_path)+'/project' + if index < len(mcpsvc.args): + mcpsvc.args[index] = str(user_path) + "/project" else: - mcp_config.config.args.append(str(user_path)+'/project') + mcpsvc.args.append(str(user_path) + "/project") else: - mcp_config.config.args = ["--directory", str(user_path)+'/project'] + mcp_config.config.args + mcpsvc.args = ["--directory", str(user_path) + "/project", *mcpsvc.args] + user_config_path = user_path / "config.json" + mcp_config.mcpServers[mcp_id] = mcpsvc # 更新用户配置 f = await user_config_path.open("w", encoding="utf-8", errors="ignore") await f.write( @@ -442,7 +449,6 @@ class MCPLoader(metaclass=SingletonMeta): )) await session.commit() - @staticmethod async def user_deactive_template(user_sub: str, mcp_id: str) -> None: """ diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py index 809bac6dbfcca5b4a51955f13d671a593253f44e..fb73ea299f84e8a164de08501648c8b5b0ff40c0 100644 --- a/apps/scheduler/pool/loader/metadata.py +++ b/apps/scheduler/pool/loader/metadata.py @@ -63,8 +63,7 @@ class MetadataLoader: raise RuntimeError(err) from e elif metadata_type == MetadataType.SERVICE.value: try: - service_id = uuid.UUID(file_path.parent.name) - metadata = ServiceMetadata(id=service_id, **metadata_dict) + metadata = ServiceMetadata(id=file_path.parent.name, **metadata_dict) except Exception as e: err = "[MetadataLoader] Service metadata.yaml格式错误" logger.exception(err) diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 4477bd59050c831fb1b685e40331bd6040938cfd..c69ced6f6dcc02d130ab43067485b73480d0f413 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -197,8 +197,8 @@ class AgentState(str, Enum): ERROR = "ERROR" -class Language(str, Enum): +class LanguageType(str, Enum): """语言""" - ZH = "zh" - EN = "en" + CHINESE = "zh" + ENGLISH = "en" diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index d30c57da6a7b2905f83ca6a713be17017c4d910e..4aa1773163b08260798b9d637f2d778b172e15e2 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -7,7 +7,7 @@ from typing import Any from pydantic import BaseModel, Field from .appcenter import AppData -from .enum_var import CommentType +from .enum_var import CommentType, LanguageType from .flow_topology import FlowItem from .mcp import MCPType from .message import FlowParams @@ -28,7 +28,7 @@ class RequestData(BaseModel): conversation_id: uuid.UUID = Field( default=uuid.UUID("00000000-0000-0000-0000-000000000000"), alias="conversationId", description="聊天ID", ) - language: str = Field(default="zh", description="语言") + language: LanguageType = Field(default=LanguageType.CHINESE, description="语言") files: list[str] = Field(default=[], description="文件列表") app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py index 476ae0c61f32ff29b6ad20196df3d047b9109072..c4a10ad6ab6b0ba56fcc4099718accc32e37fbe1 100644 --- a/apps/schemas/scheduler.py +++ b/apps/schemas/scheduler.py @@ -6,7 +6,7 @@ from typing import Any from pydantic import BaseModel, Field -from .enum_var import CallOutputType +from .enum_var import CallOutputType, LanguageType from .task import FlowStepHistory @@ -35,6 +35,7 @@ class CallVars(BaseModel): history: dict[str, FlowStepHistory] = Field(description="Executor中历史工具的结构化数据", default={}) history_order: list[str] = Field(description="Executor中历史工具的顺序", default=[]) ids: CallIds = Field(description="Call的ID") + language: LanguageType = Field(description="语言", default=LanguageType.CHINESE) class CallTokens(BaseModel): diff --git a/apps/services/service.py b/apps/services/service.py index e4ded160533310e128e4b1ece35780e24e4713e9..34fa0fa453ad0cb0464b4ff6e55f339fbaf9a7a7 100644 --- a/apps/services/service.py +++ b/apps/services/service.py @@ -396,7 +396,7 @@ class ServiceCenterManager: @staticmethod async def get_service_metadata( user_sub: str, - service_id: uuid.UUID, + service_id: str, ) -> ServiceMetadata: """获取服务元数据""" async with postgres.session() as session: