diff --git a/apps/common/postgres.py b/apps/common/postgres.py
index 74b0dcfb2d21277920574b77e0cb40bcb1273482..6631714ddae10b04dc08ce8cf9231f695b0dc1fc 100644
--- a/apps/common/postgres.py
+++ b/apps/common/postgres.py
@@ -7,9 +7,7 @@ from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
-from apps.models import (
- Base,
-)
+from apps.models import Base
from .config import config
diff --git a/apps/common/queue.py b/apps/common/queue.py
index a916c4607fb67646f2ad78c377d1d515058bc4b8..c0981527276e7e5613294e38ed0cdf1b4a2c6490 100644
--- a/apps/common/queue.py
+++ b/apps/common/queue.py
@@ -56,22 +56,21 @@ class MessageQueue:
# 如果使用了Flow
flow = MessageFlow(
appId=task.state.appId,
- flowId=task.state.executorId,
- flowName=task.state.executorName,
- flowStatus=task.state.executorStatus,
+ executorId=task.state.executorId,
+ executorName=task.state.executorName,
+ executorStatus=task.state.executorStatus,
stepId=task.state.stepId,
stepName=task.state.stepName,
- stepDescription=task.state.stepDescription,
stepStatus=task.state.stepStatus,
+ stepType=task.state.stepType,
)
else:
flow = None
message = MessageBase(
event=event_type,
- id=task.metadata.record_id,
+ id=task.metadata.id,
conversationId=task.metadata.conversationId,
- taskId=task.metadata.id,
metadata=metadata,
flow=flow,
content=data,
diff --git a/apps/llm/embedding.py b/apps/llm/embedding.py
index 679683f7e2a69644be2b33d5a7f010f6a50cefad..61162a0b7a3ff9780fb09eda51f7a8a7f82f06c3 100644
--- a/apps/llm/embedding.py
+++ b/apps/llm/embedding.py
@@ -103,7 +103,7 @@ class Embedding:
embedding = await self.get_embedding(["测试文本"])
return len(embedding[0])
- async def _delete_vector(self) -> None:
+ async def delete_vector(self) -> None:
"""删除所有Vector表"""
async with postgres.session() as session:
await session.execute(text("DROP TABLE IF EXISTS framework_flow_vector"))
@@ -148,7 +148,6 @@ class Embedding:
async def init(self) -> None:
"""在使用Embedding前初始化数据库表等资源"""
- await self._delete_vector()
# 检测维度
dim = await self._get_embedding_dimension()
await self._create_vector_table(dim)
diff --git a/apps/llm/function.py b/apps/llm/function.py
index 6725c46cfb0d9311c480ca90f63be7f9a2fb077e..63e9ec1db59b872235e7d68b3ddf80a5f01da59e 100644
--- a/apps/llm/function.py
+++ b/apps/llm/function.py
@@ -120,6 +120,7 @@ class JsonGenerator:
autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
+ extensions=["jinja2.ext.loopcontrols"],
)
self._err_info = ""
diff --git a/apps/llm/providers/ollama.py b/apps/llm/providers/ollama.py
index 7d0578a7a654079bb5efab204cdf5baf5ce3736a..e4935561a6ecb2710191b15aa5b7b6bcd1ceb40d 100644
--- a/apps/llm/providers/ollama.py
+++ b/apps/llm/providers/ollama.py
@@ -7,7 +7,7 @@ from typing import Any
from ollama import AsyncClient, ChatResponse
from typing_extensions import override
-from apps.llm import TokenCalculator
+from apps.llm.token import TokenCalculator
from apps.models import LLMType
from apps.schemas.llm import LLMChunk, LLMFunctions
diff --git a/apps/llm/providers/openai.py b/apps/llm/providers/openai.py
index d13e1d0b8dcc32cbe7f38ee6dfe15027ebcfb9bc..8a6fdcc322fce60f5a02911d23a3a663d984b104 100644
--- a/apps/llm/providers/openai.py
+++ b/apps/llm/providers/openai.py
@@ -15,7 +15,7 @@ from openai.types.chat import (
)
from typing_extensions import override
-from apps.llm import TokenCalculator
+from apps.llm.token import TokenCalculator
from apps.models import LLMType
from apps.schemas.llm import LLMChunk, LLMFunctions
diff --git a/apps/models/__init__.py b/apps/models/__init__.py
index b67b00ee7887eb97642be0dc302e974452025525..899b564bea856b6c0a83c18452c59f031abfb9ea 100644
--- a/apps/models/__init__.py
+++ b/apps/models/__init__.py
@@ -14,7 +14,16 @@ from .record import FootNoteType, Record, RecordFootNote, RecordMetadata
from .service import Service, ServiceACL, ServiceHashes
from .session import Session, SessionActivity, SessionType
from .tag import Tag
-from .task import ExecutorCheckpoint, ExecutorHistory, ExecutorStatus, LanguageType, StepStatus, Task, TaskRuntime
+from .task import (
+ ExecutorCheckpoint,
+ ExecutorHistory,
+ ExecutorStatus,
+ LanguageType,
+ StepStatus,
+ StepType,
+ Task,
+ TaskRuntime,
+)
from .user import User, UserAppUsage, UserFavorite, UserFavoriteType, UserTag
__all__ = [
@@ -57,6 +66,7 @@ __all__ = [
"SessionActivity",
"SessionType",
"StepStatus",
+ "StepType",
"Tag",
"Task",
"TaskRuntime",
diff --git a/apps/models/base.py b/apps/models/base.py
index 5f4424f2f4256b18a62398f8a7d3fc3667e17f15..432e564754cf06c635b31bb48d64e50907f1843e 100644
--- a/apps/models/base.py
+++ b/apps/models/base.py
@@ -1,8 +1,13 @@
"""SQLAlchemy模型基类"""
+from typing import Any, ClassVar
+
from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass
class Base(MappedAsDataclass, DeclarativeBase):
"""SQLAlchemy模型基类"""
+ # 生成文档时需要启动这个参数,否则会触发重复导入告警
+ __table_args__: ClassVar[dict[str, Any]] = {"extend_existing": True}
+
diff --git a/apps/models/llm.py b/apps/models/llm.py
index 6249f5b377f334df01eab92224ab30a02ab1718e..422721b5d6143f725096d6535c58c43a011698e7 100644
--- a/apps/models/llm.py
+++ b/apps/models/llm.py
@@ -55,12 +55,12 @@ class LLMData(Base):
"""LLM最大Token数量"""
temperature: Mapped[float] = mapped_column(Float, default=0.7, nullable=False)
"""LLM温度"""
- llmType: Mapped[list[LLMType]] = mapped_column(ARRAY(Enum(LLMType)), default=[], nullable=False) # noqa: N815
+ llmType: Mapped[list[LLMType]] = mapped_column(ARRAY(Enum(LLMType)), default_factory=list, nullable=False) # noqa: N815
"""LLM类型"""
provider: Mapped[LLMProvider] = mapped_column(
Enum(LLMProvider), default=None, nullable=True,
)
- extraConfig: Mapped[dict[str, Any]] = mapped_column(JSONB, default={}, nullable=False) # noqa: N815
+ extraConfig: Mapped[dict[str, Any]] = mapped_column(JSONB, default_factory=dict, nullable=False) # noqa: N815
"""大模型API类型"""
createdAt: Mapped[DateTime] = mapped_column( # noqa: N815
DateTime,
diff --git a/apps/models/record.py b/apps/models/record.py
index 7b3bec8ddd010b0d3c94ac3cdd0d4d4a9b3c0e22..2bc80d8442430c178c4568ef96149f33d8b7321e 100644
--- a/apps/models/record.py
+++ b/apps/models/record.py
@@ -54,7 +54,7 @@ class RecordMetadata(Base):
"""问答对输入token数"""
outputTokens: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # noqa: N815
"""问答对输出token数"""
- featureSwitch: Mapped[dict[str, Any]] = mapped_column(JSONB, default={}, nullable=False) # noqa: N815
+ featureSwitch: Mapped[dict[str, Any]] = mapped_column(JSONB, default_factory=dict, nullable=False) # noqa: N815
"""问答对功能开关"""
diff --git a/apps/models/task.py b/apps/models/task.py
index 5692fff86f91a211f83df4afc9f59b4a24cbc29a..61f12b78f24f54440ace6606102c46e662aa42fc 100644
--- a/apps/models/task.py
+++ b/apps/models/task.py
@@ -44,14 +44,32 @@ class StepStatus(str, PyEnum):
CANCELLED = "cancelled"
+class StepType(str, PyEnum):
+ """步骤类型"""
+
+ API = "api"
+ CHOICE = "choice"
+ CMD = "cmd"
+ CONVERT = "convert"
+ FACTS = "facts"
+ GRAPH = "graph"
+ LLM = "llm"
+ MCP = "mcp"
+ RAG = "rag"
+ SLOT = "slot"
+ SUMMARY = "summary"
+ SQL = "sql"
+ SUGGEST = "suggest"
+
+
class Task(Base):
"""任务"""
__tablename__ = "framework_task"
userSub: Mapped[str] = mapped_column(String(255), ForeignKey("framework_user.sub")) # noqa: N815
"""用户ID"""
- conversationId: Mapped[uuid.UUID] = mapped_column( # noqa: N815
- UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=False,
+ conversationId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815
+ UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=True,
)
"""对话ID"""
checkpointId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815
@@ -89,13 +107,13 @@ class TaskRuntime(Base):
"""用户输入"""
fullAnswer: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815
"""完整输出"""
- fact: Mapped[list[str]] = mapped_column(JSONB, nullable=False, default=[])
+ fact: Mapped[list[str]] = mapped_column(JSONB, nullable=False, default_factory=list)
"""记忆"""
reasoning: Mapped[str] = mapped_column(Text, nullable=False, default="")
"""中间推理"""
- filledSlot: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815
+ filledSlot: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815
"""计划"""
- document: Mapped[list[dict[uuid.UUID, Any]]] = mapped_column(JSONB, nullable=False, default={})
+    document: Mapped[list[dict[uuid.UUID, Any]]] = mapped_column(JSONB, nullable=False, default_factory=list)
"""关联文档"""
language: Mapped[LanguageType] = mapped_column(Enum(LanguageType), nullable=False, default=LanguageType.CHINESE)
"""语言"""
@@ -124,15 +142,15 @@ class ExecutorCheckpoint(Base):
"""步骤名称"""
stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815
"""步骤状态"""
+ stepType: Mapped[StepType] = mapped_column(Enum(StepType), nullable=False) # noqa: N815
+ """步骤类型"""
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4)
"""检查点ID"""
executorDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815
"""执行器描述"""
- stepDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815
- """步骤描述"""
- data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={})
+ data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict)
"""步骤额外数据"""
- errorMessage: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815
+ errorMessage: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815
"""错误信息"""
@@ -153,17 +171,17 @@ class ExecutorHistory(Base):
"""步骤ID"""
stepName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815
"""步骤名称"""
+ stepType: Mapped[StepType] = mapped_column(Enum(StepType), nullable=False) # noqa: N815
+ """步骤类型"""
stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815
"""步骤状态"""
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4)
"""执行器历史ID"""
- stepDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815
- """步骤描述"""
- inputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815
+ inputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815
"""步骤输入数据"""
- outputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815
+ outputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815
"""步骤输出数据"""
- extraData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815
+ extraData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default_factory=dict) # noqa: N815
"""步骤额外数据"""
updatedAt: Mapped[datetime] = mapped_column( # noqa: N815
DateTime(timezone=True), nullable=False, default_factory=lambda: datetime.now(UTC),
diff --git a/apps/models/user.py b/apps/models/user.py
index b32a3cd57d2c44b2fb0d4f9dfc4454274ef82ad0..278456a3c43af96ca98f72af48b1f824ac9e4477 100644
--- a/apps/models/user.py
+++ b/apps/models/user.py
@@ -5,7 +5,7 @@ import uuid
from datetime import UTC, datetime
from hashlib import sha256
-from sqlalchemy import ARRAY, BigInteger, Boolean, DateTime, Enum, ForeignKey, Integer, String
+from sqlalchemy import BigInteger, Boolean, DateTime, Enum, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
@@ -34,8 +34,6 @@ class User(Base):
String(100), default_factory=lambda: sha256(str(uuid.uuid4()).encode()).hexdigest()[:16], nullable=False,
)
"""用户个人令牌"""
- selectedKB: Mapped[list[uuid.UUID]] = mapped_column(ARRAY(UUID), default=[], nullable=False) # noqa: N815
- """用户选择的知识库的ID"""
functionLLM: Mapped[str | None] = mapped_column(String(255), default=None, nullable=True) # noqa: N815
"""用户选择的函数模型ID"""
embeddingLLM: Mapped[str | None] = mapped_column(String(255), default=None, nullable=True) # noqa: N815
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 6bfb853df9d8a190fe2f391e72ee4412ac07101c..32e8c81c7df635579bee9ccb72d0f218f256fd4e 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -3,7 +3,6 @@
import asyncio
import logging
-import uuid
from collections.abc import AsyncGenerator
from fastapi import APIRouter, Depends, HTTPException, Request, status
@@ -15,16 +14,12 @@ from apps.dependency import verify_personal_token, verify_session
from apps.models import ExecutorStatus
from apps.scheduler.scheduler import Scheduler
from apps.scheduler.scheduler.context import save_data
-from apps.schemas.request_data import RequestData, RequestDataApp
+from apps.schemas.request_data import RequestData
from apps.schemas.response_data import ResponseData
-from apps.schemas.task import TaskData
from apps.services import (
Activity,
- ConversationManager,
FlowManager,
QuestionBlacklistManager,
- RecordManager,
- TaskManager,
UserBlacklistManager,
)
@@ -40,34 +35,6 @@ router = APIRouter(
)
-async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> TaskData:
- """初始化Task"""
- # 更改信息并刷新数据库
- if post_body.task_id is None:
- conversation = await ConversationManager.get_conversation_by_conversation_id(
- user_sub=user_sub,
- conversation_id=post_body.conversation_id,
- )
- if not conversation:
- err = "[Chat] 用户没有权限访问该对话!"
- raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=err)
- task_ids = await TaskManager.delete_tasks_by_conversation_id(post_body.conversation_id)
- await RecordManager.update_record_flow_status_to_cancelled_by_task_ids(task_ids)
- task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body)
- task.runtime.question = post_body.question
- task.state.app_id = post_body.app.app_id if post_body.app else ""
- else:
- if not post_body.task_id:
- err = "[Chat] task_id 不可为空!"
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="task_id cannot be empty")
- task = await TaskManager.get_task_data_by_task_id(post_body.task_id)
- post_body.app = RequestDataApp(appId=task.state.app_id)
- post_body.conversation_id = task.ids.conversation_id
- post_body.language = task.language
- post_body.question = task.runtime.question
- return task
-
-
async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]:
"""进行实际问答,并从MQ中获取消息"""
try:
@@ -103,14 +70,14 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str)
# 获取最终答案
task = scheduler.task
- if task.state.flow_status == ExecutorStatus.ERROR:
+ if task.state.executorStatus == ExecutorStatus.ERROR:
_logger.error("[Chat] 生成答案失败")
yield "data: [ERROR]\n\n"
await Activity.remove_active(user_sub)
return
# 对结果进行敏感词检查
- if await WordsCheck().check(task.runtime.answer) != 1:
+ if await WordsCheck().check(task.runtime.fullAnswer) != 1:
yield "data: [SENSITIVE]\n\n"
_logger.info("[Chat] 答案包含敏感词!")
await Activity.remove_active(user_sub)
diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py
index c927ae6c65114d648f993a7115c7aa7512c13457..cf11620365054de86e1b4ad41ebe533f756ec24d 100644
--- a/apps/scheduler/call/core.py
+++ b/apps/scheduler/call/core.py
@@ -104,6 +104,7 @@ class CoreCall(BaseModel):
session_id=executor.task.runtime.sessionId,
user_sub=executor.task.metadata.userSub,
app_id=executor.task.state.appId,
+ conversation_id=executor.task.metadata.conversationId,
),
question=executor.question,
step_data=history,
diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py
index 70bf8e6313383c92ee379d73dc073164dd33653a..a13bee732a011441e6a11b97f3098d569a0df222 100644
--- a/apps/scheduler/call/facts/facts.py
+++ b/apps/scheduler/call/facts/facts.py
@@ -83,6 +83,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput):
autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
+ extensions=["jinja2.ext.loopcontrols"],
)
# 提取事实信息
diff --git a/apps/scheduler/call/llm/llm.py b/apps/scheduler/call/llm/llm.py
index 05c6e3da09bf87eb8c2d6bb2eedc8a3ea9a5615b..8140fb04ae973b2ddb174bb279f94f613e40d3e7 100644
--- a/apps/scheduler/call/llm/llm.py
+++ b/apps/scheduler/call/llm/llm.py
@@ -64,6 +64,7 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput):
autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
+ extensions=["jinja2.ext.loopcontrols"],
)
# 上下文信息
diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py
index 918e2565d0773337ff0902f368e5992c7489472e..c1e6e5b4ee0cefb097b34fa6fa90a2b70de5e5d7 100644
--- a/apps/scheduler/call/rag/rag.py
+++ b/apps/scheduler/call/rag/rag.py
@@ -23,6 +23,7 @@ from apps.schemas.scheduler import (
CallOutputChunk,
CallVars,
)
+from apps.services import DocumentManager
from .prompt import QUESTION_REWRITE
from .schema import (
@@ -118,29 +119,37 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput):
return doc_chunk_list
- #TODO:修改为正确的获取临时文档的逻辑
- async def _get_doc_info(self, doc_ids: list[str], data: RAGInput) -> AsyncGenerator[CallOutputChunk, None]:
- """获取文档信息"""
- doc_chunk_list = []
- # 处理指定文档ID的情况
+ async def _get_temp_docs(self, conversation_id: uuid.UUID) -> list[str]:
+ """获取当前会话的临时文档"""
+ doc_ids = []
+ # 从Conversation中获取刚上传的文档
+ docs = await DocumentManager.get_unused_docs(conversation_id)
+ # 从最近10条Record中获取文档
+ docs += await DocumentManager.get_used_docs(conversation_id, 10, "question")
+ doc_ids += [doc.id for doc in docs]
+ return doc_ids
+
+
+ async def _get_doc_info(self, doc_ids: list[str], data: RAGInput) -> list[DocItem]:
+ """获取文档信息,支持临时文档和知识库文档"""
+ doc_chunk_list: list[DocItem] = []
+
+ # 处理临时文档
if doc_ids:
tmp_data = deepcopy(data)
- tmp_data.kbIds = [ uuid.UUID("00000000-0000-0000-0000-000000000000") ]
+ tmp_data.kbIds = [uuid.UUID("00000000-0000-0000-0000-000000000000")]
+ tmp_data.docIds = doc_ids
doc_chunk_list.extend(await self._fetch_doc_chunks(tmp_data))
# 处理知识库ID的情况
if data.kbIds:
- doc_chunk_list.extend(await self._fetch_doc_chunks(data))
+ kb_data = deepcopy(data)
+ # 知识库查询时不使用docIds,只使用kbIds
+ kb_data.docIds = None
+ doc_chunk_list.extend(await self._fetch_doc_chunks(kb_data))
- # 将文档分片转换为文本片段并返回
- for doc_chunk in doc_chunk_list:
- for chunk in doc_chunk.chunks:
- text = chunk.text.replace("\n", "")
- yield CallOutputChunk(
- type=CallOutputType.DATA,
- content=text,
- )
+ return doc_chunk_list
async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]:
@@ -155,8 +164,6 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput):
lstrip_blocks=True,
)
tmpl = env.from_string(QUESTION_REWRITE[self._sys_vars.language])
-
- # 仅使用当前问题,不使用历史问答数据
prompt = tmpl.render(question=data.query)
# 使用_json方法直接获取JSON结果
@@ -169,18 +176,22 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput):
except Exception:
_logger.exception("[RAG] 问题重写失败,使用原始问题")
- # 获取文档片段
- doc_chunk_list = await self._fetch_doc_chunks(data)
+ # 获取临时文档ID列表
+ if self._sys_vars.ids.conversation_id:
+ temp_doc_ids = await self._get_temp_docs(self._sys_vars.ids.conversation_id)
+ else:
+ temp_doc_ids = []
- corpus = []
- for doc_chunk in doc_chunk_list:
- for chunk in doc_chunk.chunks:
- corpus.extend([chunk.text.replace("\n", "")])
+ # 合并传入的doc_ids和临时文档ID
+ all_doc_ids = list(set((data.docIds or []) + temp_doc_ids))
+
+ # 获取文档片段
+ doc_chunk_list = await self._get_doc_info(all_doc_ids, data)
yield CallOutputChunk(
type=CallOutputType.DATA,
content=RAGOutput(
question=data.query,
- corpus=corpus,
+ corpus=doc_chunk_list,
).model_dump(exclude_none=True, by_alias=True),
)
diff --git a/apps/scheduler/call/rag/schema.py b/apps/scheduler/call/rag/schema.py
index 69239acffdfb467497ff3dc319e0d15e97dd7b6d..55894b7f7e0f76f6f4d833ec6ed278f0c6e8a1d3 100644
--- a/apps/scheduler/call/rag/schema.py
+++ b/apps/scheduler/call/rag/schema.py
@@ -53,7 +53,7 @@ class RAGOutput(DataBase):
"""RAG工具的输出"""
question: str = Field(description="用户输入")
- corpus: list[str] = Field(description="知识库的语料列表")
+ corpus: list[DocItem] = Field(description="知识库的语料列表")
class RAGInput(DataBase):
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 6d912e5e366cae305659a485d9a4bf41a9eb89e5..cb37c69e68044ec6c4b4d01e2f8c01f9bdbac767 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -53,6 +53,8 @@ class MCPAgentExecutor(BaseExecutor):
_logger.error(err)
raise RuntimeError(err)
self._user = user
+ # 获取历史
+ await self._load_history()
async def load_mcp(self) -> None:
"""加载MCP服务器列表"""
@@ -168,7 +170,7 @@ class MCPAgentExecutor(BaseExecutor):
return
try:
- output_data = await mcp_client.call_tool(mcp_tool.name, self._current_input)
+ output_data = await mcp_client.call_tool(mcp_tool.toolName, self._current_input)
except anyio.ClosedResourceError as e:
_logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcpId)
# 停止当前用户MCP进程
@@ -418,7 +420,7 @@ class MCPAgentExecutor(BaseExecutor):
)
await self.get_next_step()
else:
- mcp_tool = self.tools[self.task.state.toolId]
+ mcp_tool = self.tools[self.task.state.stepName]
is_param_error = await self._planner.is_param_error(
await self._host.assemble_memory(self.task.runtime, self.task.context),
self.task.state.errorMessage,
diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py
index 637e2cc7b46b16b51d82c1717c19c91117bb9f34..a7e4653412c1053368b8eeb5e970c8299b29c22a 100644
--- a/apps/scheduler/executor/base.py
+++ b/apps/scheduler/executor/base.py
@@ -26,7 +26,6 @@ class BaseExecutor(BaseModel, ABC):
msg_queue: "MessageQueue"
llm: "LLMConfig"
- background: "ExecutorBackground"
question: str
model_config = ConfigDict(
@@ -42,6 +41,14 @@ class BaseExecutor(BaseModel, ABC):
async def _load_history(self, n: int = 3) -> None:
"""加载历史记录"""
+ # 不存在conversationId,为首个问题
+ if not self.task.metadata.conversationId:
+ self.background = ExecutorBackground(
+ conversation=[],
+ facts=[],
+ num=n,
+ )
+ return
# 获取最后n+5条Record
records = await RecordManager.query_record_by_conversation_id(
self.task.metadata.userSub, self.task.metadata.conversationId, n + 5,
diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py
index b15c6154c7b1f02b5deba513a1e6b95f797fe0ec..955dc65dded27e848433e42148339df0cf06d918 100644
--- a/apps/scheduler/executor/flow.py
+++ b/apps/scheduler/executor/flow.py
@@ -13,6 +13,7 @@ from apps.models import (
ExecutorStatus,
LanguageType,
StepStatus,
+ StepType,
)
from apps.schemas.enum_var import EventType, SpecialCallType
from apps.schemas.flow import Flow, Step
@@ -89,6 +90,7 @@ class FlowExecutor(BaseExecutor):
stepStatus=StepStatus.RUNNING,
stepId=self.flow.basicConfig.startStep,
stepName=self.flow.steps[self.flow.basicConfig.startStep].name,
+ stepType=StepType(self.flow.steps[self.flow.basicConfig.startStep].type),
)
# 是否到达Flow结束终点(变量)
self._reached_end: bool = False
diff --git a/apps/scheduler/executor/prompt.py b/apps/scheduler/executor/prompt.py
index 33c72cade69e5d2f07f48e98101d5f60719425b8..cd50338f45cf32d3842e006e44acf78cd4a89fd7 100644
--- a/apps/scheduler/executor/prompt.py
+++ b/apps/scheduler/executor/prompt.py
@@ -133,39 +133,36 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = {
LanguageType.CHINESE: r"""
你是一个专业的智能助手,擅长基于提供的文档内容回答用户问题。
-
-
- 任务:结合背景信息全面回答用户提问,并为关键信息添加脚注引用。
-
+ 任务:全面结合上下文和文档内容,回答用户提问,并为关键信息添加脚注引用。
- - 背景信息: 标签中提供相关文档内容
+ - 上下文:请参考先前的对话
+ - 文档内容: 标签中提供相关文档内容
- 用户问题: 标签中提供具体问题
- 参考示例: 标签中展示期望的输出格式
1. 格式要求:
- - 纯文本输出,不使用任何格式标记(标题、加粗、列表等)
+ - 输出不要包含任何XML标签
- 脚注格式:[[1]]、[[2]]、[[3]],数字对应文档ID
- 脚注紧跟相关句子的标点符号后
2. 内容要求:
- - 基于文档内容回答,不编造信息
+ - 不编造信息,充分结合上下文和文档内容
- 如问题与文档无关,直接回答而不使用文档
- - 回答应结构清晰:背景→核心→扩展→总结
- - 充分引用文档内容,提供详细信息
+ - 回答应结构清晰:背景→核心→扩展→总结,并且内容全面
3. 引用规范:
- - 文档ID从1开始按顺序递增
- 不得使用示例中的文档序号
- 关键信息必须添加脚注
+ - 标注时选择关联性最强的切块对应的文档
-
-
+
+ -
1
openEuler介绍文档
@@ -174,8 +171,8 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = {
openEuler社区的核心目标是面向服务器、云、边缘计算等场景,为用户提供一个稳定、安全、高效的操作系统平台,并且支持x86、ARM等多种硬件架构。
-
-
+
+ -
2
社区发展报告
@@ -184,8 +181,8 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = {
社区成员通过技术贡献、代码提交、文档编写、测试验证等多种方式,共同推动开源操作系统的发展,并为用户提供技术支持和社区服务。
-
-
+
+
openEuler社区的目标是什么?有哪些特色?
@@ -205,115 +202,156 @@ GEN_RAG_ANSWER: dict[LanguageType, str] = {
形成了良性的开源生态系统。[[2]]
-
- {bac_info}
-
+
+    {% set __ctx_len = ctx_length|default(0) %}
+    {% set max_length = max_length if max_length is defined and max_length is not none else (__ctx_len * 0.7)|int %}
+    {% set ns = namespace(total_len=0, stop=false) %}
+    {# namespace is required: a plain {% set %} inside a for-loop does not persist across iterations in Jinja2 #}
+    {% for item in documents %}
+    -
+        {{item.doc_id}}
+        {{item.doc_name}}
+        {% for chunk in item.chunks %}
+        {% set __chunk_len = (chunk.text|length) %}
+        {% if (ns.total_len + __chunk_len) > max_length %}
+        {% set ns.stop = true %}
+        {% break %}
+        {% endif %}
+        {{chunk.text}}
+        {% set ns.total_len = ns.total_len + __chunk_len %}
+        {% endfor %}
+
+        {% if ns.stop %}
+        {% break %}
+        {% endif %}
+    {% endfor %}
+
- {user_question}
+ {{user_question}}
请基于上述背景信息和用户问题,按照指令要求生成详细、结构清晰的回答:
""",
LanguageType.ENGLISH: r"""
- You are a professional assistant who specializes in answering user questions based on provided document content.
-
-
- Task: Answer user questions comprehensively based on background information and add footnote references to \
-key information.
-
+ You are a professional intelligent assistant, adept at answering user questions
+ based on the provided documents.
+ Task: Combine the context and document content comprehensively to answer the
+ user's question, and add footnote citations for key information.
- - Background information: Provided in tags with relevant document content
- - User question: Provided in tags with specific questions
- - Reference example: Shown in tags with expected output format
+ - Context: Please refer to the prior conversation
+ - Document content: Relevant content is provided within the tag
+ - User question: The specific question is provided within the tag
+ - Reference example: The expected output format is shown within the tag
1. Format requirements:
- - Plain text output, no formatting marks (headings, bold, lists, etc.)
+ - Do not include any XML tags in the output
- Footnote format: [[1]], [[2]], [[3]], with numbers corresponding to document IDs
- - Footnotes placed right after punctuation of related sentences
+ - Place footnotes immediately after the punctuation of the related sentence
2. Content requirements:
- - Answer based on document content, do not fabricate information
- - If the question is unrelated to the document, answer directly without using the document
- - Answers should be well-structured: background → core → expansion → summary
- - Fully cite document content with detailed information
+ - Do not fabricate information; fully integrate context and document content
+ - If the question is unrelated to the documents, answer directly without using the documents
+ - The answer should be clearly structured: background → core → expansion → summary, and be comprehensive
3. Citation specifications:
- - Document IDs start from 1 and increase sequentially
- - Do not use document numbers from examples
- - Key information must have footnotes
+ - Do not use the document numbers from the example
+ - Key information must include footnotes
+ - When annotating, select the document corresponding to the most relevant chunk
-
-
+
+ -
1
- openEuler Introduction Document
+ Introduction to openEuler
- The openEuler community is an open-source operating system community dedicated to promoting the \
-development of Linux operating systems. The community was initiated by Huawei in 2019, aiming to build an open \
-and collaborative operating system ecosystem.
+ The openEuler community is an open-source operating system community dedicated
+ to advancing the Linux operating system. The community was initiated by Huawei
+ in 2019 to build an open and collaborative OS ecosystem.
- The core goal of the openEuler community is to provide users with a stable, secure, and efficient \
-operating system platform for scenarios such as servers, clouds, and edge computing, supporting multiple hardware \
-architectures such as x86 and ARM.
+ The core goal of the openEuler community is to provide a stable, secure, and
+ efficient operating system platform for scenarios such as servers, cloud, and
+ edge computing, and it supports multiple hardware architectures including x86
+ and ARM.
-
-
+
+ -
2
Community Development Report
- Members of the openEuler community come from all over the world, including developers, users, \
-enterprise partners, and academic institutions. As of 2023, the community has over 300 enterprises and \
-organizations participating in contributions.
+ Members of the openEuler community come from around the world, including
+ developers, users, enterprise partners, and academic institutions. As of 2023,
+ over 300 enterprises and organizations have contributed to the community.
- Community members contribute through various ways such as technical contributions, code \
-submissions, documentation writing, and testing verification, jointly promoting the development of \
-open-source operating systems and providing technical support and community services to users.
+ Community members jointly promote the development of open-source operating
+ systems and provide users with technical support and community services through
+ technical contributions, code submissions, documentation writing, and testing.
-
-
+
+
What are the goals and characteristics of the openEuler community?
- Please generate a detailed and well-structured answer based on the background information and user \
-question according to the instructions:
+ Please generate a detailed and well-structured answer based on the above background
+ information and user question:
- The openEuler community is an open-source operating system community initiated by Huawei, dedicated \
-to promoting the development of Linux operating systems since its establishment in 2019. [[1]]
+ The openEuler community, initiated by Huawei in 2019, is an open-source operating
+ system community dedicated to advancing Linux development. [[1]]
- The core goal of the community is to provide users with a stable, secure, and efficient operating \
-system platform for various application scenarios such as servers, clouds, and edge computing. [[1]]
- At the same time, openEuler supports multiple hardware architectures such as x86 and ARM, offering \
-excellent cross-platform compatibility. [[1]]
+ Its core goal is to provide a stable, secure, and efficient operating system platform
+ for servers, cloud, and edge computing. [[1]] It also supports multiple hardware
+ architectures, including x86 and ARM, offering strong cross-platform compatibility. [[1]]
- In terms of community building, openEuler gathers developers, users, enterprise partners, and \
-academic institutions from around the world. [[2]]
- Currently, over 300 enterprises and organizations have participated in community contributions, \
-[[2]] jointly promoting the development of open-source operating systems through technical contributions, \
-code submissions, documentation writing, and testing verification. [[2]]
+ In terms of community building, openEuler brings together developers, users, enterprise
+ partners, and academic institutions worldwide. [[2]] To date, over 300 enterprises and
+ organizations have participated in contributions, jointly promoting open-source OS
+ development through technical contributions, code submissions, documentation, and
+ testing. [[2]]
- This open collaboration model not only promotes technological innovation but also provides \
-comprehensive technical support and community services to users, forming a positive open-source ecosystem. [[2]]
+ This open collaboration model not only drives technological innovation but also provides
+ comprehensive technical support and community services to users, forming a healthy
+ open-source ecosystem. [[2]]
-
- {bac_info}
-
+
+    {# Compute default max length: prefer provided max_length, else use ctx_length*0.7 #}
+    {% set __ctx_len = ctx_length|default(0) %}
+    {% set max_length = max_length if max_length is defined and max_length is not none else (__ctx_len * 0.7)|int %}
+    {% set ns = namespace(total_len=0, stop=false) %}
+    {# namespace is required: a plain {% set %} inside a for-loop does not persist across iterations in Jinja2 #}
+    {% for item in documents %}
+    -
+        {{item.doc_id}}
+        {{item.doc_name}}
+        {% for chunk in item.chunks %}
+        {% set __chunk_len = (chunk.text|length) %}
+        {% if (ns.total_len + __chunk_len) > max_length %}
+        {% set ns.stop = true %}
+        {% break %}
+        {% endif %}
+        {{chunk.text}}
+        {% set ns.total_len = ns.total_len + __chunk_len %}
+        {% endfor %}
+
+        {% if ns.stop %}
+        {% break %}
+        {% endif %}
+    {% endfor %}
+
- {user_question}
+ {{user_question}}
- Please generate a detailed and well-structured answer based on the background information and user question \
-according to the instructions:
+ Please generate a detailed and well-structured answer based on the above background information and user question:
""",
}
diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py
index 46b11922015f6bb06512cdbc4aea2e3cb22f686f..e529cfc86f3becf670e96b4f400757ef85754915 100644
--- a/apps/scheduler/executor/qa.py
+++ b/apps/scheduler/executor/qa.py
@@ -1,25 +1,19 @@
"""用于执行智能问答的Executor"""
-import json
import logging
import uuid
-from collections.abc import AsyncGenerator
from datetime import UTC, datetime
-from apps.llm import TokenCalculator
-from apps.models import Document, ExecutorCheckpoint, ExecutorStatus, StepStatus
+from apps.models import ExecutorCheckpoint, ExecutorStatus, StepStatus
from apps.models.task import LanguageType
-from apps.scheduler.call.rag.schema import DocItem, RAGInput
+from apps.scheduler.call.rag.schema import DocItem, RAGOutput
from apps.schemas.document import DocumentInfo
from apps.schemas.enum_var import EventType, SpecialCallType
from apps.schemas.flow import Step
-from apps.schemas.message import DocumentAddContent, TextAddContent
-from apps.schemas.record import RecordDocument
+from apps.schemas.message import DocumentAddContent
from apps.schemas.task import StepQueueItem
-from apps.services import DocumentManager
from .base import BaseExecutor
-from .prompt import GEN_RAG_ANSWER
from .step import StepExecutor
_logger = logging.getLogger(__name__)
@@ -101,81 +95,27 @@ class QAExecutor(BaseExecutor):
appId=None,
)
- async def _get_docs(self, conversation_id: uuid.UUID) -> tuple[list[RecordDocument] | list[Document], list[str]]:
- """获取当前问答可供关联的文档"""
- doc_ids = []
- # 从Conversation中获取刚上传的文档
- docs = await DocumentManager.get_unused_docs(conversation_id)
- # 从最近10条Record中获取文档
- docs += await DocumentManager.get_used_docs(conversation_id, 10, "question")
- doc_ids += [doc.id for doc in docs]
- return docs, doc_ids
-
- async def _push_rag_text(self, content: str) -> None:
- """推送RAG单个消息块"""
- # 如果是换行
- if not content or not content.rstrip().rstrip("\n"):
- return
- await self._push_message(
- event_type=EventType.TEXT_ADD.value,
- data=TextAddContent(text=content).model_dump(exclude_none=True, by_alias=True),
- )
-
- async def _construct_prompt(
- self,
- doc_ids: list[str],
- data: RAGInput,
- ) -> AsyncGenerator[str, None]:
- """获取RAG服务的结果"""
- # 获取文档信息
- doc_chunk_list = await self._fetch_doc_chunks(data)
- bac_info, doc_info_list = await self._assemble_doc_info(
- doc_chunk_list=doc_chunk_list, max_tokens=8192,
- )
- # 构建提示词
- prompt_template = GEN_RAG_ANSWER[self.task.runtime.language]
- prompt = prompt_template.format(bac_info=bac_info, user_question=data.query)
- # 计算token数
- input_tokens = TokenCalculator().calculate_token_length(messages=[{"role": "system", "content": prompt}])
- output_tokens = 0
- doc_cnt: int = 0
- for doc_info in doc_info_list:
- doc_cnt = max(doc_cnt, doc_info.order)
- yield (
- "data: "
- + json.dumps(
- {
- "event_type": EventType.DOCUMENT_ADD.value,
- "input_tokens": input_tokens,
- "output_tokens": output_tokens,
- "content": doc_info.model_dump(),
- },
- ensure_ascii=False,
- )
- + "\n\n"
- )
- # 使用LLM的推理模型调用大模型
- messages = [
- {"role": "system", "content": prompt},
- {"role": "user", "content": data.query},
- ]
-
async def _assemble_doc_info(
self,
doc_chunk_list: list[DocItem],
max_tokens: int,
- ) -> tuple[str, list[DocumentInfo]]:
+ ) -> list[DocumentInfo]:
"""组装文档信息"""
- bac_info = ""
doc_info_list = []
doc_cnt = 0
doc_id_map = {}
- remaining_tokens = round(max_tokens * 0.8)
+ # 预留tokens
+ _ = round(max_tokens * 0.8)
for doc_chunk in doc_chunk_list:
if doc_chunk.docId not in doc_id_map:
doc_cnt += 1
# 创建DocumentInfo对象
+ created_at_value = (
+ doc_chunk.docCreatedAt.timestamp()
+ if isinstance(doc_chunk.docCreatedAt, datetime)
+ else doc_chunk.docCreatedAt
+ )
doc_info = DocumentInfo(
id=doc_chunk.docId,
order=doc_cnt,
@@ -184,55 +124,15 @@ class QAExecutor(BaseExecutor):
extension=doc_chunk.docExtension,
abstract=doc_chunk.docAbstract,
size=doc_chunk.docSize,
- created_at=doc_chunk.docCreatedAt.timestamp() if isinstance(doc_chunk.docCreatedAt, datetime) else doc_chunk.docCreatedAt,
+ created_at=created_at_value,
)
doc_info_list.append(doc_info)
doc_id_map[doc_chunk.docId] = doc_cnt
- doc_index = doc_id_map[doc_chunk.docId]
-
- if bac_info:
- bac_info += "\n\n"
- bac_info += f""""""
- for chunk in doc_chunk.chunks:
- if remaining_tokens <= 0:
- break
- chunk_text = chunk.text
- chunk_text = TokenCalculator().get_k_tokens_words_from_content(
- content=chunk_text, k=remaining_tokens,
- )
- remaining_tokens -= TokenCalculator().calculate_token_length(messages=[
- {"role": "user", "content": ""},
- {"role": "user", "content": chunk_text},
- {"role": "user", "content": ""},
- ], pure_text=True)
- bac_info += f"""
-
- {chunk_text}
-
- """
- bac_info += ""
- return bac_info, doc_info_list
-
- async def _push_rag_doc(self, doc: DocumentInfo, document_order: int = 1) -> None:
- """推送RAG使用的文档信息"""
- await self._push_message(
- event_type=EventType.DOCUMENT_ADD.value,
- data=DocumentAddContent(
- documentId=str(doc.id),
- documentOrder=doc.order,
- documentAuthor=doc.author,
- documentName=doc.name,
- documentAbstract=doc.abstract,
- documentType=doc.extension,
- documentSize=doc.size,
- createdAt=doc.created_at,
- ).model_dump(exclude_none=True, by_alias=True),
- )
+ return doc_info_list
async def run(self) -> None:
"""运行QA"""
- # 运行RAG步骤
rag_exec = StepExecutor(
msg_queue=self.msg_queue,
task=self.task,
@@ -246,9 +146,22 @@ class QAExecutor(BaseExecutor):
question=self.question,
llm=self.llm,
)
+ await rag_exec.init()
+ await rag_exec.run()
+ # 解析并推送文档信息
+ if first_chunk and isinstance(first_chunk.content, dict):
+ rag_output = RAGOutput.model_validate(first_chunk.content)
+ doc_chunk_list: list[DocItem] = [
+ DocItem.model_validate(item) if not isinstance(item, DocItem) else item
+ for item in rag_output.corpus
+ ]
+ doc_info_list = await self._assemble_doc_info(doc_chunk_list, 8192)
+ for doc_info in doc_info_list:
+ await self._push_rag_doc(doc_info)
# 保存答案
+ full_answer = ""
self.task.runtime.fullAnswer = full_answer
self.task.runtime.fullTime = round(datetime.now(UTC).timestamp(), 2) - self.task.runtime.time
diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py
index ce664d2144b0aa8bd3613725cfafcdbeb0874961..a6470955bd3dde9c3d0307e0fd24e5984d53d8ce 100644
--- a/apps/scheduler/executor/step.py
+++ b/apps/scheduler/executor/step.py
@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any
from pydantic import ConfigDict
-from apps.models import ExecutorHistory, StepStatus
+from apps.models import ExecutorHistory, StepStatus, StepType
from apps.scheduler.call.core import CoreCall
from apps.scheduler.call.empty import Empty
from apps.scheduler.call.facts.facts import FactsCall
@@ -23,7 +23,7 @@ from apps.schemas.enum_var import (
SpecialCallType,
)
from apps.schemas.message import TextAddContent
-from apps.schemas.scheduler import CallError, CallOutputChunk
+from apps.schemas.scheduler import CallError, CallOutputChunk, ExecutorBackground
from apps.services import NodeManager
from .base import BaseExecutor
@@ -38,6 +38,7 @@ class StepExecutor(BaseExecutor):
"""工作流中步骤相关函数"""
step: "StepQueueItem"
+ background: "ExecutorBackground"
model_config = ConfigDict(
arbitrary_types_allowed=True,
@@ -90,7 +91,7 @@ class StepExecutor(BaseExecutor):
# State写入ID和运行状态
self.task.state.stepId = self.step.step_id
- self.task.state.stepDescription = self.step.step.description
+ self.task.state.stepType = StepType(self.step.step.type)
self.task.state.stepName = self.step.step.name
# 获取并验证Call类
@@ -264,7 +265,7 @@ class StepExecutor(BaseExecutor):
executorStatus=self.task.state.executorStatus,
stepId=self.step.step_id,
stepName=self.step.step.name,
- stepDescription=self.step.step.description,
+ stepType=StepType(self.step.step.type),
stepStatus=self.task.state.stepStatus,
inputData=self.obj.input,
outputData=output_data,
diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py
index 4e073a5a93260162fde3975f47d4b26c6879e9ad..2c9ea0d9161956154180de92d4b5123f662df6c5 100644
--- a/apps/scheduler/pool/loader/mcp.py
+++ b/apps/scheduler/pool/loader/mcp.py
@@ -177,16 +177,11 @@ class MCPLoader(metaclass=SingletonMeta):
async def cancel_all_installing_task() -> None:
"""取消正在安装的MCP模板任务"""
template_path = MCP_PATH / "template"
- logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path)
+ logger.info("[MCPLoader] 取消正在安装的MCP模板任务: %s", template_path)
- # 遍历所有模板
- mcp_ids = []
- async for mcp_dir in template_path.iterdir():
- # 不是目录
- if not await mcp_dir.is_dir():
- logger.warning("[MCPLoader] 跳过非目录: %s", mcp_dir.as_posix())
- continue
- mcp_ids.append(mcp_dir.name)
+ # 获取所有MCP ID
+ mcp_configs = await MCPLoader._get_all_mcp_configs()
+ mcp_ids = list(mcp_configs.keys())
async with postgres.session() as session:
await session.execute(
@@ -201,15 +196,14 @@ class MCPLoader(metaclass=SingletonMeta):
@staticmethod
- async def _init_all_template() -> None:
+ async def _get_all_mcp_configs() -> dict[str, MCPServerConfig]:
"""
- 初始化所有文件夹中的MCP模板
+ 获取所有MCP模板的配置
- 遍历 ``template`` 目录下的所有MCP模板,并初始化。在Framework启动时进行此流程,确保所有MCP均可正常使用。
- 这一过程会与数据库内的条目进行对比,若发生修改,则重新创建数据库条目。
+ :return: MCP ID到配置的映射
"""
+ mcp_configs = {}
template_path = MCP_PATH / "template"
- logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path)
# 遍历所有模板
async for mcp_dir in template_path.iterdir():
@@ -224,12 +218,32 @@ class MCPLoader(metaclass=SingletonMeta):
logger.warning("[MCPLoader] 跳过没有配置文件的MCP模板: %s", mcp_dir.as_posix())
continue
- # 读取配置并加载
- config = await MCPLoader._load_config(config_path)
+ try:
+ # 读取配置
+ config = await MCPLoader._load_config(config_path)
+ mcp_configs[mcp_dir.name] = config
+ except Exception as e: # noqa: BLE001
+ logger.warning("[MCPLoader] 加载MCP配置失败: %s, 错误: %s", mcp_dir.name, str(e))
+ continue
+
+ return mcp_configs
+
+ @staticmethod
+ async def _init_all_template() -> None:
+ """
+ 初始化所有文件夹中的MCP模板
+
+ 遍历 ``template`` 目录下的所有MCP模板,并初始化。在Framework启动时进行此流程,确保所有MCP均可正常使用。
+ 这一过程会与数据库内的条目进行对比,若发生修改,则重新创建数据库条目。
+ """
+ template_path = MCP_PATH / "template"
+ logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path)
+ mcp_configs = await MCPLoader._get_all_mcp_configs()
+ for mcp_id, config in mcp_configs.items():
# 初始化第一个MCP Server
- logger.info("[MCPLoader] 初始化MCP模板: %s", mcp_dir.as_posix())
- await MCPLoader.init_one_template(mcp_dir.name, config)
+ logger.info("[MCPLoader] 初始化MCP模板: %s", mcp_id)
+ await MCPLoader.init_one_template(mcp_id, config)
@staticmethod
@@ -639,6 +653,33 @@ class MCPLoader(metaclass=SingletonMeta):
await session.commit()
+ @staticmethod
+ async def set_vector(embedding_model: Embedding) -> None:
+ """
+ 将MCP工具描述进行向量化并存入数据库
+
+ :param Embedding embedding_model: 嵌入模型
+ :return: 无
+ """
+ try:
+ # 获取所有MCP配置
+ mcp_configs = await MCPLoader._get_all_mcp_configs()
+
+ for mcp_id, config in mcp_configs.items():
+ try:
+ # 进行向量化
+ await MCPLoader._insert_template_tool_vector(mcp_id, config, embedding_model)
+ logger.info("[MCPLoader] MCP工具向量化完成: %s", mcp_id)
+ except Exception as e: # noqa: BLE001
+ logger.warning("[MCPLoader] MCP工具向量化失败: %s, 错误: %s", mcp_id, str(e))
+ continue
+
+ logger.info("[MCPLoader] 所有MCP工具向量化完成")
+ except Exception as e:
+ err = "[MCPLoader] MCP工具向量化失败"
+ logger.exception(err)
+ raise RuntimeError(err) from e
+
@staticmethod
async def init() -> None:
"""
diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py
index def8814687efa4a36948aa18801dd9542bc67769..671d25f1472f970b90684a7003151192b3d2bcbf 100644
--- a/apps/scheduler/pool/loader/service.py
+++ b/apps/scheduler/pool/loader/service.py
@@ -6,7 +6,7 @@ import shutil
import uuid
from anyio import Path
-from sqlalchemy import delete
+from sqlalchemy import delete, select
from apps.common.config import config
from apps.common.postgres import postgres
@@ -31,6 +31,23 @@ BASE_PATH = Path(config.deploy.data_dir) / "semantics" / "service"
class ServiceLoader:
"""Service 加载器"""
+ @staticmethod
+ async def _load_all_services() -> list[tuple[Service, list[NodeInfo]]]:
+ """从数据库加载所有服务和对应的节点"""
+ async with postgres.session() as session:
+ # 查询所有服务
+ services_query = select(Service)
+ services = list((await session.scalars(services_query)).all())
+
+ # 为每个服务查询对应的节点
+ service_nodes = []
+ for service in services:
+ nodes_query = select(NodeInfo).where(NodeInfo.serviceId == service.id)
+ nodes = list((await session.scalars(nodes_query)).all())
+ service_nodes.append((service, nodes))
+
+ return service_nodes
+
@staticmethod
async def load(service_id: uuid.UUID, hashes: dict[str, str]) -> None:
"""加载单个Service"""
@@ -145,11 +162,29 @@ class ServiceLoader:
await session.commit()
+ @staticmethod
+ async def set_vector(embedding_model: Embedding) -> None:
+ """将所有服务和节点的向量化数据存入数据库"""
+ service_nodes = await ServiceLoader._load_all_services()
+
+ # 为每个服务调用现有的update_vector方法
+ for service, nodes in service_nodes:
+ await ServiceLoader._update_vector(
+ nodes,
+ service.id,
+ service.description,
+ embedding_model,
+ )
@staticmethod
- async def _update_vector(nodes: list[NodeInfo], metadata: ServiceMetadata, embedding_model: Embedding) -> None:
+ async def _update_vector(
+ nodes: list[NodeInfo],
+ service_id: uuid.UUID,
+ service_description: str,
+ embedding_model: Embedding,
+ ) -> None:
"""更新向量数据"""
- service_vecs = await embedding_model.get_embedding([metadata.description])
+ service_vecs = await embedding_model.get_embedding([service_description])
node_descriptions = []
for node in nodes:
node_descriptions += [node.description]
@@ -158,20 +193,20 @@ class ServiceLoader:
async with postgres.session() as session:
# 删除旧数据
await session.execute(
- delete(embedding_model.ServicePoolVector).where(embedding_model.ServicePoolVector.id == metadata.id),
+ delete(embedding_model.ServicePoolVector).where(embedding_model.ServicePoolVector.id == service_id),
)
await session.execute(
- delete(embedding_model.NodePoolVector).where(embedding_model.NodePoolVector.serviceId == metadata.id),
+ delete(embedding_model.NodePoolVector).where(embedding_model.NodePoolVector.serviceId == service_id),
)
# 插入新数据
session.add(embedding_model.ServicePoolVector(
- id=metadata.id,
+ id=service_id,
embedding=service_vecs[0],
))
for vec in node_vecs:
node_data = embedding_model.NodePoolVector(
id=node.id,
- serviceId=metadata.id,
+ serviceId=service_id,
embedding=vec,
)
session.add(node_data)
diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py
index 03438416eab5ca73e5bceca39c83cc0b6dd60b75..ddaddba85a9bebc4cf5eb485113589099fa94854 100644
--- a/apps/scheduler/scheduler/context.py
+++ b/apps/scheduler/scheduler/context.py
@@ -23,26 +23,6 @@ logger = logging.getLogger(__name__)
async def save_data(scheduler: "Scheduler") -> None:
"""保存当前Executor、Task、Record等的数据"""
- # 构造RecordContent
- used_docs = []
- order_to_id = {}
- for docs in task.runtime.documents:
- used_docs.append(
- RecordGroupDocument(
- _id=docs["id"],
- author=docs.get("author", ""),
- order=docs.get("order", 0),
- name=docs["name"],
- abstract=docs.get("abstract", ""),
- extension=docs.get("extension", ""),
- size=docs.get("size", 0),
- associated="answer",
- created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)),
- ),
- )
- if docs.get("order") is not None:
- order_to_id[docs["order"]] = docs["id"]
-
foot_note_pattern = re.compile(r"\[\[(\d+)\]\]")
footnote_list = []
offset = 0
diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py
index bdc5bc6f033fb3b515a4f51d04e6d1a5a66459c8..55b312b95e4be0ba96871c88a105f2ac38ed9074 100644
--- a/apps/scheduler/scheduler/scheduler.py
+++ b/apps/scheduler/scheduler/scheduler.py
@@ -11,8 +11,8 @@ from jinja2.sandbox import SandboxedEnvironment
from apps.common.queue import MessageQueue
from apps.llm import Embedding, FunctionLLM, JsonGenerator, ReasoningLLM
-from apps.models import AppType, ExecutorStatus, Task, TaskRuntime, User
-from apps.scheduler.executor import FlowExecutor, MCPAgentExecutor
+from apps.models import AppType, Conversation, ExecutorStatus, Task, TaskRuntime, User
+from apps.scheduler.executor import FlowExecutor, MCPAgentExecutor, QAExecutor
from apps.scheduler.pool.pool import Pool
from apps.schemas.enum_var import EventType
from apps.schemas.message import (
@@ -20,11 +20,12 @@ from apps.schemas.message import (
InitContentFeature,
)
from apps.schemas.request_data import RequestData
-from apps.schemas.scheduler import ExecutorBackground, LLMConfig, TopFlow
+from apps.schemas.scheduler import LLMConfig, TopFlow
from apps.schemas.task import TaskData
from apps.services import (
Activity,
AppCenterManager,
+ ConversationManager,
KnowledgeBaseManager,
LLMManager,
TaskManager,
@@ -33,7 +34,7 @@ from apps.services import (
from .prompt import FLOW_SELECT
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
class Scheduler:
@@ -64,14 +65,14 @@ class Scheduler:
user = await UserManager.get_user(user_sub)
if not user:
err = f"[Scheduler] 用户 {user_sub} 不存在"
- logger.error(err)
+ _logger.error(err)
raise RuntimeError(err)
self.user = user
# 获取Task
task = await TaskManager.get_task_data_by_task_id(task_id)
if not task:
- logger.info("[Scheduler] 新建任务")
+ _logger.info("[Scheduler] 新建任务")
task = TaskData(
metadata=Task(
id=task_id,
@@ -92,6 +93,7 @@ class Scheduler:
autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
+ extensions=["jinja2.ext.loopcontrols"],
)
# LLM
@@ -140,16 +142,16 @@ class Scheduler:
is_active = await Activity.is_active(user_sub)
if not is_active:
- logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_sub)
+ _logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_sub)
kill_event.set()
break
# 控制检查频率
await asyncio.sleep(check_interval)
except asyncio.CancelledError:
- logger.info("[Scheduler] 活动监控任务已取消")
+ _logger.info("[Scheduler] 活动监控任务已取消")
except Exception:
- logger.exception("[Scheduler] 活动监控过程中发生错误")
+ _logger.exception("[Scheduler] 活动监控过程中发生错误")
kill_event.set()
@@ -159,18 +161,18 @@ class Scheduler:
reasoning_llm = await LLMManager.get_llm(reasoning_llm_id)
if not reasoning_llm:
err = "[Scheduler] 获取问答用大模型ID失败"
- logger.error(err)
+ _logger.error(err)
raise ValueError(err)
reasoning_llm = ReasoningLLM(reasoning_llm)
# 获取功能性的大模型信息
function_llm = None
if not self.user.functionLLM:
- logger.error("[Scheduler] 用户 %s 没有设置函数调用大模型,相关功能将被禁用", self.user.userSub)
+ _logger.error("[Scheduler] 用户 %s 没有设置函数调用大模型,相关功能将被禁用", self.user.userSub)
else:
function_llm = await LLMManager.get_llm(self.user.functionLLM)
if not function_llm:
- logger.error(
+ _logger.error(
"[Scheduler] 用户 %s 设置的函数调用大模型ID %s 不存在,相关功能将被禁用",
self.user.userSub, self.user.functionLLM,
)
@@ -179,11 +181,11 @@ class Scheduler:
embedding_llm = None
if not self.user.embeddingLLM:
- logger.error("[Scheduler] 用户 %s 没有设置向量模型,相关功能将被禁用", self.user.userSub)
+ _logger.error("[Scheduler] 用户 %s 没有设置向量模型,相关功能将被禁用", self.user.userSub)
else:
embedding_llm = await LLMManager.get_llm(self.user.embeddingLLM)
if not embedding_llm:
- logger.error(
+ _logger.error(
"[Scheduler] 用户 %s 设置的向量模型ID %s 不存在,相关功能将被禁用",
self.user.userSub, self.user.embeddingLLM,
)
@@ -201,22 +203,22 @@ class Scheduler:
"""获取Top1 Flow"""
if not self.llm.function:
err = "[Scheduler] 未设置Function模型"
- logger.error(err)
+ _logger.error(err)
raise RuntimeError(err)
# 获取所选应用的所有Flow
if not self.post_body.app or not self.post_body.app.app_id:
err = "[Scheduler] 未选择应用"
- logger.error(err)
+ _logger.error(err)
raise RuntimeError(err)
flow_list = await Pool().get_flow_metadata(self.post_body.app.app_id)
if not flow_list:
err = "[Scheduler] 未找到应用中合法的Flow"
- logger.error(err)
+ _logger.error(err)
raise RuntimeError(err)
- logger.info("[Scheduler] 选择应用 %s 最合适的Flow", self.post_body.app.app_id)
+ _logger.info("[Scheduler] 选择应用 %s 最合适的Flow", self.post_body.app.app_id)
choices = [{
"name": flow.id,
"description": f"{flow.name}, {flow.description}",
@@ -239,8 +241,9 @@ class Scheduler:
result = TopFlow.model_validate(result_str)
return result.choice
+
async def create_new_conversation(
- title: str, user_sub: str, app_id: uuid.UUID | None = None,
+ self, title: str, user_sub: str, app_id: uuid.UUID | None = None,
*,
debug: bool = False,
) -> Conversation:
@@ -273,7 +276,7 @@ class Scheduler:
async def run(self) -> None:
"""运行调度器"""
# 如果是智能问答,直接执行
- logger.info("[Scheduler] 开始执行")
+ _logger.info("[Scheduler] 开始执行")
# 创建用于通信的事件
kill_event = asyncio.Event()
monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.metadata.userSub))
@@ -282,25 +285,17 @@ class Scheduler:
if self.post_body.app and self.post_body.app.app_id:
rag_method = False
- if self.task.state.appId:
- rag_method = False
-
if rag_method:
kb_ids = await KnowledgeBaseManager.get_selected_kb(self.task.metadata.userSub)
await self._push_init_message(3, is_flow=False)
- rag_data = RAGQueryReq(
- kbIds=kb_ids,
- query=self.post_body.question,
- tokensLimit=self.llm.reasoning.config.maxToken,
- )
# 启动监控任务和主任务
- main_task = asyncio.create_task(push_rag_message(
+ main_task = asyncio.create_task(self._run_qa(
self.task, self.queue, self.task.ids.user_sub, llm, history, doc_ids, rag_data))
else:
# 查找对应的App元数据
app_data = await AppCenterManager.fetch_app_data_by_id(self.post_body.app.app_id)
if not app_data:
- logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id)
+ _logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id)
await self.queue.close()
return
@@ -311,8 +306,6 @@ class Scheduler:
else:
# Agent 应用
is_flow = False
- # 需要执行Flow
- self.task = await push_init_message(self.task, self.queue, app_data.history_len, is_flow=is_flow)
# 启动监控任务和主任务
main_task = asyncio.create_task(self._run_agent(self.queue, self.post_body, executor_background))
# 等待任一任务完成
@@ -321,20 +314,20 @@ class Scheduler:
return_when=asyncio.FIRST_COMPLETED,
)
- # 如果是监控任务触发,终止主任务
+ # 如果用户手动终止,则cancel主任务
if kill_event.is_set():
- logger.warning("[Scheduler] 用户活动状态检测不活跃,正在终止工作流执行...")
+ _logger.warning("[Scheduler] 用户取消执行,正在终止...")
main_task.cancel()
if self.task.state.executorStatus in [ExecutorStatus.RUNNING, ExecutorStatus.WAITING]:
self.task.state.executorStatus = ExecutorStatus.CANCELLED
try:
await main_task
- logger.info("[Scheduler] 工作流执行已被终止")
+ _logger.info("[Scheduler] 工作流执行已被终止")
except Exception:
- logger.exception("[Scheduler] 终止工作流时发生错误")
+ _logger.exception("[Scheduler] 终止工作流时发生错误")
# 更新Task,发送结束消息
- logger.info("[Scheduler] 发送结束消息")
+ _logger.info("[Scheduler] 发送结束消息")
await self.queue.push_output(self.task, event_type=EventType.DONE.value, data={})
# 关闭Queue
await self.queue.close()
@@ -343,42 +336,48 @@ class Scheduler:
async def _run_qa(self) -> None:
- pass
+ qa_executor = QAExecutor(
+ task=self.task,
+ msg_queue=self.queue,
+ question=self.post_body.question,
+ llm=self.llm,
+ )
+ _logger.info("[Scheduler] 开始智能问答")
+ await qa_executor.init()
+ await qa_executor.run()
+ self.task = qa_executor.task
async def _run_flow(self) -> None:
# 获取应用信息
if not self.post_body.app or not self.post_body.app.app_id:
- logger.error("[Scheduler] 未选择应用")
+ _logger.error("[Scheduler] 未选择应用")
return
- logger.info("[Scheduler] 获取工作流元数据")
+ _logger.info("[Scheduler] 获取工作流元数据")
flow_info = await Pool().get_flow_metadata(self.post_body.app.app_id)
# 如果flow_info为空,则直接返回
if not flow_info:
- logger.error("[Scheduler] 未找到工作流元数据")
+ _logger.error("[Scheduler] 未找到工作流元数据")
return
# 如果用户选了特定的Flow
- if self.post_body.app.flow_id:
- logger.info("[Scheduler] 获取工作流定义")
- flow_id = self.post_body.app.flow_id
- flow_data = await Pool().get_flow(self.post_body.app.app_id, flow_id)
+ if not self.post_body.app.flow_id:
+ _logger.info("[Scheduler] 选择最合适的流")
+ flow_id = await self.get_top_flow()
else:
# 如果用户没有选特定的Flow,则根据语义选择一个Flow
- logger.info("[Scheduler] 选择最合适的流")
- flow_id = await self.get_top_flow()
- logger.info("[Scheduler] 获取工作流定义")
- flow_data = await Pool().get_flow(self.post_body.app.app_id, flow_id)
+ flow_id = self.post_body.app.flow_id
+ _logger.info("[Scheduler] 获取工作流定义")
+ flow_data = await Pool().get_flow(self.post_body.app.app_id, flow_id)
# 如果flow_data为空,则直接返回
if not flow_data:
- logger.error("[Scheduler] 未找到工作流定义")
+ _logger.error("[Scheduler] 未找到工作流定义")
return
# 初始化Executor
- logger.info("[Scheduler] 初始化Executor")
flow_exec = FlowExecutor(
flow_id=flow_id,
flow=flow_data,
@@ -390,33 +389,53 @@ class Scheduler:
)
# 开始运行
- logger.info("[Scheduler] 运行Executor")
+ _logger.info("[Scheduler] 运行工作流执行器")
await flow_exec.init()
await flow_exec.run()
self.task = flow_exec.task
- async def _run_agent(
- self, queue: MessageQueue, post_body: RequestData, background: ExecutorBackground,
- ) -> None:
+ async def _run_agent(self) -> None:
"""构造Executor并执行"""
+ # 获取应用信息
+ if not self.post_body.app or not self.post_body.app.app_id:
+ _logger.error("[Scheduler] 未选择MCP应用")
+ return
+
# 初始化Executor
agent_exec = MCPAgentExecutor(
task=self.task,
- msg_queue=queue,
- question=post_body.question,
- history_len=self.post_body.app.history_len if hasattr(self.post_body.app, 'history_len') else 3,
- background=background,
+ msg_queue=self.queue,
+ question=self.post_body.question,
agent_id=self.post_body.app.app_id,
- params=self.post_body.params if hasattr(self.post_body, 'params') else {},
+ params=self.post_body.app.params,
llm=self.llm,
)
# 开始运行
- logger.info("[Scheduler] 运行Executor")
+ _logger.info("[Scheduler] 运行MCP执行器")
+ await agent_exec.init()
await agent_exec.run()
self.task = agent_exec.task
async def _save_task(self) -> None:
"""保存Task"""
- await TaskManager.save_task(self.task)
+ # 构造RecordContent
+ used_docs = []
+ order_to_id = {}
+ for docs in task.runtime.documents:
+ used_docs.append(
+ RecordGroupDocument(
+ _id=docs["id"],
+ author=docs.get("author", ""),
+ order=docs.get("order", 0),
+ name=docs["name"],
+ abstract=docs.get("abstract", ""),
+ extension=docs.get("extension", ""),
+ size=docs.get("size", 0),
+ associated="answer",
+ created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)),
+ ),
+ )
+ if docs.get("order") is not None:
+ order_to_id[docs["order"]] = docs["id"]
diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py
index d5a8ba264048cbecff46cbfedfdeed11ec32bff7..5606ed92ba3cae8827a076af1b1f2772dc10faf7 100644
--- a/apps/schemas/enum_var.py
+++ b/apps/schemas/enum_var.py
@@ -28,7 +28,6 @@ class EventType(str, Enum):
INIT = "init"
TEXT_ADD = "text.add"
GRAPH = "graph"
- DOCUMENT_ADD = "document.add"
STEP_WAITING_FOR_START = "step.waiting_for_start"
STEP_WAITING_FOR_PARAM = "step.waiting_for_param"
FLOW_START = "flow.start"
diff --git a/apps/schemas/flow.py b/apps/schemas/flow.py
index 26152f0e07808f5adcf0d1514b893a03d0fb0aec..17007bdfb0aaf2183786c6020bea34e41b896bcf 100644
--- a/apps/schemas/flow.py
+++ b/apps/schemas/flow.py
@@ -94,7 +94,6 @@ class MetadataBase(BaseModel):
icon: str = Field(description="图标", default="")
name: str = Field(description="元数据名称")
description: str = Field(description="元数据描述")
- version: str = Field(description="元数据版本")
author: str = Field(description="创建者的用户名")
hashes: dict[str, str] | None = Field(description="资源(App、Service等)下所有文件的hash值", default=None)
diff --git a/apps/schemas/message.py b/apps/schemas/message.py
index 1e36cd275cbd40b75a2124a8004f1ebec083dd8c..3b2b1706d795912a33574505100437ab0f90b79e 100644
--- a/apps/schemas/message.py
+++ b/apps/schemas/message.py
@@ -2,7 +2,6 @@
"""队列中的消息结构"""
import uuid
-from datetime import UTC, datetime
from typing import Any
from pydantic import BaseModel, Field
@@ -31,20 +30,16 @@ class HeartbeatData(BaseModel):
class MessageFlow(BaseModel):
"""消息中有关Flow信息的部分"""
- app_id: uuid.UUID = Field(description="插件ID", alias="appId")
- flow_id: str = Field(description="Flow ID", alias="flowId")
- flow_name: str = Field(description="Flow名称", alias="flowName")
- flow_status: ExecutorStatus = Field(description="Flow状态", alias="flowStatus", default=ExecutorStatus.UNKNOWN)
+ app_id: uuid.UUID | None = Field(description="插件ID", alias="appId", default=None)
+ executor_id: str = Field(description="Flow ID", alias="executorId")
+ executor_name: str = Field(description="Flow名称", alias="executorName")
+ executor_status: ExecutorStatus = Field(
+ description="Flow状态", alias="executorStatus", default=ExecutorStatus.UNKNOWN,
+ )
step_id: uuid.UUID = Field(description="当前步骤ID", alias="stepId")
step_name: str = Field(description="当前步骤名称", alias="stepName")
- sub_step_id: str | None = Field(description="当前子步骤ID", alias="subStepId", default=None)
- sub_step_name: str | None = Field(description="当前子步骤名称", alias="subStepName", default=None)
+ step_type: str = Field(description="当前步骤类型", alias="stepType")
step_status: StepStatus = Field(description="当前步骤状态", alias="stepStatus")
- step_description: str | None = Field(
- description="当前步骤描述",
- alias="stepDescription",
- default=None,
- )
class MessageMetadata(RecordMetadata):
@@ -75,28 +70,11 @@ class TextAddContent(BaseModel):
text: str = Field(min_length=1, description="流式生成的文本内容")
-class DocumentAddContent(BaseModel):
- """document.add消息的content"""
-
- document_id: str = Field(description="文档UUID", alias="documentId")
- document_order: int = Field(description="文档在对话中的顺序,从1开始", alias="documentOrder")
- document_author: str = Field(description="文档作者", alias="documentAuthor", default="")
- document_name: str = Field(description="文档名称", alias="documentName")
- document_abstract: str = Field(description="文档摘要", alias="documentAbstract", default="")
- document_type: str = Field(description="文档MIME类型", alias="documentType", default="")
- document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数", alias="documentSize", default=0)
- created_at: float = Field(
- description="文档创建时间,单位是秒", alias="createdAt",
- default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3),
- )
-
-
class MessageBase(HeartbeatData):
"""基础消息事件结构"""
- id: str = Field(min_length=36, max_length=36)
- conversation_id: uuid.UUID = Field(min_length=36, max_length=36, alias="conversationId")
- task_id: uuid.UUID = Field(min_length=36, max_length=36, alias="taskId")
+ id: uuid.UUID = Field(min_length=36, max_length=36)
+ conversation_id: uuid.UUID | None = Field(min_length=36, max_length=36, alias="conversationId", default=None)
flow: MessageFlow | None = None
content: Any | None = Field(default=None, description="消息内容")
metadata: MessageMetadata
diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py
index b59918b9f68b1f6220e61ea1217463342e20d7c7..7d1916b8d0012c42ea537a86b9394cc873c12daf 100644
--- a/apps/schemas/request_data.py
+++ b/apps/schemas/request_data.py
@@ -25,15 +25,14 @@ class RequestData(BaseModel):
"""POST /api/chat 请求的总的数据结构"""
question: str = Field(max_length=2000, description="用户输入")
- conversation_id: uuid.UUID = Field(
- default=uuid.UUID("00000000-0000-0000-0000-000000000000"), alias="conversationId", description="聊天ID",
+ conversation_id: uuid.UUID | None = Field(
+ default=None, alias="conversationId", description="聊天ID",
)
language: LanguageType = Field(default=LanguageType.CHINESE, description="语言")
- files: list[str] = Field(default=[], description="文件列表")
app: RequestDataApp | None = Field(default=None, description="应用")
- debug: bool = Field(default=False, description="是否调试")
task_id: str | None = Field(default=None, alias="taskId", description="任务ID")
llm_id: str = Field(alias="llmId", description="大模型ID")
+ kb_ids: list[uuid.UUID] = Field(default=[], description="知识库ID列表")
class PostTagData(BaseModel):
diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py
index 2686b4d40ee1376b2849d6a2bf1eb5bfe061047d..6b48db2028328ddcdfe4646e3c33901989b220a1 100644
--- a/apps/schemas/scheduler.py
+++ b/apps/schemas/scheduler.py
@@ -10,7 +10,6 @@ from apps.llm import Embedding, FunctionLLM, ReasoningLLM
from apps.models import ExecutorHistory, LanguageType
from .enum_var import CallOutputType
-from .scheduler import ExecutorBackground
class LLMConfig(BaseModel):
@@ -36,6 +35,15 @@ class CallIds(BaseModel):
session_id: str | None = Field(description="当前用户的Session ID")
app_id: uuid.UUID | None = Field(description="当前应用的ID")
user_sub: str = Field(description="当前用户的用户ID")
+ conversation_id: uuid.UUID | None = Field(description="当前对话的ID")
+
+
+class ExecutorBackground(BaseModel):
+ """Executor的背景信息"""
+
+ num: int = Field(description="对话记录最大数量", default=0)
+ conversation: list[dict[str, str]] = Field(description="对话记录", default=[])
+ facts: list[str] = Field(description="当前Executor的背景信息", default=[])
class CallVars(BaseModel):
@@ -50,14 +58,6 @@ class CallVars(BaseModel):
language: LanguageType = Field(description="语言", default=LanguageType.CHINESE)
-class ExecutorBackground(BaseModel):
- """Executor的背景信息"""
-
- num: int = Field(description="对话记录最大数量", default=0)
- conversation: list[dict[str, str]] = Field(description="对话记录", default=[])
- facts: list[str] = Field(description="当前Executor的背景信息", default=[])
-
-
class CallError(Exception):
"""Call错误"""
diff --git a/apps/services/llm.py b/apps/services/llm.py
index eab2d1b61615d89ee38d4a2d555f367d82781a10..203474cf9c934f391249399571a336e87ae28a89 100644
--- a/apps/services/llm.py
+++ b/apps/services/llm.py
@@ -6,7 +6,9 @@ import logging
from sqlalchemy import select
from apps.common.postgres import postgres
+from apps.llm import Embedding
from apps.models import LLMData, User
+from apps.scheduler.pool.pool import Pool
from apps.schemas.request_data import (
UpdateLLMReq,
UpdateUserSelectedLLMReq,
@@ -185,6 +187,10 @@ class LLMManager:
req: UpdateUserSelectedLLMReq,
) -> None:
"""更新用户的默认LLM"""
+ # 检查embedding模型是否发生变化
+ old_embedding_llm = None
+ new_embedding_llm = req.embeddingLLM
+
async with postgres.session() as session:
user = (await session.scalars(
select(User).where(User.userSub == user_sub),
@@ -192,6 +198,28 @@ class LLMManager:
if not user:
err = f"[LLMManager] 用户 {user_sub} 不存在"
raise ValueError(err)
+
+ old_embedding_llm = user.embeddingLLM
user.functionLLM = req.functionLLM
user.embeddingLLM = req.embeddingLLM
await session.commit()
+
+ # 如果embedding模型发生变化,触发向量化过程
+ if old_embedding_llm != new_embedding_llm and new_embedding_llm:
+ try:
+ # 获取新的embedding模型配置
+ embedding_llm_config = await LLMManager.get_llm(new_embedding_llm)
+ if embedding_llm_config:
+ # 创建Embedding实例
+ embedding_model = Embedding(embedding_llm_config)
+ await embedding_model.init()
+
+ # 获取Pool实例并触发向量化
+ pool = Pool()
+ await pool.set_vector(embedding_model)
+
+ logger.info("[LLMManager] 用户 %s 的embedding模型已更新,向量化过程已完成", user_sub)
+ else:
+ logger.error("[LLMManager] 用户 %s 选择的embedding模型 %s 不存在", user_sub, new_embedding_llm)
+ except Exception:
+ logger.exception("[LLMManager] 用户 %s 的embedding模型向量化过程失败", user_sub)
diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py
index 6f48c953f70623d609692665d61d90e76f171180..14cd6230fd58ac15a8dedc86295d0c571c962206 100644
--- a/apps/services/mcp_service.py
+++ b/apps/services/mcp_service.py
@@ -9,7 +9,7 @@ from typing import Any
import magic
from fastapi import UploadFile
from PIL import Image
-from sqlalchemy import and_, or_, select
+from sqlalchemy import and_, delete, or_, select
from apps.common.postgres import postgres
from apps.constants import (
@@ -25,6 +25,7 @@ from apps.models import (
MCPTools,
MCPType,
)
+from apps.models.app import AppMCP
from apps.scheduler.pool.loader.mcp import MCPLoader
from apps.scheduler.pool.mcp.pool import MCPPool
from apps.schemas.enum_var import SearchType
@@ -310,8 +311,15 @@ class MCPServiceManager:
msg = "[MCPServiceManager] MCP服务未找到或无权限"
raise ValueError(msg)
- for user_id in db_service.activated:
- await MCPServiceManager.deactive_mcpservice(user_sub=user_id, mcp_id=data.mcp_id)
+ # 查询所有激活该MCP服务的用户
+ async with postgres.session() as session:
+ activated_users = (await session.scalars(select(MCPActivated).where(
+ MCPActivated.mcpId == data.mcp_id,
+ ))).all()
+
+ # 为每个激活的用户取消激活
+ for activated_user in activated_users:
+ await MCPServiceManager.deactive_mcpservice(user_sub=activated_user.userSub, mcp_id=data.mcp_id)
mcp_config = MCPServerConfig(
name=data.name,
@@ -349,12 +357,11 @@ class MCPServiceManager:
# 删除对应的mcp
await MCPLoader.delete_mcp(mcp_id)
- # 遍历所有应用,将其中的MCP依赖删除
- app_collection = MongoDB().get_collection("application")
- await app_collection.update_many(
- {"mcp_service": mcp_id},
- {"$pull": {"mcp_service": mcp_id}},
- )
+ # 从PostgreSQL中删除所有应用对应该MCP服务的关联记录
+ async with postgres.session() as session:
+ stmt = delete(AppMCP).where(AppMCP.mcpId == mcp_id)
+ await session.execute(stmt)
+ await session.commit()
@staticmethod
diff --git a/manual/source/conf.py b/manual/source/conf.py
index 5b61cf42b5aff54c448b4bb04ab229204df84155..0505fdb68743cf0cdad74ebdd8982a97aedb4865 100644
--- a/manual/source/conf.py
+++ b/manual/source/conf.py
@@ -11,6 +11,7 @@ release = "0.10.0"
extensions = [
"sphinx.ext.autodoc",
"sphinxcontrib.autodoc_pydantic",
+ "sphinxcontrib.plantuml",
]
templates_path = ["_templates"]
@@ -26,3 +27,8 @@ autodoc_pydantic_model_show_field_summary = True
autodoc_pydantic_field_show_default = True
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
+
+plantuml_output_format = "svg"
+plantuml = (
+ r"java -jar C:\Users\ObjectNF\scoop\apps\plantuml\current\plantuml.jar"
+)
diff --git a/manual/source/index.rst b/manual/source/index.rst
index 676840dd65d9b3ca0b4a522b405073e65e50e190..62bfa69b30872afe1de372a3808578ecb47345fa 100644
--- a/manual/source/index.rst
+++ b/manual/source/index.rst
@@ -8,3 +8,4 @@ openEuler Intelligence Framework 文档
pool/index
models/index
call/index
+ service/index
diff --git a/manual/source/models/index.rst b/manual/source/models/index.rst
index d93bd1392c71a6ac88e2e7283729626b739150eb..f0e0f2cd734f0f23be9ddb34045c82633c11f473 100644
--- a/manual/source/models/index.rst
+++ b/manual/source/models/index.rst
@@ -10,5 +10,4 @@
.. toctree::
:maxdepth: 2
- lance
mongo
diff --git a/manual/source/models/lance.rst b/manual/source/models/lance.rst
deleted file mode 100644
index 43bf10a44df200104b8fa0ae6c9a9fd6ce88ec8a..0000000000000000000000000000000000000000
--- a/manual/source/models/lance.rst
+++ /dev/null
@@ -1,21 +0,0 @@
-##############
-LanceDB 数据库
-##############
-
-LanceDB 是一个基于矢量数据库的存储系统,用于存储和检索向量化的数据。
-
-LanceDB 的详细介绍请参考 `LanceDB 官方文档 `__ ,其特性如下:
-
-- Serverless,无需安装和部署服务
-- 支持大规模数据存储和检索
-
-
-*************
-LanceDB连接器
-*************
-
-.. autoclass:: apps.models.lance.LanceDB
- :members:
- :undoc-members:
- :private-members:
- :inherited-members:
diff --git a/manual/source/service/activity.rst b/manual/source/service/activity.rst
new file mode 100644
index 0000000000000000000000000000000000000000..0de41d1749950023b570f1532ccc37f8706cfe70
--- /dev/null
+++ b/manual/source/service/activity.rst
@@ -0,0 +1,118 @@
+############################
+Activity 模块(限流与并发)
+############################
+
+.. currentmodule:: apps.services.activity
+
+概述
+====
+
+``Activity`` 模块负责用户请求的限流与全局并发控制,提供以下能力:
+
+- 单用户滑动窗口限流:限制单个用户在指定时间窗口内的请求数量
+- 全局并发限制:控制系统同时执行的任务总数
+- 活动状态管理:提供活跃任务的登记与释放
+- 数据持久化:使用 PostgreSQL 持久化活动记录
+
+核心常量
+========
+
+以下常量由 ``apps.constants`` 提供,可通过配置调整:
+
+- ``MAX_CONCURRENT_TASKS``:全局同时运行任务上限(默认 30)
+- ``SLIDE_WINDOW_QUESTION_COUNT``:滑动窗口内单用户最大请求数(默认 5)
+- ``SLIDE_WINDOW_TIME``:滑动窗口时间(秒,默认 15)
+
+数据模型
+========
+
+使用 ``SessionActivity`` 记录活跃行为:
+
+- ``id``:主键 ID(自增)
+- ``userSub``:用户实体 ID(外键关联)
+- ``timestamp``:活动时间戳(UTC)
+
+API 参考
+========
+
+Activity 类
+-----------
+
+.. autoclass:: Activity
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+设计说明
+========
+
+- 滑动窗口限流:统计用户在最近 ``SLIDE_WINDOW_TIME`` 秒内的请求数量,超过 ``SLIDE_WINDOW_QUESTION_COUNT`` 即判定限流。
+- 全局并发控制:通过统计 ``SessionActivity`` 的记录数判断是否达到 ``MAX_CONCURRENT_TASKS`` 上限。
+- 并发安全:依赖数据库事务保障登记与释放的原子性。
+- 时间精度:所有时间戳使用 UTC。
+
+注意事项
+========
+
+1. 在调用活跃登记前应先进行可用性检查。
+2. 任务完成后需及时释放活跃记录以避免占用并发额度。
+3. 根据部署环境的实际吞吐需求,合理调整 ``MAX_CONCURRENT_TASKS``、``SLIDE_WINDOW_QUESTION_COUNT`` 与 ``SLIDE_WINDOW_TIME`` 等常量。
+
+
+
+流程图(is_active)
+===================
+
+.. uml::
+
+ @startuml
+ title Activity.is_active 流程
+ start
+ :获取当前 UTC 时间;
+ :开启数据库会话;
+ :统计 userSub 在 [now - SLIDE_WINDOW_TIME, now] 的请求数;
+ if (count >= SLIDE_WINDOW_QUESTION_COUNT?) then (是)
+ :返回 True (达到限流);
+ stop
+ else (否)
+ :统计当前活跃任务数量 current_active;
+ if (current_active >= MAX_CONCURRENT_TASKS?) then (是)
+ :返回 True (达到并发上限);
+ else (否)
+ :返回 False (可执行);
+ endif
+ endif
+ stop
+ @enduml
+
+
+时序图(set_active/remove_active)
+===================================
+
+.. uml::
+
+ @startuml
+ title 活跃任务登记与释放时序
+ actor Client
+ participant "Activity" as Activity
+ database "PostgreSQL" as PG
+
+ == set_active ==
+ Client -> Activity: set_active(user_sub)
+ Activity -> PG: 查询当前活跃数量
+ PG --> Activity: current_active
+ alt 达到并发上限
+ Activity -> Client: 抛出 ActivityError
+ else 未达上限
+ Activity -> PG: merge SessionActivity(userSub, timestamp)
+ Activity -> PG: commit
+ Activity -> Client: 完成
+ end
+
+ == remove_active ==
+ Client -> Activity: remove_active(user_sub)
+ Activity -> PG: delete SessionActivity where userSub=user_sub
+ Activity -> PG: commit
+ Activity -> Client: 完成
+
+ @enduml
diff --git a/manual/source/service/index.rst b/manual/source/service/index.rst
new file mode 100644
index 0000000000000000000000000000000000000000..5cb76ac3380835b6a2a94b1976f03a4bd0ed5832
--- /dev/null
+++ b/manual/source/service/index.rst
@@ -0,0 +1,9 @@
+########################
+服务模块文档(Service)
+########################
+
+.. toctree::
+ :maxdepth: 2
+
+ activity
+ tag
diff --git a/manual/source/service/tag.rst b/manual/source/service/tag.rst
new file mode 100644
index 0000000000000000000000000000000000000000..6f72c9f07a532fae7f2515f4b267216880bb68cf
--- /dev/null
+++ b/manual/source/service/tag.rst
@@ -0,0 +1,130 @@
+########################
+Tag 模块(用户标签)
+########################
+
+.. currentmodule:: apps.services.tag
+
+概述
+====
+
+``Tag`` 模块提供用户标签的增删改查能力,统一通过数据库会话访问 ``Tag`` 与 ``UserTag`` 两类模型,实现:
+
+- 获取全部标签
+- 按名称查询标签
+- 按用户 ``sub`` 查询其拥有的标签集合
+- 新增标签(若存在则合并)
+- 按名称更新标签定义
+- 删除标签
+
+依赖
+====
+
+- 数据库会话:``apps.common.postgres.postgres.session`` (异步上下文)
+- 数据模型:``apps.models.Tag``、``apps.models.UserTag``
+- 入参模型:``apps.schemas.request_data.PostTagData`` (字段:``tag``、``description``)
+
+API 参考
+========
+
+TagManager 类
+--------------
+
+.. autoclass:: TagManager
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+设计说明
+========
+
+- 读取路径:查询 ``Tag`` 表,或先查 ``UserTag`` 再关联回 ``Tag`` 表。
+- 写入路径:采用 ``session.merge``,使同一操作既能新增不存在的标签,也能对已存在的标签做幂等更新。
+- 时间戳:更新时写入 ``updatedAt = datetime.now(tz=UTC)``。
+- 异常处理:更新/删除时若目标标签不存在,记录错误并抛出 ``ValueError``。
+
+流程图(get_tag_by_user_sub)
+=============================
+
+.. uml::
+
+ @startuml
+ title 通过 user_sub 查询用户标签流程
+ start
+ :传入 user_sub;
+ :开启数据库会话;
+ :查询 UserTag where userSub = user_sub;
+ if (是否存在 user_tags?) then (是)
+ :遍历 user_tags;
+ :按 tag id 逐个查询 Tag;
+ :若 Tag 存在则加入结果列表;
+ else (否)
+ :返回空列表;
+ stop
+ endif
+ :返回标签结果列表;
+ stop
+ @enduml
+
+流程图(add/update/delete)
+============================
+
+.. uml::
+
+ @startuml
+ title 标签写操作流程(add / update / delete)
+ start
+ :开启数据库会话;
+ partition add_tag {
+ :构造 Tag(name=data.tag, definition=data.description);
+ :session.merge(tag);
+ :commit;
+ }
+ partition update_tag_by_name {
+ :按名称查询 Tag;
+ if (是否存在?) then (否)
+ :logger.error 并抛出 ValueError;
+ stop
+ else (是)
+ :更新 definition, updatedAt;
+ :session.merge(tag);
+ :commit 并返回 tag;
+ endif
+ }
+ partition delete_tag {
+ :按名称查询 Tag;
+ if (是否存在?) then (否)
+ :logger.error 并抛出 ValueError;
+ stop
+ else (是)
+ :session.delete(tag);
+ :commit;
+ endif
+ }
+ stop
+ @enduml
+
+时序图(get_tag_by_name / add_tag)
+===================================
+
+.. uml::
+
+ @startuml
+ title 查询与新增标签时序
+ actor Client
+ participant "TagManager" as TM
+ database "PostgreSQL" as PG
+
+ == get_tag_by_name ==
+ Client -> TM: get_tag_by_name(name)
+ TM -> PG: SELECT Tag WHERE name = :name LIMIT 1
+ PG --> TM: Tag | None
+ TM -> Client: 返回 Tag | None
+
+ == add_tag ==
+ Client -> TM: add_tag(PostTagData)
+ TM -> PG: MERGE Tag(name, definition)
+ TM -> PG: COMMIT
+ TM -> Client: 完成
+ @enduml
+
+
diff --git a/pyproject.toml b/pyproject.toml
index 857fa8a39a3d28b9c0e758111818194f65a3da9f..6aa0bd603435f0086df981a89c17b5b3cd91ad44 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -49,4 +49,5 @@ dev = [
"ruff==0.12.5",
"sphinx==8.2.3",
"sphinx-rtd-theme==3.0.2",
+ "sphinxcontrib-plantuml>=0.31",
]