diff --git a/apps/llm/function.py b/apps/llm/function.py index 74d5543a024905414f82780867b9e9a5fd46699c..88ee8d690fab99a5230c57059a5a9d60aed34941 100644 --- a/apps/llm/function.py +++ b/apps/llm/function.py @@ -13,8 +13,8 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from jsonschema import Draft7Validator -from apps.common.config import config from apps.constants import JSON_GEN_MAX_TRIAL, REASONING_END_TOKEN +from apps.models.llm import FunctionCallBackend, LLMData from .prompt import JSON_GEN_BASIC @@ -26,7 +26,7 @@ class FunctionLLM: timeout: float = 30.0 - def __init__(self) -> None: + def __init__(self, llm_config: LLMData | None = None) -> None: """ 初始化用于FunctionCall的模型 @@ -38,39 +38,44 @@ class FunctionLLM: - structured_output """ # 暂存config;这里可以替代为从其他位置获取 - self._config = config.function_call - if not self._config.model: + if not llm_config: + err = "未设置大模型配置" + logger.error(err) + raise RuntimeError(err) + self._config: LLMData = llm_config + + if not self._config.modelName: - err_msg = "[FunctionCall] 未设置FuntionCall所用模型!" + err_msg = "[FunctionCall] 未设置FunctionCall所用模型!" logger.error(err_msg) raise ValueError(err_msg) self._params = { - "model": self._config.model, + "model": self._config.modelName, "messages": [], } - if self._config.backend == "ollama" and not self._config.api_key: + if self._config.functionCallBackend == FunctionCallBackend.OLLAMA and not self._config.openaiAPIKey: self._client = ollama.AsyncClient( - host=self._config.endpoint, + host=self._config.openaiBaseUrl, timeout=self.timeout, ) - elif self._config.backend == "ollama" and self._config.api_key: + elif self._config.functionCallBackend == FunctionCallBackend.OLLAMA and self._config.openaiAPIKey: self._client = ollama.AsyncClient( - host=self._config.endpoint, + host=self._config.openaiBaseUrl, headers={ - "Authorization": f"Bearer {self._config.api_key}", + "Authorization": f"Bearer {self._config.openaiAPIKey}", }, timeout=self.timeout, ) - elif self._config.backend != "ollama" and not self._config.api_key: + elif self._config.functionCallBackend != FunctionCallBackend.OLLAMA and not self._config.openaiAPIKey: self._client = openai.AsyncOpenAI( - base_url=self._config.endpoint, + base_url=self._config.openaiBaseUrl, timeout=self.timeout, ) - elif self._config.backend != "ollama" and self._config.api_key: + elif self._config.functionCallBackend != FunctionCallBackend.OLLAMA and self._config.openaiAPIKey: self._client = openai.AsyncOpenAI( - base_url=self._config.endpoint, - api_key=self._config.api_key, + base_url=self._config.openaiBaseUrl, + api_key=self._config.openaiAPIKey, timeout=self.timeout, ) @@ -98,14 +103,14 @@ class FunctionLLM: "temperature": temperature, }) - if self._config.backend == "vllm": + if self._config.functionCallBackend == FunctionCallBackend.VLLM: self._params["extra_body"] = {"guided_json": schema} - elif self._config.backend == "json_mode": + elif self._config.functionCallBackend == FunctionCallBackend.JSON_MODE: logger.warning("[FunctionCall] json_mode无法确保输出格式符合要求,使用效果将受到影响") self._params["response_format"] = {"type": "json_object"} - elif self._config.backend == "structured_output": + elif self._config.functionCallBackend == FunctionCallBackend.STRUCTURED_OUTPUT: self._params["response_format"] = { "type": "json_schema", "json_schema": { }, } - elif self._config.backend == "function_call": + elif self._config.functionCallBackend == FunctionCallBackend.FUNCTION_CALL: logger.warning("[FunctionCall] function_call无法确保一定调用工具,使用效果将受到影响") self._params["tools"] = 
[ { @@ -220,14 +225,19 @@ class FunctionLLM: """ # 检查max_tokens和temperature是否设置 if max_tokens is None: - max_tokens = self._config.max_tokens + max_tokens = self._config.maxToken if temperature is None: temperature = self._config.temperature - if self._config.backend == "ollama": + if self._config.functionCallBackend == FunctionCallBackend.OLLAMA: json_str = await self._call_ollama(messages, schema, max_tokens, temperature) - elif self._config.backend in ["function_call", "json_mode", "response_format", "vllm"]: + elif self._config.functionCallBackend in [ + FunctionCallBackend.FUNCTION_CALL, + FunctionCallBackend.JSON_MODE, + FunctionCallBackend.STRUCTURED_OUTPUT, + FunctionCallBackend.VLLM, + ]: json_str = await self._call_openai(messages, schema, max_tokens, temperature) else: @@ -301,11 +311,12 @@ class JsonGenerator: return {} - def __init__(self, query: str, conversation: list[dict[str, str]], schema: dict[str, Any]) -> None: + def __init__(self, config: LLMData, query: str, conversation: list[dict[str, str]], schema: dict[str, Any]) -> None: """初始化JSON生成器""" self._query = query self._conversation = conversation self._schema = schema + self._config = config self._trial = {} self._count = 0 @@ -321,7 +332,7 @@ class JsonGenerator: async def _assemble_message(self) -> str: """组装消息""" # 检查类型 - function_call = config.function_call.backend == "function_call" + function_call = self._config.functionCallBackend == FunctionCallBackend.FUNCTION_CALL # 渲染模板 template = self._env.from_string(JSON_GEN_BASIC) diff --git a/apps/llm/reasoning.py b/apps/llm/reasoning.py index f5d396a3fbaa69bd965cf93835b2f3db3c76d133..4cd886b2057fe8f4ec9fffa32f01d03f17240d2a 100644 --- a/apps/llm/reasoning.py +++ b/apps/llm/reasoning.py @@ -8,9 +8,8 @@ from dataclasses import dataclass from openai import AsyncOpenAI from openai.types.chat import ChatCompletionChunk -from apps.common.config import config from apps.constants import REASONING_BEGIN_TOKEN, REASONING_END_TOKEN -from apps.schemas.config import LLMConfig +from apps.models.llm import LLMData from .token import TokenCalculator @@ -95,27 +94,27 @@ class ReasoningLLM: output_tokens: int = 0 timeout: float = 30.0 - def __init__(self, llm_config: LLMConfig | None = None) -> None: + def __init__(self, llm_config: LLMData | None = None) -> None: """判断配置文件里用了哪种大模型;初始化大模型客户端""" if not llm_config: - self._config: LLMConfig = config.llm - self._init_client() - else: - self._config: LLMConfig = llm_config - self._init_client() + err = "未设置大模型配置" + logger.error(err) + raise RuntimeError(err) + self._config: LLMData = llm_config + self._init_client() def _init_client(self) -> None: """初始化OpenAI客户端""" - if not self._config.key: + if not self._config.openaiAPIKey: self._client = AsyncOpenAI( - base_url=self._config.endpoint, + base_url=self._config.openaiBaseUrl, timeout=self.timeout, ) return self._client = AsyncOpenAI( - api_key=self._config.key, - base_url=self._config.endpoint, + api_key=self._config.openaiAPIKey, + base_url=self._config.openaiBaseUrl, timeout=self.timeout, ) @@ -141,11 +140,11 @@ class ReasoningLLM: ) -> AsyncGenerator[ChatCompletionChunk, None]: """创建流式响应""" if model is None: - model = self._config.model + model = self._config.modelName return await self._client.chat.completions.create( model=model, messages=messages, # type: ignore[] - max_tokens=max_tokens or self._config.max_tokens, + max_tokens=max_tokens or self._config.maxToken, temperature=temperature or self._config.temperature, stream=True, stream_options={"include_usage": True}, @@ -164,11 
+163,11 @@ class ReasoningLLM: """调用大模型,分为流式和非流式两种""" # 检查max_tokens和temperature if max_tokens is None: - max_tokens = self._config.max_tokens + max_tokens = self._config.maxToken if temperature is None: temperature = self._config.temperature if model is None: - model = self._config.model + model = self._config.modelName msg_list = self._validate_messages(messages) stream = await self._create_stream(msg_list, max_tokens, temperature, model) reasoning = ReasoningContent() diff --git a/apps/models/llm.py b/apps/models/llm.py index 267f12bbdd324541b65fbf530c319cacf8aa9163..a819ea350d0b98fff401a09535d0b9b68735e7d7 100644 --- a/apps/models/llm.py +++ b/apps/models/llm.py @@ -1,32 +1,68 @@ """大模型信息 数据库表""" from datetime import UTC, datetime +from enum import Enum as PyEnum -from sqlalchemy import DateTime, Integer, String +from sqlalchemy import ARRAY, DateTime, Enum, Float, Integer, String from sqlalchemy.orm import Mapped, mapped_column -from apps.common.config import config from apps.templates.generate_llm_operator_config import llm_provider_dict from .base import Base +class LLMType(str, PyEnum): + """大模型类型""" + + FUNCTION = "function" + """模型支持Function Call""" + EMBEDDING = "embedding" + """模型支持Embedding""" + VISION = "vision" + """模型支持图片理解""" + REASONING = "reasoning" + """模型支持思考推理""" + + +class FunctionCallBackend(str, PyEnum): + """Function Call后端""" + + OLLAMA = "ollama" + """Ollama""" + VLLM = "vllm" + """VLLM""" + FUNCTION_CALL = "function_call" + """Function Call""" + JSON_MODE = "json_mode" + """JSON Mode""" + STRUCTURED_OUTPUT = "structured_output" + """Structured Output""" + + class LLMData(Base): """大模型信息""" __tablename__ = "framework_llm" id: Mapped[str] = mapped_column(String(255), primary_key=True) """大模型ID""" - icon: Mapped[str] = mapped_column(String(1000), default=llm_provider_dict["ollama"]["icon"], nullable=False) - """LLM图标路径""" - openaiBaseUrl: Mapped[str] = mapped_column(String(300), default=config.llm.endpoint, nullable=False) # noqa: N815 + openaiBaseUrl: Mapped[str] = mapped_column(String(300), nullable=False) # noqa: N815 """LLM URL地址""" - openaiAPIKey: Mapped[str] = mapped_column(String(300), default=config.llm.key, nullable=False) # noqa: N815 + openaiAPIKey: Mapped[str] = mapped_column(String(300), nullable=False) # noqa: N815 """LLM API Key""" - modelName: Mapped[str] = mapped_column(String(300), default=config.llm.model, nullable=False) # noqa: N815 + modelName: Mapped[str] = mapped_column(String(300), nullable=False) # noqa: N815 """LLM模型名""" - maxToken: Mapped[int] = mapped_column(Integer, default=config.llm.max_tokens, nullable=False) # noqa: N815 + maxToken: Mapped[int] = mapped_column(Integer, default=8192, nullable=False) # noqa: N815 """LLM最大Token数量""" + temperature: Mapped[float] = mapped_column(Float, default=0.7, nullable=False) + """LLM温度""" + icon: Mapped[str] = mapped_column(String(1000), default=llm_provider_dict["ollama"]["icon"], nullable=False) + """LLM图标路径""" + llmType: Mapped[list[LLMType]] = mapped_column(ARRAY(Enum(LLMType)), default_factory=list, nullable=False) # noqa: N815 + """LLM类型""" + functionCallBackend: Mapped[FunctionCallBackend | None] = mapped_column( # noqa: N815 + Enum(FunctionCallBackend), default=None, nullable=True, ) + """Function Call后端""" createdAt: Mapped[DateTime] = mapped_column( # noqa: N815 DateTime, default_factory=lambda: datetime.now(tz=UTC), diff --git a/apps/models/node.py b/apps/models/node.py index f477e4e6f4864476a67df02fcfadeffbb1c16559..8f6331f920048c8ce42c459bf083d9c8e78186c9 100644 --- a/apps/models/node.py +++ 
b/apps/models/node.py @@ -1,10 +1,11 @@ """节点 数据库表""" +import uuid from datetime import UTC, datetime from typing import Any from sqlalchemy import DateTime, ForeignKey, String -from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column from .base import Base @@ -18,8 +19,8 @@ class NodeInfo(Base): """节点名称""" description: Mapped[str] = mapped_column(String(2000), nullable=False) """节点描述""" - serviceId: Mapped[str | None] = mapped_column( # noqa: N815 - String(255), ForeignKey("framework_service.id"), nullable=True, + serviceId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 + UUID(as_uuid=True), ForeignKey("framework_service.id"), nullable=True, ) """所属服务ID""" callId: Mapped[str] = mapped_column(String(50), nullable=False) # noqa: N815 diff --git a/apps/models/task.py b/apps/models/task.py index 36f82152cbeb06cc5b995f6939aa720380870018..c21975e9858ab42a4c0d2aa7347143e286498ff0 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -23,6 +23,8 @@ class Task(Base): UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=False, ) """对话ID""" + sessionId: Mapped[str] = mapped_column(String(255), ForeignKey("framework_session.id"), nullable=False) # noqa: N815 + """会话ID""" checkpointId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 UUID(as_uuid=True), ForeignKey("framework_executor_checkpoint.id"), nullable=True, default=None, diff --git a/apps/models/vectors.py b/apps/models/vectors.py index 266901dbeeb7e56e207ec72cdd3e9906fb860e58..7378d07ed60422ee0078fb6536f4809dceeeebd6 100644 --- a/apps/models/vectors.py +++ b/apps/models/vectors.py @@ -37,7 +37,7 @@ class ServicePoolVector(Base): __tablename__ = "framework_service_vector" embedding: Mapped[list[float]] = mapped_column(Vector(1024), nullable=False) """向量数据""" - id: Mapped[str] = mapped_column(String(255), ForeignKey("framework_service.id"), primary_key=True) + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("framework_service.id"), primary_key=True) """Service的ID""" __table_args__ = ( Index( @@ -56,7 +56,9 @@ class NodePoolVector(Base): __tablename__ = "framework_node_vector" embedding: Mapped[list[float]] = mapped_column(Vector(1024), nullable=False) """向量数据""" - serviceId: Mapped[str | None] = mapped_column(String(255), ForeignKey("framework_service.id"), nullable=True) # noqa: N815 + serviceId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 + UUID(as_uuid=True), ForeignKey("framework_service.id"), nullable=True, + ) """Service的ID""" id: Mapped[str] = mapped_column(String(255), ForeignKey("framework_node.id"), primary_key=True) """Node的ID""" diff --git a/apps/routers/llm.py b/apps/routers/llm.py index e5d84daa26a847dba0067b83c6068920fb9d8ed8..6c281a47618a64719cd8aa89f8eef70d2bc65a72 100644 --- a/apps/routers/llm.py +++ b/apps/routers/llm.py @@ -1,8 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
"""FastAPI 大模型相关接口""" -import uuid - from fastapi import APIRouter, Depends, Request, status from fastapi.responses import JSONResponse @@ -55,7 +53,7 @@ async def list_llm_provider() -> JSONResponse: @router.get("", response_model=ListLLMRsp, responses={status.HTTP_404_NOT_FOUND: {"model": ResponseData}}, ) -async def list_llm(llmId: uuid.UUID | None = None) -> JSONResponse: # noqa: N803 +async def list_llm(llmId: str | None = None) -> JSONResponse: # noqa: N803 """获取大模型列表""" llm_list = await LLMManager.list_llm(llmId) return JSONResponse( @@ -73,7 +71,7 @@ async def list_llm(llmId: uuid.UUID | None = None) -> JSONResponse: # noqa: N80 ) async def create_llm( req: UpdateLLMReq, - llmId: uuid.UUID | None = None, # noqa: N803 + llmId: str | None = None, # noqa: N803 ) -> JSONResponse: """创建或更新大模型配置""" try: @@ -100,7 +98,7 @@ async def create_llm( @admin_router.delete("", responses={status.HTTP_404_NOT_FOUND: {"model": ResponseData}}, ) -async def delete_llm(request: Request, llmId: uuid.UUID) -> JSONResponse: # noqa: N803 +async def delete_llm(request: Request, llmId: str) -> JSONResponse: # noqa: N803 """删除大模型配置""" try: await LLMManager.delete_llm(request.state.user_sub, llmId) @@ -128,7 +126,7 @@ async def delete_llm(request: Request, llmId: uuid.UUID) -> JSONResponse: # noq ) async def update_user_llm( request: Request, - llmId: uuid.UUID, # noqa: N803 + llmId: str, # noqa: N803 ) -> JSONResponse: """更新用户所选的大模型""" try: diff --git a/apps/routers/parameter.py b/apps/routers/parameter.py index 02ac6d637107792c50accc1550af14666fd9caaa..60ee123e8f2fa80e98d19e44e672d77ab4556175 100644 --- a/apps/routers/parameter.py +++ b/apps/routers/parameter.py @@ -24,7 +24,7 @@ router = APIRouter( @router.get("", response_model=GetParamsRsp) async def get_parameters( - request: Request, appId: uuid.UUID, flowId: uuid.UUID, stepId: uuid.UUID, # noqa: N803 + request: Request, appId: uuid.UUID, flowId: str, stepId: uuid.UUID, # noqa: N803 ) -> JSONResponse: """Get parameters for node choice.""" if not await AppCenterManager.validate_user_app_access(request.state.user_sub, appId): diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 95c012f2f2d06f8f60285682ab919bc42ee3d7d8..3bf42c366a5922461c3279c318dbeabca14aa456 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -14,7 +14,6 @@ from pydantic.json_schema import SkipJsonSchema from apps.llm.function import FunctionLLM from apps.llm.reasoning import ReasoningLLM -from apps.models.llm import LLMData from apps.models.node import NodeInfo from apps.models.task import ExecutorHistory from apps.schemas.enum_var import CallOutputType, LanguageType @@ -26,6 +25,7 @@ from apps.schemas.scheduler import ( CallTokens, CallVars, ) +from apps.services.llm import LLMManager if TYPE_CHECKING: from apps.scheduler.executor.step import StepExecutor @@ -95,31 +95,30 @@ class CoreCall(BaseModel): @staticmethod def _assemble_call_vars(executor: "StepExecutor") -> CallVars: """组装CallVars""" - if not executor.task.state: + if not executor.state: err = "[CoreCall] 当前ExecutorState为空" logger.error(err) raise ValueError(err) history = {} history_order = [] - for item in executor.task.context: - item_obj = ExecutorHistory.model_validate(item) - history[item_obj.step_id] = item_obj - history_order.append(item_obj.step_id) + for item in executor.context: + history[item.stepId] = item + history_order.append(item.stepId) return CallVars( - language=executor.task.language, + language=executor.runtime.language, ids=CallIds( 
task_id=executor.task.id, - flow_id=executor.task.state.flow_id, - session_id=executor.task.ids.session_id, - user_sub=executor.task.ids.user_sub, - app_id=executor.task.state.app_id, + executor_id=executor.state.executorId, + session_id=executor.task.sessionId, + user_sub=executor.task.userSub, + app_id=executor.state.appId, ), question=executor.question, history=history, history_order=history_order, - summary=executor.task.runtime.summary, + summary=executor.runtime.reasoning, ) @@ -195,18 +194,53 @@ class CoreCall(BaseModel): await self._after_exec(input_data) - async def _llm(self, messages: list[dict[str, Any]]) -> str: + async def _llm(self, messages: list[dict[str, Any]], *, streaming: bool = False) -> AsyncGenerator[str, None]: - """Call可直接使用的LLM非流式调用""" + """Call可直接使用的LLM调用,分为流式和非流式两种""" - result = "" - llm = ReasoningLLM() - async for chunk in llm.call(messages, streaming=False): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - return result + user_sub = self._sys_vars.ids.user_sub + llm_id = await LLMManager.get_user_default_llm(user_sub) + if not llm_id: + err = f"[CoreCall] 用户{user_sub}未设置默认LLM" + logger.error(err) + raise CallError( + message=err, + data={ + "user_sub": user_sub, + }, + ) + + llm_data = await LLMManager.get_llm(llm_id) + if not llm_data: + err = f"[CoreCall] LLM{llm_id}不存在" + logger.error(err) + raise CallError(message=err, data={"llm_id": str(llm_id)}) + + llm = ReasoningLLM(llm_config=llm_data) + if streaming: + async for chunk in llm.call(messages, streaming=streaming): + yield chunk + else: + result = "" + async for chunk in llm.call(messages, streaming=streaming): + result += chunk + yield result + + self.tokens.input_tokens += llm.input_tokens + self.tokens.output_tokens += llm.output_tokens async def _json(self, messages: list[dict[str, Any]], schema: dict[str, Any]) -> dict[str, Any]: """Call可直接使用的JSON生成""" + user_sub = self._sys_vars.ids.user_sub + llm_id = await LLMManager.get_user_default_llm(user_sub) + if not llm_id: + err = f"[CoreCall] 用户{user_sub}未设置默认LLM" + logger.error(err) + raise CallError( + message=err, + data={ + "user_sub": user_sub, + }, + ) + llm_data = await LLMManager.get_llm(llm_id) + if not llm_data: + err = f"[CoreCall] LLM{llm_id}不存在" + logger.error(err) + raise CallError(message=err, data={"llm_id": str(llm_id)}) - json = FunctionLLM() + json = FunctionLLM(llm_config=llm_data) return await json.call(messages=messages, schema=schema) diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index ee95b495f294bfe99cbabd24b095378dab8b7578..d317011e07a9c182b9c45965510b09d9abefa801 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -49,7 +49,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): async def instance(cls, executor: "StepExecutor", node: NodeInfo | None, **kwargs: Any) -> Self: """初始化工具""" obj = cls( - answer=executor.task.runtime.answer, + answer=executor.runtime.fullAnswer, name=executor.step.step.name, description=executor.step.step.description, node=node, @@ -120,5 +120,5 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): if not isinstance(content, dict): err = "[FactsCall] 工具输出格式错误" raise TypeError(err) - executor.task.runtime.facts = FactsOutput.model_validate(content).facts + executor.runtime.fact = FactsOutput.model_validate(content).facts yield chunk diff --git a/apps/scheduler/call/llm/llm.py b/apps/scheduler/call/llm/llm.py index d1ecebf0417ebc2a2ed7360f5f6882d985ec34b2..3cf3275a2fe8581a38b50bc9863c3572253a59e9 100644 --- a/apps/scheduler/call/llm/llm.py +++ b/apps/scheduler/call/llm/llm.py @@ -115,12 +115,9 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): """运行LLM Call""" data = 
LLMInput(**input_data) try: - llm = ReasoningLLM() - async for chunk in llm.call(messages=data.message): + async for chunk in self._llm(messages=data.message, streaming=True): if not chunk: continue yield CallOutputChunk(type=CallOutputType.TEXT, content=chunk) - self.tokens.input_tokens = llm.input_tokens - self.tokens.output_tokens = llm.output_tokens except Exception as e: raise CallError(message=f"大模型调用失败:{e!s}", data={}) from e diff --git a/apps/scheduler/call/mcp/mcp.py b/apps/scheduler/call/mcp/mcp.py index 3d45139be9825b2e4917db40d82c3f313eb67102..3805fd4a8f9a43823f3fc8d4610081dedfa4c092 100644 --- a/apps/scheduler/call/mcp/mcp.py +++ b/apps/scheduler/call/mcp/mcp.py @@ -88,7 +88,7 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): self._host = MCPHost( call_vars.ids.user_sub, call_vars.ids.task_id, - call_vars.ids.flow_id, + call_vars.ids.executor_id, self.description, language=self._sys_vars.language, ) diff --git a/apps/scheduler/call/slot/slot.py b/apps/scheduler/call/slot/slot.py index abd2831e43948da3b30a1910359d896bc7880090..3b88fb4ff14b6a554a6d2010f5d195be4503556d 100644 --- a/apps/scheduler/call/slot/slot.py +++ b/apps/scheduler/call/slot/slot.py @@ -9,6 +9,7 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from pydantic import Field +from apps.llm.function import FunctionLLM from apps.models.node import NodeInfo from apps.scheduler.call.core import CoreCall from apps.scheduler.slot.slot import Slot as SlotProcessor @@ -71,13 +72,7 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): ] # 使用大模型进行尝试 - reasoning = ReasoningLLM() - answer = "" - async for chunk in reasoning.call(messages=conversation, streaming=False): - answer += chunk - self.tokens.input_tokens += reasoning.input_tokens - self.tokens.output_tokens += reasoning.output_tokens - + answer = "" + async for chunk in self._llm(messages=conversation, streaming=False): + answer += chunk answer = await FunctionLLM.process_response(answer) try: data = json.loads(answer) @@ -91,12 +86,7 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): {"role": "user", "content": self._question}, {"role": "assistant", "content": answer}, ] - json_gen = JsonGenerator( - query=self._question, - conversation=conversation, - schema=remaining_schema, - ) - return await json_gen.generate() + return await self._json(messages=conversation, schema=remaining_schema) @classmethod async def instance(cls, executor: "StepExecutor", node: NodeInfo | None, **kwargs: Any) -> Self: @@ -105,7 +95,7 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): name=executor.step.step.name, description=executor.step.step.description, facts=executor.background.facts, - summary=executor.task.runtime.summary, + summary=executor.runtime.reasoning, node=node, **kwargs, ) diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index e85f606cbda165867a2151fe655c49f9f318d5a0..d67dc385f8ba882ac269936d47d88f559762db73 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -69,11 +69,11 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO context = [ { "role": "user", - "content": executor.task.runtime.question, + "content": executor.runtime.userInput, }, { "role": "assistant", - "content": executor.task.runtime.answer, + "content": executor.runtime.fullAnswer, }, ] obj = cls( @@ -81,7 +81,7 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO 
description=executor.step.step.description, node=node, context=context, - conversation_id=executor.task.ids.conversation_id, + conversation_id=executor.task.conversationId, **kwargs, ) await obj._set_input(executor) @@ -95,7 +95,7 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO self.conversation_id, ) self._app_id = call_vars.ids.app_id - self._flow_id = call_vars.ids.flow_id + self._flow_id = call_vars.ids.executor_id app_metadata = await AppCenterManager.fetch_app_data_by_id(self._app_id) self._env = SandboxedEnvironment( loader=BaseLoader(), diff --git a/apps/scheduler/call/summary/summary.py b/apps/scheduler/call/summary/summary.py index 8b7c5466d7d910293e4580bfe0a272809297ef1c..6f1956dc7b32339aef7f99de71197c06a54bf391 100644 --- a/apps/scheduler/call/summary/summary.py +++ b/apps/scheduler/call/summary/summary.py @@ -77,5 +77,5 @@ class Summary(CoreCall, input_model=DataBase, output_model=SummaryOutput): if not isinstance(content, str): err = "[SummaryCall] 工具输出格式错误" raise TypeError(err) - executor.task.runtime.summary = content + executor.runtime.reasoning = content yield chunk diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index a0eebe9403dbdcdb520d39dc22e208a02a23c7b6..f9a212a83d084dc0ffcd093a28b94467049f7c70 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -47,7 +47,7 @@ class AppLoader: async for flow_file in flow_path.rglob("*.yaml"): if flow_file.stem not in flow_ids: logger.warning("[AppLoader] 工作流 %s 不在元数据中", flow_file) - flow = await FlowLoader.load(app_id, uuid.UUID(flow_file.stem)) + flow = await FlowLoader.load(app_id, flow_file.stem) if not flow: err = f"[AppLoader] 工作流 {flow_file} 加载失败" raise ValueError(err) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index e1d912f4b31177ad92c8fc6885fd294bcdfa6946..7ebafefd866e8e051d1686c00cd084d8ca226342 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -88,16 +88,22 @@ class FlowLoader: raise ValueError(err) if step["type"]==NodeType.START.value or step["type"]==NodeType.END.value: continue - try: - step["type"] = await NodeManager.get_node_call_id(step["node"]) - except ValueError as e: - logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", step["node"], e) - step["type"] = "Empty" - step["name"] = ( - (await NodeManager.get_node_name(step["node"])) - if "name" not in step or step["name"] == "" - else step["name"] - ) + try: + node_data = await NodeManager.get_node(step["node"]) + except ValueError as e: + logger.warning("[FlowLoader] 获取节点失败:%s,错误信息:%s", step["node"], e) + node_data = None + step["type"] = node_data.callId if node_data else "Empty" + step["name"] = ( + node_data.name + if node_data and ("name" not in step or step["name"] == "") + else step.get("name", "") + ) + step["description"] = ( + node_data.description + if node_data and ("description" not in step or step["description"] == "") + else step.get("description", "") + ) return flow_yaml diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index b203c54545f9210e73431da885853e5ab5b27841..0c86fc23090ec8e40d7efc120ec6544f9e2abea2 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -503,14 +503,20 @@ class MCPLoader(metaclass=SingletonMeta): :param list[str] cancel_mcp_list: 需要取消的MCP列表 :return: 无 """ - mongo = MongoDB() - mcp_collection = mongo.get_collection("mcp") - # 更新数据库状态 - cancel_mcp_list = await mcp_collection.distinct("_id", {"_id": {"$in": cancel_mcp_list}, "status": MCPInstallStatus.INSTALLING}) - await mcp_collection.update_many( - {"_id": {"$in": cancel_mcp_list}, "status": MCPInstallStatus.INSTALLING}, - {"$set": {"status": MCPInstallStatus.CANCELLED}}, - ) + async with postgres.session() 
as session: + result = await session.scalars( + select(MCPInfo).where( + and_( + MCPInfo.status == MCPInstallStatus.INSTALLING, + MCPInfo.id.in_(cancel_mcp_list), + ), + ), + ) + result = result.all() + for mcp in result: + mcp.status = MCPInstallStatus.CANCELLED + await session.commit() + for mcp_id in cancel_mcp_list: ProcessHandler.remove_task(mcp_id) logger.info("[MCPLoader] 取消这些正在安装的MCP模板任务: %s", cancel_mcp_list) diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index 979775b26893cb52b8d5b64a022e5d6580dee819..3e141ab9dfc36c6ffb916db6db20f16769988620 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -2,6 +2,7 @@ """OpenAPI文档载入器""" import logging +import uuid from hashlib import shake_128 from typing import Any @@ -121,7 +122,7 @@ class OpenAPILoader: async def _process_spec( self, - service_id: str, + service_id: uuid.UUID, yaml_filename: str, spec: ReducedOpenAPISpec, server: str, @@ -166,10 +167,10 @@ class OpenAPILoader: self, yaml_dict: dict[str, Any], ) -> ReducedOpenAPISpec: - """加载字典形式的OpenAPI文档""" + """加载字典形式的OpenAPI文档;生成随机的Service ID""" spec = reduce_openapi_spec(yaml_dict) try: - await self._process_spec("temp", "temp.yaml", spec, spec.servers) + await self._process_spec(uuid.uuid4(), "temp.yaml", spec, spec.servers) except Exception: err = "[OpenAPILoader] 处理OpenAPI文档失败" logger.exception(err) @@ -178,7 +179,7 @@ class OpenAPILoader: return spec - async def load_one(self, service_id: str, yaml_path: Path, server: str) -> list[NodeInfo]: + async def load_one(self, service_id: uuid.UUID, yaml_path: Path, server: str) -> list[NodeInfo]: """加载单个OpenAPI文档,可以直接指定路径""" try: spec = await self._read_yaml(yaml_path) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 01cebd78d650621cc579ef90ce806dbadec549a1..802ec35050ae860727dedb2e354c277853330559 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -3,6 +3,7 @@ import logging import shutil +import uuid from anyio import Path from sqlalchemy import delete @@ -27,7 +28,7 @@ class ServiceLoader: """Service 加载器""" @staticmethod - async def load(service_id: str, hashes: dict[str, str]) -> None: + async def load(service_id: uuid.UUID, hashes: dict[str, str]) -> None: """加载单个Service""" service_path = BASE_PATH / str(service_id) # 载入元数据 @@ -54,7 +55,7 @@ class ServiceLoader: @staticmethod - async def save(service_id: str, metadata: ServiceMetadata, data: dict) -> None: + async def save(service_id: uuid.UUID, metadata: ServiceMetadata, data: dict) -> None: """在文件系统上保存Service,并更新数据库""" service_path = BASE_PATH / str(service_id) # 创建文件夹 @@ -74,7 +75,7 @@ class ServiceLoader: @staticmethod - async def delete(service_id: str, *, is_reload: bool = False) -> None: + async def delete(service_id: uuid.UUID, *, is_reload: bool = False) -> None: """删除Service,并更新数据库""" async with postgres.session() as session: await session.execute(delete(Service).where(Service.id == service_id)) diff --git a/apps/scheduler/slot/parser/const.py b/apps/scheduler/slot/parser/const.py index fd582a9724c5bdec7248e17708532bc38b416e42..3d9fadc57914ddd16f2a08a47ba5bb943426e05a 100644 --- a/apps/scheduler/slot/parser/const.py +++ b/apps/scheduler/slot/parser/const.py @@ -26,5 +26,3 @@ class SlotConstParser: @classmethod def keyword_validate(cls, validator: Validator, keyword: str, instance: Any, schema: dict[str, Any]) -> bool: """生成对应类型的验证器""" - ... 
- diff --git a/apps/scheduler/slot/parser/default.py b/apps/scheduler/slot/parser/default.py index ef950f3a63bada152209bc8c3a4967a7ff3bc4a0..2405bf87930a37742bd71c4f4f1ee68911b72311 100644 --- a/apps/scheduler/slot/parser/default.py +++ b/apps/scheduler/slot/parser/default.py @@ -26,5 +26,3 @@ class SlotDefaultParser: @classmethod def keyword_validate(cls, validator: Validator, keyword: str, instance: Any, schema: dict[str, Any]) -> bool: """给字段设置默认值""" - ... - diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py index bda44a3cf26111facd70d3beef71c8244d0d7970..c14422238af8efa4aca619d6bce82f9c4bc8e1c3 100644 --- a/apps/scheduler/slot/slot.py +++ b/apps/scheduler/slot/slot.py @@ -155,7 +155,7 @@ class Slot: return _process_json_value(json_data, self._schema) @staticmethod - def _generate_example(schema_node: dict) -> Any: # noqa: PLR0911 + def _generate_example(schema_node: dict) -> Any: # noqa: C901, PLR0911, PLR0912 """根据schema生成示例值""" if "anyOf" in schema_node or "oneOf" in schema_node: # 如果有anyOf,随机返回一个示例 @@ -185,10 +185,9 @@ class Slot: if "type" not in schema_node: return None type_value = schema_node["type"] - if isinstance(type_value, list): + if isinstance(type_value, list) and len(type_value) > 1: # 如果是多类型,随机返回一个示例 - if len(type_value) > 1: - type_value = type_value[0] + type_value = type_value[0] # 处理类型为 object 的节点 if type_value == "object": data = {} @@ -197,20 +196,20 @@ class Slot: data[name] = Slot._generate_example(schema) return data # 处理类型为 array 的节点 - elif type_value == "array": + if type_value == "array": items_schema = schema_node.get("items", {}) return [Slot._generate_example(items_schema)] # 处理类型为 string 的节点 - elif type_value == "string": + if type_value == "string": return "" # 处理类型为 number 或 integer 的节点 - elif type_value in ["number", "integer"]: + if type_value in ["number", "integer"]: return 0 # 处理类型为 boolean 的节点 - elif type_value == "boolean": + if type_value == "boolean": return False # 处理其他类型或未定义类型 @@ -220,74 +219,67 @@ class Slot: """创建一个空的槽位""" return self._generate_example(self._schema) + def _extract_type_desc(self, schema_node: dict[str, Any]) -> dict[str, Any]: # noqa: C901, PLR0912 + # 处理组合关键字 + special_keys = ["anyOf", "allOf", "oneOf"] + for key in special_keys: + if key in schema_node: + data = { + "type": key, + "description": schema_node.get("description", ""), + "items": {}, + } + for type_index, item in enumerate(schema_node[key]): + if isinstance(item, dict): + data["items"][f"item_{type_index}"] = self._extract_type_desc(item) + else: + data["items"][f"item_{type_index}"] = {"type": item, "description": ""} + return data + # 处理基本类型 + type_val = schema_node.get("type", "") + description = schema_node.get("description", "") + + # 处理多类型数组 + if isinstance(type_val, list): + if len(type_val) > 1: + data = {"type": "union", "description": description, "items": {}} + type_index = 0 + for t in type_val: + if t == "object": + tmp_dict = {} + for key, val in schema_node.get("properties", {}).items(): + tmp_dict[key] = self._extract_type_desc(val) + data["items"][f"item_{type_index}"] = tmp_dict + elif t == "array": + items_schema = schema_node.get("items", {}) + data["items"][f"item_{type_index}"] = self._extract_type_desc(items_schema) + else: + data["items"][f"item_{type_index}"] = {"type": t, "description": description} + type_index += 1 + return data + type_val = type_val[0] if len(type_val) == 1 else "" + + data = {"type": type_val, "description": description, "items": {}} + + # 递归处理对象和数组 + if type_val == "object": + for key, val 
in schema_node.get("properties", {}).items(): + data["items"][key] = self._extract_type_desc(val) + elif type_val == "array": + items_schema = schema_node.get("items", {}) + if isinstance(items_schema, list): + item_index = 0 + for item_index, item in enumerate(items_schema): + data["items"][f"item_{item_index}"] = self._extract_type_desc(item) + else: + data["items"]["item"] = self._extract_type_desc(items_schema) + if data["items"] == {}: + del data["items"] + return data + def extract_type_desc_from_schema(self) -> dict[str, str]: """从JSON Schema中提取类型描述""" - - def _extract_type_desc(schema_node: dict[str, Any]) -> dict[str, Any]: - # 处理组合关键字 - special_keys = ["anyOf", "allOf", "oneOf"] - for key in special_keys: - if key in schema_node: - data = { - "type": key, - "description": schema_node.get("description", ""), - "items": {}, - } - type_index = 0 - for item in schema_node[key]: - if isinstance(item, dict): - data["items"][f"item_{type_index}"] = _extract_type_desc(item) - else: - data["items"][f"item_{type_index}"] = {"type": item, "description": ""} - type_index += 1 - return data - # 处理基本类型 - type_val = schema_node.get("type", "") - description = schema_node.get("description", "") - - # 处理多类型数组 - if isinstance(type_val, list): - if len(type_val) > 1: - data = {"type": "union", "description": description, "items": {}} - type_index = 0 - for t in type_val: - if t == "object": - tmp_dict = {} - for key, val in schema_node.get("properties", {}).items(): - tmp_dict[key] = _extract_type_desc(val) - data["items"][f"item_{type_index}"] = tmp_dict - elif t == "array": - items_schema = schema_node.get("items", {}) - data["items"][f"item_{type_index}"] = _extract_type_desc(items_schema) - else: - data["items"][f"item_{type_index}"] = {"type": t, "description": description} - type_index += 1 - return data - elif len(type_val) == 1: - type_val = type_val[0] - else: - type_val = "" - - data = {"type": type_val, "description": description, "items": {}} - - # 递归处理对象和数组 - if type_val == "object": - for key, val in schema_node.get("properties", {}).items(): - data["items"][key] = _extract_type_desc(val) - elif type_val == "array": - items_schema = schema_node.get("items", {}) - if isinstance(items_schema, list): - item_index = 0 - for item in items_schema: - data["items"][f"item_{item_index}"] = _extract_type_desc(item) - item_index += 1 - else: - data["items"]["item"] = _extract_type_desc(items_schema) - if data["items"] == {}: - del data["items"] - return data - - return _extract_type_desc(self._schema) + return self._extract_type_desc(self._schema) def get_params_node_from_schema(self, root: str = "") -> ParamsNode: """从JSON Schema中提取ParamsNode""" @@ -328,8 +320,8 @@ class Slot: subParams=sub_params) try: return _extract_params_node(self._schema, name=root, path=root) - except Exception as e: - logger.error(f"[Slot] 提取ParamsNode失败: {e!s}\n{traceback.format_exc()}") + except Exception: + logger.exception("[Slot] 提取ParamsNode失败") return None def _flatten_schema(self, schema: dict[str, Any]) -> tuple[dict[str, Any], list[str]]: @@ -484,52 +476,47 @@ class Slot: return {} def add_null_to_basic_types(self) -> dict[str, Any]: - """ - 递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项 - """ + """递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项""" def add_null_to_basic_types(schema: dict[str, Any]) -> dict[str, Any]: """ - 递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项 - - 参数: - schema (dict): 原始 JSON Schema + 递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项 - 返回: - dict: 修改后的 JSON Schema + :param schema: 原始 
JSON Schema + :return: 修改后的 JSON Schema """ # 如果不是字典类型(schema),直接返回 if not isinstance(schema, dict): return schema # 处理当前节点的 type 字段 - if 'type' in schema: + if "type" in schema: # 处理单一类型字符串 - if isinstance(schema['type'], str): - if schema['type'] in ['boolean', 'number', 'string', 'integer']: - schema['type'] = [schema['type'], 'null'] + if isinstance(schema["type"], str): + if schema["type"] in ["boolean", "number", "string", "integer"]: + schema["type"] = [schema["type"], "null"] # 处理类型数组 - elif isinstance(schema['type'], list): - for i, t in enumerate(schema['type']): - if isinstance(t, str) and t in ['boolean', 'number', 'string', 'integer']: - if 'null' not in schema['type']: - schema['type'].append('null') + elif isinstance(schema["type"], list): + for i, t in enumerate(schema["type"]): + if isinstance(t, str) and t in ["boolean", "number", "string", "integer"]: + if "null" not in schema["type"]: + schema["type"].append("null") break # 递归处理 properties 字段(对象类型) - if 'properties' in schema: - for prop, prop_schema in schema['properties'].items(): - schema['properties'][prop] = add_null_to_basic_types(prop_schema) + if "properties" in schema: + for prop, prop_schema in schema["properties"].items(): + schema["properties"][prop] = add_null_to_basic_types(prop_schema) # 递归处理 items 字段(数组类型) - if 'items' in schema: - schema['items'] = add_null_to_basic_types(schema['items']) + if "items" in schema: + schema["items"] = add_null_to_basic_types(schema["items"]) # 递归处理 anyOf, oneOf, allOf 字段 - for keyword in ['anyOf', 'oneOf', 'allOf']: + for keyword in ["anyOf", "oneOf", "allOf"]: if keyword in schema: schema[keyword] = [add_null_to_basic_types(sub_schema) for sub_schema in schema[keyword]] return schema schema_copy = copy.deepcopy(self._schema) - return add_null_to_basic_types(schema_copy) \ No newline at end of file + return add_null_to_basic_types(schema_copy) diff --git a/apps/schemas/config.py b/apps/schemas/config.py index ea8d40e56fa6e9894291006e99bc5c75b36c91a4..d81900a9025a05845e28cf936c42c404d6d60c70 100644 --- a/apps/schemas/config.py +++ b/apps/schemas/config.py @@ -88,27 +88,6 @@ class PostgresConfig(BaseModel): database: str = Field(description="Postgres数据库名") -class LLMConfig(BaseModel): - """LLM配置""" - - key: str = Field(description="LLM API密钥") - endpoint: str = Field(description="LLM API URL地址") - model: str = Field(description="LLM API 模型名") - max_tokens: int | None = Field(description="LLM API 最大Token数", default=None) - temperature: float | None = Field(description="LLM API 温度", default=None) - - -class FunctionCallConfig(BaseModel): - """Function Call配置""" - - backend: str = Field(description="Function Call 后端") - model: str = Field(description="Function Call 模型名") - endpoint: str = Field(description="Function Call API URL地址") - api_key: str = Field(description="Function Call API密钥") - max_tokens: int | None = Field(description="Function Call 最大Token数", default=None) - temperature: float | None = Field(description="Function Call 温度", default=None) - - class SecurityConfig(BaseModel): """安全配置""" @@ -142,8 +121,6 @@ class ConfigModel(BaseModel): minio: MinioConfig mongodb: MongoDBConfig postgres: PostgresConfig - llm: LLMConfig - function_call: FunctionCallConfig security: SecurityConfig check: CheckConfig extra: ExtraConfig diff --git a/apps/schemas/flow.py b/apps/schemas/flow.py index 0a1566de3d2e132e350ea92500b4238c3fea68f6..1fa8e560f220048452c90ca5755fdb2ecd46626f 100644 --- a/apps/schemas/flow.py +++ b/apps/schemas/flow.py @@ -43,15 +43,29 @@ class 
FlowError(BaseModel): output_format: str | None = Field(description="错误处理节点的输出格式", default=None) +class FlowBasicConfig(BaseModel): + """Flow的基本配置""" + + startStep: uuid.UUID = Field(description="开始节点ID") # noqa: N815 + endStep: uuid.UUID = Field(description="结束节点ID") # noqa: N815 + focusPoint: PositionItem | None = Field(description="当前焦点节点", default=PositionItem(x=0, y=0)) # noqa: N815 + + +class FlowCheckStatus(BaseModel): + """Flow的配置检查状态""" + + debug: bool = Field(description="是否经过调试", default=False) + connectivity: bool = Field(default=False, description="图的开始节点和结束节点是否联通,并且除结束节点都有出边") + + class Flow(BaseModel): """Flow(工作流)的数据格式""" name: str = Field(description="Flow的名称", min_length=1) - description: str = Field(description="Flow的描述 ") - connectivity: bool = Field(default=False, description="图的开始节点和结束节点是否联通,并且除结束节点都有出边") - focus_point: PositionItem | None = Field(description="当前焦点节点", default=PositionItem(x=0, y=0)) - debug: bool = Field(description="是否经过调试", default=False) - on_error: FlowError = FlowError(use_llm=True) + description: str = Field(description="Flow的描述") + checkStatus: FlowCheckStatus = Field(description="Flow的配置检查状态") # noqa: N815 + basicConfig: FlowBasicConfig = Field(description="Flow的基本配置") # noqa: N815 + onError: FlowError = FlowError(use_llm=True) # noqa: N815 steps: dict[uuid.UUID, Step] = Field(description="节点列表", default={}) edges: list[Edge] = Field(description="边列表", default=[]) @@ -113,7 +127,7 @@ class ServiceApiConfig(BaseModel): class ServiceMetadata(MetadataBase): """Service的元数据""" - id: str = Field(description="Service的ID") + id: uuid.UUID = Field(description="Service的ID") type: MetadataType = MetadataType.SERVICE api: ServiceApiConfig = Field(description="API配置") permission: Permission | None = Field(description="服务权限配置", default=None) diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index c03e63ea587fa6263bbbe1a934e7ec7e83f229b0..52dda71901c006e4536a0f0a3e13b64a39d672a6 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -155,6 +155,13 @@ class UpdateLLMReq(BaseModel): max_tokens: int = Field(default=8192, description="最大token数", alias="maxTokens") +class UpdateUserSpecialLLMReq(BaseModel): + """更新用户特殊LLM请求体""" + + function_call_llm_id: str = Field(description="Function Call LLM ID", alias="functionCallLlmId") + embedding_llm_id: str = Field(description="Embedding LLM ID", alias="embeddingLlmId") + + class DeleteLLMReq(BaseModel): """删除大模型请求体""" diff --git a/apps/schemas/response_data.py b/apps/schemas/response_data.py index d672b831ff63adc362257ccecebf9ce5c58fc38c..9d2ef79d7ea1f0c22c7352462d061d0e7b247a1e 100644 --- a/apps/schemas/response_data.py +++ b/apps/schemas/response_data.py @@ -607,7 +607,7 @@ class ParamsNode(BaseModel): class StepParams(BaseModel): """参数数据结构""" - step_id: str = Field(..., description="步骤ID", alias="stepId") + step_id: uuid.UUID = Field(..., description="步骤ID", alias="stepId") name: str = Field(..., description="Step名称") params_node: ParamsNode | None = Field( default=None, description="参数节点", alias="paramsNode") diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py index c4a10ad6ab6b0ba56fcc4099718accc32e37fbe1..d52bf85aff044035129c27ae61536bd414c96e1a 100644 --- a/apps/schemas/scheduler.py +++ b/apps/schemas/scheduler.py @@ -6,8 +6,9 @@ from typing import Any from pydantic import BaseModel, Field +from apps.models.task import ExecutorHistory + from .enum_var import CallOutputType, LanguageType -from .task import FlowStepHistory class CallInfo(BaseModel): 
@@ -21,7 +22,7 @@ class CallIds(BaseModel): """Call的ID,来自于Task""" task_id: uuid.UUID = Field(description="任务ID") - flow_id: uuid.UUID = Field(description="Flow ID") + executor_id: str = Field(description="执行器ID") session_id: str = Field(description="当前用户的Session ID") app_id: uuid.UUID = Field(description="当前应用的ID") user_sub: str = Field(description="当前用户的用户ID") @@ -32,7 +33,7 @@ class CallVars(BaseModel): summary: str = Field(description="上下文信息") question: str = Field(description="改写后的用户输入") - history: dict[str, FlowStepHistory] = Field(description="Executor中历史工具的结构化数据", default={}) + history: dict[str, ExecutorHistory] = Field(description="Executor中历史工具的结构化数据", default={}) history_order: list[str] = Field(description="Executor中历史工具的顺序", default=[]) ids: CallIds = Field(description="Call的ID") language: LanguageType = Field(description="语言", default=LanguageType.CHINESE) diff --git a/apps/services/flow.py b/apps/services/flow.py index 5a6cc0f67ecfede782557a2d34676fdb6dd79924..4b57065c452358324c59e7de6dcacb0e68f60a9b 100644 --- a/apps/services/flow.py +++ b/apps/services/flow.py @@ -179,7 +179,7 @@ class FlowManager: raise ValueError(err) flow_config = await FlowLoader.load(app_id, flow_id) - focus_point = flow_config.focus_point or PositionItem(x=0, y=0) + focus_point = flow_config.basicConfig.focusPoint or PositionItem(x=0, y=0) flow_item = FlowItem( flowId=flow_id, name=flow_config.name, @@ -309,7 +309,7 @@ class FlowManager: description=flow_item.description, steps={}, edges=[], - focus_point=flow_item.focus_point, + focusPoint=flow_item.focus_point, connectivity=flow_item.connectivity, debug=False, ) diff --git a/apps/services/llm.py b/apps/services/llm.py index dd095fe02bccb6ad03d3edb270c7de5fa4597ab6..0cd31fefa4d03c049b76df0bca7b887f2df7fcd4 100644 --- a/apps/services/llm.py +++ b/apps/services/llm.py @@ -10,6 +10,7 @@ from apps.models.llm import LLMData from apps.models.user import User from apps.schemas.request_data import ( UpdateLLMReq, + UpdateUserSpecialLLMReq, ) from apps.schemas.response_data import LLMProvider, LLMProviderInfo from apps.templates.generate_llm_operator_config import llm_provider_dict @@ -204,3 +205,22 @@ class LLMManager: raise ValueError(err) user.defaultLLM = llm_id await session.commit() + + + @staticmethod + async def update_user_special_llm( + user_sub: str, + req: UpdateUserSpecialLLMReq, + ) -> None: + """更新用户的特殊LLM(Function Call、Embedding等)""" + async with postgres.session() as session: + user = (await session.scalars( + select(User).where(User.userSub == user_sub), + )).one_or_none() + if not user: + err = f"[LLMManager] 用户 {user_sub} 不存在" + raise ValueError(err) + # 以下两个字段名为假设,需与User模型的实际列名保持一致 + user.functionLLM = req.function_call_llm_id + user.embeddingLLM = req.embedding_llm_id + await session.commit() diff --git a/apps/services/service.py b/apps/services/service.py index 34fa0fa453ad0cb0464b4ff6e55f339fbaf9a7a7..e4ded160533310e128e4b1ece35780e24e4713e9 100644 --- a/apps/services/service.py +++ b/apps/services/service.py @@ -396,7 +396,7 @@ class ServiceCenterManager: @staticmethod async def get_service_metadata( user_sub: str, - service_id: str, + service_id: uuid.UUID, ) -> ServiceMetadata: """获取服务元数据""" async with postgres.session() as session: