diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 1b4e42f4c5d19c04f9daa7eabc3dc5688db40296..4e16959d959a52090c2754bfe30142ec99d43220 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -27,9 +27,9 @@ class Edge(BaseModel): """Flow中Edge的数据""" id: str = Field(description="边的ID") - edge_from: str = Field(description="边的来源节点ID", alias="from") - edge_to: str = Field(description="边的目标节点ID", alias="to") - edge_type: Optional[EdgeType] = Field(description="边的类型", default=EdgeType.NORMAL) + edge_from: str = Field(description="边的来源节点ID") + edge_to: str = Field(description="边的目标节点ID") + edge_type: Optional[EdgeType] = Field(description="边的类型",default = EdgeType.NORMAL) class Step(BaseModel): @@ -159,6 +159,5 @@ class ServiceApiSpec(BaseModel): class FlowConfig(BaseModel): """Flow的配置信息 用于前期调试使用""" - app_id: str flow_id: str flow_config: Flow diff --git a/apps/manager/flow.py b/apps/manager/flow.py index b76589357e75638623cab817c125b68fa5d288a6..e468a1592b201c1e270c29d93703eb2d80b4e90c 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -5,10 +5,11 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. from typing import Optional from pymongo import ASCENDING +import ray from apps.constants import LOGGER -from apps.entities.enum_var import PermissionType -from apps.entities.flow import AppFlow, Edge, Flow, FlowConfig, Step, StepPos +from apps.entities.enum_var import MetadataType, PermissionType +from apps.entities.flow import AppMetadata, Edge, Flow, FlowConfig, Step, StepPos from apps.entities.flow_topology import ( EdgeItem, FlowItem, @@ -17,7 +18,9 @@ from apps.entities.flow_topology import ( NodeServiceItem, PositionItem, ) +from apps.entities.pool import AppFlow, AppPool from apps.models.mongo import MongoDB +from apps.scheduler.pool.loader.app import AppLoader from apps.scheduler.pool.loader.flow import FlowLoader @@ -187,7 +190,7 @@ class FlowManager: LOGGER.error(f"应用{app_id}不存在") return None cursor = app_collection.find( - {"_id": app_id, "flows._id": flow_id}, + {"_id": app_id, "flows.id": flow_id}, {"flows.$": 1}, # 只返回 flows 数组中符合条件的第一个元素 ) # 获取结果列表,并限制长度为1,因为我们只期待一个结果 @@ -204,8 +207,7 @@ class FlowManager: return None try: if flow_record: - flow_config= await FlowLoader.load(app_id, flow_id) - flow_config = flow_config.dict() + flow_config = await FlowLoader().load(app_id, flow_id) if not flow_config: LOGGER.error( "Get flow config by app_id and flow_id failed") @@ -213,33 +215,33 @@ class FlowManager: focus_point = flow_record["focus_point"] flow_item = FlowItem( flowId=flow_id, - name=flow_config["name"], - description=flow_config["description"], + name=flow_config.name, + description=flow_config.description, enable=True, editable=True, nodes=[], edges=[], - createdAt=flow_record["created_at"], + debug=flow_config.debug, ) - for node_id, node_config in flow_config["steps"].items(): + for node_id, node_config in flow_config.steps.items(): node_item = NodeItem( nodeId=node_id, - nodeMetaDataId=node_config["node"], - name=node_config["name"], - description=node_config["description"], + nodeMetaDataId=node_config.node, + name=node_config.name, + description=node_config.description, enable=True, editable=True, - type=node_config["type"], - parameters=node_config["params"], + type=node_config.type, + parameters=node_config.params, position=PositionItem( - x=node_config["pos"]["x"], y=node_config["pos"]["y"]), + x=node_config.pos.x, y=node_config.pos.y), ) flow_item.nodes.append(node_item) - for edge_config in flow_config["edges"]: - edge_from = edge_config["edge_from"] + for edge_config in flow_config.edges: + edge_from = edge_config.edge_from branch_id = "" - tmp_list = edge_config["edge_from"].split(".") + tmp_list = edge_config.edge_from.split(".") if len(tmp_list) == 0 or len(tmp_list) > 2: LOGGER.error("edge from format error") continue @@ -247,10 +249,10 @@ class FlowManager: edge_from = tmp_list[0] branch_id = tmp_list[1] flow_item.edges.append(EdgeItem( - edgeId=edge_config["id"], + edgeId=edge_config.id, sourceNode=edge_from, - targetNode=edge_config["edge_to"], - type=edge_config["edge_type"], + targetNode=edge_config.edge_to, + type=edge_config.edge_type, branchId=branch_id, )) return (flow_item, focus_point) @@ -319,40 +321,47 @@ class FlowManager: edge_type=edge_item.type ) flow_config.edges.append(edge_config) - await FlowLoader.save(app_id, flow_id, flow_config) - flow_config = await FlowLoader.load(app_id, flow_id) - if flow_record: - app_collection = MongoDB.get_collection("app") - result = await app_collection.find_one_and_update( - {"_id": app_id}, - { - "$set": { - "flows.$[element].focus_point": focus_point.model_dump(by_alias=True), - }, - }, - array_filters=[{"element._id": flow_id}], - return_document=True, # 返回更新后的文档 - ) - if result is None: - LOGGER.error("Update flow failed") - return None - return result - new_flow = AppFlow( - _id=flow_id, - name=flow_item.name, - description=flow_item.description, - path="", - focus_point=PositionItem(x=focus_point.x, y=focus_point.y), - ) + await FlowLoader().save(app_id, flow_id, flow_config) + flow_config = await FlowLoader().load(app_id, flow_id) + # 修改app内flow信息 app_collection = MongoDB.get_collection("app") - result = await app_collection.find_one_and_update( - {"_id": app_id}, - { - "$push": { - "flows": new_flow.model_dump(by_alias=True), - }, - }, + result = await app_collection.find_one({"_id": app_id}) + app_pool = AppPool.model_validate(result) + metadata = AppMetadata( + type=MetadataType.APP, + id=app_pool.id, + icon=app_pool.icon, + name=app_pool.name, + description=app_pool.description, + version="1.0", + author=app_pool.author, + hashes=app_pool.hashes, + published=app_pool.published, + links=app_pool.links, + first_questions=app_pool.first_questions, + history_len=app_pool.history_len, + permission=app_pool.permission, + flows=app_pool.flows, ) + if flow_record: + for flow in metadata.flows: + if flow.id == flow_id: + flow.name = flow_item.name + flow.description = flow_item.description + flow.path = "" + flow.focus_point = PositionItem(x=focus_point.x, y=focus_point.y) + else: + new_flow = AppFlow( + id=flow_id, + name=flow_item.name, + description=flow_item.description, + path="", + focus_point=PositionItem(x=focus_point.x, y=focus_point.y), + ) + metadata.flows.append(new_flow) + app_loader = AppLoader.remote() + await app_loader.save.remote(metadata, app_id) # type: ignore[attr-type] + ray.kill(app_loader) if result is None: LOGGER.error("Add flow failed") return None @@ -371,17 +380,34 @@ class FlowManager: :return: 流的id """ try: - result = await FlowLoader.delete(app_id, flow_id) - app_pool_collection = MongoDB.get_collection("app") # 获取集合 - - result = await app_pool_collection.find_one_and_update( - {"_id": app_id}, - { - "$pull": { - "flows": {"_id": flow_id}, - }, - }, + result = await FlowLoader().delete(app_id, flow_id) + # 修改app内flow信息 + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one({"_id": app_id}) + app_pool = AppPool.model_validate(result) + metadata = AppMetadata( + type=MetadataType.APP, + id=app_pool.id, + icon=app_pool.icon, + name=app_pool.name, + description=app_pool.description, + version="1.0", + author=app_pool.author, + hashes=app_pool.hashes, + published=app_pool.published, + links=app_pool.links, + first_questions=app_pool.first_questions, + history_len=app_pool.history_len, + permission=app_pool.permission, + flows=app_pool.flows, ) + if flow_record: + for flow in metadata.flows: + if flow.id == flow_id: + metadata.flows.remove(flow) + app_loader = AppLoader.remote() + await app_loader.save.remote(metadata, app_id) # type: ignore[attr-type] + ray.kill(app_loader) if result is None: LOGGER.error("Delete flow from app pool failed") return None diff --git a/apps/manager/node.py b/apps/manager/node.py index 05f0a3d76dce89b9e80f7c1a71611ae59409ecd5..b0d23bff9a700e6dec0340e49ff1060758c5480b 100644 --- a/apps/manager/node.py +++ b/apps/manager/node.py @@ -14,3 +14,13 @@ class NodeManager: err = f"[NodeManager] Node {node_id} not found." raise ValueError(err) return node["call_id"] + + @staticmethod + async def get_node_name(node_id: str) -> str: + """获取Node的名称""" + node_collection = MongoDB().get_collection("node") + node = await node_collection.find_one({"id": node_id}, {"name": 1}) + if not node: + err = f"[NodeManager] Node {node_id} not found." + raise ValueError(err) + return node["name"] \ No newline at end of file diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 58ab7fe8b019e46df14e85f719626915dc4eadf4..2b9162e6cc17a20b2b34555e92bb46c4578e55f8 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -11,7 +11,7 @@ from typing import Annotated import ray from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import JSONResponse, StreamingResponse - +from apps.routers.mock import mock_data from apps.common.queue import MessageQueue from apps.constants import LOGGER from apps.dependency import ( @@ -24,7 +24,7 @@ from apps.entities.request_data import RequestData from apps.entities.response_data import ResponseData from apps.manager.appcenter import AppCenterManager from apps.manager.blacklist import QuestionBlacklistManager, UserBlacklistManager -from apps.scheduler.scheduler import Scheduler +# from apps.scheduler.scheduler import Scheduler from apps.service.activity import Activity RECOMMEND_TRES = 5 @@ -35,80 +35,80 @@ router = APIRouter( ) -async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]: - """进行实际问答,并从MQ中获取消息""" - try: - await Activity.set_active(user_sub) +# async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]: +# """进行实际问答,并从MQ中获取消息""" +# try: +# await Activity.set_active(user_sub) - # 敏感词检查 - word_check = ray.get_actor("words_check") - if await word_check.check.remote(post_body.question) != 1: - yield "data: [SENSITIVE]\n\n" - LOGGER.info(msg="问题包含敏感词!") - await Activity.remove_active(user_sub) - return +# # 敏感词检查 +# word_check = ray.get_actor("words_check") +# if await word_check.check.remote(post_body.question) != 1: +# yield "data: [SENSITIVE]\n\n" +# LOGGER.info(msg="问题包含敏感词!") +# await Activity.remove_active(user_sub) +# return - # 生成group_id - group_id = str(uuid.uuid4()) if not post_body.group_id else post_body.group_id +# # 生成group_id +# group_id = str(uuid.uuid4()) if not post_body.group_id else post_body.group_id - # 创建或还原Task - task_pool = ray.get_actor("task") - task = await task_pool.get_task.remote(session_id=session_id, post_body=post_body) - task_id = task.record.task_id +# # 创建或还原Task(获取task_id) +# task_pool = ray.get_actor("task") +# task = await task_pool.get_task.remote(session_id=session_id, post_body=post_body) +# task_id = task.record.task_id - task.record.group_id = group_id - post_body.group_id = group_id - await task_pool.set_task.remote(task_id, task) +# task.record.group_id = group_id +# post_body.group_id = group_id +# await task_pool.set_task.remote(task_id, task) - # 创建queue;由Scheduler进行关闭 - queue = MessageQueue() - await queue.init(task_id, enable_heartbeat=True) +# # 创建queue;由Scheduler进行关闭 +# queue = MessageQueue() +# await queue.init(task_id, enable_heartbeat=True) - # 在单独Task中运行Scheduler,拉齐queue.get的时机 - scheduler = Scheduler(task_id, queue) - scheduler_task = asyncio.create_task(scheduler.run(user_sub, session_id, post_body)) +# # 在单独Task中运行Scheduler,拉齐queue.get的时机 +# scheduler = Scheduler(task_id, queue) +# scheduler_task = asyncio.create_task(scheduler.run(user_sub, session_id, post_body)) - # 处理每一条消息 - async for event in queue.get(): - if event[:6] == "[DONE]": - break +# # 处理每一条消息 +# async for event in queue.get(): +# if event[:6] == "[DONE]": +# break - yield "data: " + event + "\n\n" +# yield "data: " + event + "\n\n" - # 等待Scheduler运行完毕 - await asyncio.gather(scheduler_task) +# # 等待Scheduler运行完毕 +# await asyncio.gather(scheduler_task) - # 获取最终答案 - task = await task_pool.get_task.remote(task_id) - answer_text = task.record.content.answer - if not answer_text: - LOGGER.error(msg="Answer is empty") - yield "data: [ERROR]\n\n" - await Activity.remove_active(user_sub) - return +# # 获取最终答案 +# task = await task_pool.get_task.remote(task_id) +# answer_text = task.record.content.answer +# if not answer_text: +# LOGGER.error(msg="Answer is empty") +# yield "data: [ERROR]\n\n" +# await Activity.remove_active(user_sub) +# return - # 对结果进行敏感词检查 - if await word_check.check.remote(answer_text) != 1: - yield "data: [SENSITIVE]\n\n" - LOGGER.info(msg="答案包含敏感词!") - await Activity.remove_active(user_sub) - return +# # 对结果进行敏感词检查 +# if await word_check.check.remote(answer_text) != 1: +# yield "data: [SENSITIVE]\n\n" +# LOGGER.info(msg="答案包含敏感词!") +# await Activity.remove_active(user_sub) +# return - # 创建新Record,存入数据库 - await scheduler.save_state(user_sub, post_body) - # 保存Task,从task_map中删除task - await task_pool.save_task.remote(task_id) +# # 创建新Record,存入数据库 +# await scheduler.save_state(user_sub, post_body) +# # 保存Task,从task_map中删除task +# await task_pool.save_task.remote(task_id) - yield "data: [DONE]\n\n" +# yield "data: [DONE]\n\n" - except Exception as e: - LOGGER.error(msg=f"生成答案失败:{e!s}\n{traceback.format_exc()}") - yield "data: [ERROR]\n\n" +# except Exception as e: +# LOGGER.error(msg=f"生成答案失败:{e!s}\n{traceback.format_exc()}") +# yield "data: [ERROR]\n\n" - finally: - if scheduler_task: - scheduler_task.cancel() - await Activity.remove_active(user_sub) +# finally: +# if scheduler_task: +# scheduler_task.cancel() +# await Activity.remove_active(user_sub) @router.post("/chat", dependencies=[Depends(verify_csrf_token), Depends(verify_user)]) @@ -130,7 +130,12 @@ async def chat( if post_body.app and post_body.app.app_id: await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) - res = chat_generator(post_body, user_sub, session_id) + # res = chat_generator(post_body, user_sub, session_id) + + if post_body.app and post_body.app.app_id: + res = mock_data(appId=post_body.app.app_id, conversationId=post_body.conversation_id, flowId=post_body.app.flow_id,question=post_body.question) + else: + res = mock_data(question=post_body.question) return StreamingResponse( content=res, media_type="text/event-stream", diff --git a/apps/routers/mock.py b/apps/routers/mock.py index dc753ef1ffd3b6530c1592707172559bdc1a8d3f..f2b7017c291e0333f5aa0a8d01accc51d28d7db8 100644 --- a/apps/routers/mock.py +++ b/apps/routers/mock.py @@ -3,7 +3,10 @@ import copy import json import random import time +from typing import Any, AsyncGenerator, Dict, Optional +import aiohttp +from pydantic import BaseModel, Field import tiktoken from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import StreamingResponse @@ -16,8 +19,203 @@ from apps.dependency import ( verify_user, ) from apps.entities.request_data import MockRequestData, RequestData +from apps.entities.scheduler import CallError, SysCallVars from apps.manager.flow import FlowManager from apps.scheduler.pool.loader.flow import FlowLoader +from datetime import datetime +from textwrap import dedent +from typing import Any + +import pytz +from jinja2 import BaseLoader, select_autoescape +from jinja2.sandbox import SandboxedEnvironment +from pydantic import BaseModel, Field + +from apps.entities.scheduler import CallError, SysCallVars +from apps.scheduler.call.core import CoreCall + + +"""问答大模型调用 + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +""" +from collections.abc import AsyncGenerator +from typing import Optional + +import ray +import tiktoken +from openai import AsyncOpenAI + +from apps.common.config import config +from apps.constants import LOGGER, REASONING_BEGIN_TOKEN, REASONING_END_TOKEN + + +class ReasoningLLM: + """调用用于问答的大模型""" + + _encoder = tiktoken.get_encoding("cl100k_base") + + def __init__(self) -> None: + """判断配置文件里用了哪种大模型;初始化大模型客户端""" + if not config["LLM_KEY"]: + self._client = AsyncOpenAI( + base_url=config["LLM_URL"], + ) + return + + self._client = AsyncOpenAI( + api_key=config["LLM_KEY"], + base_url=config["LLM_URL"], + ) + + def _calculate_token_length(self, messages: list[dict[str, str]], *, pure_text: bool = False) -> int: + """使用ChatGPT的cl100k tokenizer,估算Token消耗量""" + result = 0 + if not pure_text: + result += 3 * (len(messages) + 1) + + for msg in messages: + result += len(self._encoder.encode(msg["content"])) + + return result + + def _validate_messages(self, messages: list[dict[str, str]]) -> list[dict[str, str]]: + """验证消息格式是否正确""" + if messages[0]["role"] != "system": + # 添加默认系统消息 + messages.insert(0, {"role": "system", "content": "You are a helpful assistant."}) + + if messages[-1]["role"] != "user": + err = f"消息格式错误,最后一个消息必须是用户消息:{messages[-1]}" + raise ValueError(err) + + return messages + + async def call(self, messages: list[dict[str, str]], # noqa: C901 + max_tokens: Optional[int] = None, temperature: Optional[float] = None, + *, streaming: bool = True, result_only: bool = True) -> AsyncGenerator[str, None]: + """调用大模型,分为流式和非流式两种""" + # input_tokens = self._calculate_token_length(messages) + try: + + msg_list = self._validate_messages(messages) + except ValueError as e: + err = f"消息格式错误:{e}" + raise ValueError(err) from e + + if max_tokens is None: + max_tokens = config["LLM_MAX_TOKENS"] + if temperature is None: + temperature = config["LLM_TEMPERATURE"] + + stream = await self._client.chat.completions.create( + model=config["LLM_MODEL"], + messages=msg_list, # type: ignore[] + max_tokens=max_tokens, + temperature=temperature, + stream=True, + ) # type: ignore[] + + reasoning_content = "" + result = "" + + is_first_chunk = True + is_reasoning = False + reasoning_type = "" + + async for chunk in stream: + # 当前Chunk内的信息 + reason = "" + text = "" + + if is_first_chunk: + if hasattr(chunk.choices[0].delta, "reasoning_content"): + reason = "" + chunk.choices[0].delta.reasoning_content or "" + reasoning_type = "args" + is_reasoning = True + else: + for token in REASONING_BEGIN_TOKEN: + if token == (chunk.choices[0].delta.content or ""): + reason = "" + reasoning_type = "tokens" + is_reasoning = True + break + + # 当前已经不是第一个Chunk了 + is_first_chunk = False + + # 当前是正常问答 + if not is_reasoning: + text = chunk.choices[0].delta.content or "" + + # 当前处于推理状态 + if not is_first_chunk and is_reasoning: + # 如果推理内容用特殊参数传递 + if reasoning_type == "args": + # 还在推理 + if hasattr(chunk.choices[0].delta, "reasoning_content"): + reason = chunk.choices[0].delta.reasoning_content or "" + # 推理结束 + else: + is_reasoning = False + reason = "" + + # 如果推理内容用特殊token传递 + elif reasoning_type == "tokens": + # 结束推理 + for token in REASONING_END_TOKEN: + if token == (chunk.choices[0].delta.content or ""): + is_reasoning = False + reason = "" + text = "" + break + # 还在推理 + if is_reasoning: + reason = chunk.choices[0].delta.content or "" + + # 推送消息 + if streaming: + # 如果需要推送推理内容 + if reason and not result_only: + yield reason + + # 推送text + yield text + + # 整理结果 + reasoning_content += reason + result += text + + if not streaming: + if not result_only: + yield reasoning_content + yield result + + # LOGGER.info(f"推理LLM:{reasoning_content}\n\n{result}") + + # output_tokens = self._calculate_token_length([{"role": "assistant", "content": result}], pure_text=True) + # task = ray.get_actor("task") + # await task.update_token_summary.remote(input_tokens, output_tokens) + + +class _RAGParams(BaseModel): + """RAG工具的参数""" + + knowledge_base: str = Field(description="知识库的id", alias="kb_sn") + top_k: int = Field(description="返回的答案数量(经过整合以及上下文关联)", default=5) + methods: Optional[list[str]] = Field(description="rag检索方法") + + +class _RAGOutputList(BaseModel): + """RAG工具的输出""" + + corpus: list[str] = Field(description="知识库的语料列表") + + +class _RAGOutput(BaseModel): + """RAG工具的输出""" + + output: _RAGOutputList = Field(description="RAG工具的输出") router = APIRouter( prefix="/api", @@ -25,8 +223,9 @@ router = APIRouter( ) -def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c1-4bd8-9333-ed099cf25908", conversationId="eccb08c3-0621-4602-a4d2-4eaada892557", question="你好"): +async def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c1-4bd8-9333-ed099cf25908", conversationId="eccb08c3-0621-4602-a4d2-4eaada892557", question="你好"): _encoder = tiktoken.get_encoding("cl100k_base") + start_message = [{ # 任务流开始 "event": "flow.start", "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", @@ -40,11 +239,6 @@ def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c "stepStatus": "pending", }, "content": { - "question": "查询所有主机的CVE信息", - "params": { - "cveId": "CVE-2021-44228", - "host": "192.168.10.1" - } }, "metadata": { "inputTokens": 200, @@ -117,12 +311,25 @@ def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c "time_cost": 0.5, } } + messages = [] + for message in start_message: + messages.append(message) + for message in messages: + if message['event']=='step.output': + t=message['metadata']['time_cost'] + time.sleep(t) + elif message['event']=='text.add': + t=random.uniform(0.15, 0.2) + time.sleep(t) + yield "data: " + json.dumps(message,ensure_ascii=False) + "\n\n" mid_message = [] - flow = asyncio.run(FlowLoader.load(appId, flowId)) + flow = await FlowLoader.load(appId, flowId) now_flow_item = "start" start_time = time.time() last_item = "" mapp={} + params={} + params["question"] = question # print(json.dumps(flow)) for step_id, step in flow.steps.items(): mapp[step_id]= step.name, step.params @@ -138,26 +345,38 @@ def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c sample_input["flow"]["stepId"] = now_flow_item sample_input["flow"]["stepName"],sample_input["content"] = mapp[now_flow_item] sample_input["content"] = sample_input["content"]["input_parameters"] if now_flow_item != "start" else sample_input["content"] - mid_message.append(copy.deepcopy(sample_input)) + if "content" in sample_input and type(sample_input["content"])==dict: + for key, value in sample_input["content"].items(): + if key in params: + sample_input["content"][key] = params[key] + else: + params[key] = value + yield "data: " + json.dumps(sample_input,ensure_ascii=False) + "\n\n" sample_output["metadata"]["time_cost"] = random.uniform(0.5, 1.5) sample_output["flow"]["stepId"] = now_flow_item sample_output["flow"]["stepName"],sample_output["content"] = mapp[now_flow_item] sample_output["content"] = sample_output["content"]["output_parameters"] if now_flow_item != "start" else sample_output["content"] if sample_output["flow"]["stepName"] == "【RAG】知识库智能问答": - sample_output["content"] = call_rag() - if sample_output["flow"]["stepName"] == "【LLM】模型问答": - sample_output["content"] = call_llm() - mid_message.append(copy.deepcopy(sample_output)) + sample_output["content"] = await call_rag(params) + if sample_output["flow"]["stepName"] == "【LLM】大模型问答": + sample_output["content"] = await call_llm(params) + if "content" in sample_output and type(sample_output["content"])==dict: + for key, value in sample_output["content"].items(): + params[key]=value + yield "data: " + json.dumps(sample_output,ensure_ascii=False) + "\n\n" now_flow_item = edge.edge_to if now_flow_item == "end": sample_input["flow"]["stepId"] = now_flow_item sample_input["flow"]["stepName"] = "结束" - mid_message.append(sample_input) + yield "data: " + json.dumps(sample_input,ensure_ascii=False) + "\n\n" sample_output["flow"]["stepId"] = now_flow_item sample_output["flow"]["stepName"] = "结束" sample_output["metadata"]["time_cost"] = random.uniform(0.5, 1.5) - mid_message.append(sample_output) + yield "data: " + json.dumps(sample_output,ensure_ascii=False) + "\n\n" + + + end_message = [ { # flow结束 @@ -181,33 +400,113 @@ def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c "time_cost": 0.5 } }] - chat_message = [ - ] messages = [] - for message in start_message: - messages.append(message) - for message in mid_message: - messages.append(message) for message in end_message: messages.append(message) - for message in chat_message: - messages.append(message) - asyncio.run(FlowManager.updata_flow_debug_by_app_and_flow_id(appId,flowId,True)) + for message in messages: + if message['event']=='step.output': + t=message['metadata']['time_cost'] + time.sleep(t) + elif message['event']=='text.add': + t=random.uniform(0.15, 0.2) + time.sleep(t) + yield "data: " + json.dumps(message,ensure_ascii=False) + "\n\n" - for message in messages: - if message['event']=='step.output': - t=message['metadata']['time_cost'] - time.sleep(t) - elif message['event']=='text.add': - t=random.uniform(0.15, 0.2) - time.sleep(t) - yield "data: " + json.dumps(message,ensure_ascii=False) + "\n\n" + chat_message = call_llm_stream(params) + messages = [] + temp_messages = [] + async for message in chat_message: + yield "data: "+ message + "\n\n" + yield json.dumps({"event": "text.end", "content": "|", "input_tokens": len(_encoder.encode(question)), "output_tokens": 290}) + await FlowManager.updata_flow_debug_by_app_and_flow_id(appId,flowId,True) + + +async def call_rag(params:dict = {}): + url = config["RAG_HOST"].rstrip("/") + "/chunk/get" + headers = { + "Content-Type": "application/json", + } + params_dict = { + "kb_sn": params["kb_sn"], + "top_k": params["top_k"], + "content": params["question"], + } + # 发送 GET 请求 + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers, params=params_dict) as response: + # 检查响应状态码 + if response.status == status.HTTP_200_OK: + result = await response.json() + chunk_list = result["data"] + for chunk in chunk_list: + chunk.replace('\n\n','') + return {"chunk_list":chunk_list} + text = await response.text() + raise CallError( + message=f"rag调用失败:{text}", + data={ + "question": params["question"], + "status": response.status, + "text": text, + }, + ) + +async def call_llm(params: dict = {}): + # 构建请求 URL 和 headers + url = config["LLM_URL"] + "/chat/completions" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {config['LLM_KEY']}", # 添加鉴权 Token + } + prompt = params.get("prompt", "") + chunk_list = params.get("chunk_list", "") + user_call = "请回答问题" + params.get("quetion", "") + "下面是获得的信息:" + # 构建请求体 + payload = { + "model": params.get("model", config["LLM_MODEL"]), # 默认模型 + "messages": params.get("messages", [{"role":"system","content":prompt},{'role':'user','content':user_call+str(chunk_list)}]), # 消息列表 + "stream": params.get("stream", False), # 是否流式返回 + "n": params.get("n", 1), # 返回的候选答案数量 + "max_tokens": params.get("max_tokens", 4096), # 最大 token 数量 + } + + # 发送 POST 请求 + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=headers, json=payload) as response: + # 检查响应状态码 + if response.status == status.HTTP_200_OK: + result = await response.json() + result = result["choices"][0]["message"]["content"] + print(result) + result = result.replace("\n\n", "") + return {"content":result} + else: + text = await response.text() + print(f"LLM 调用失败:{text}") + return None -async def call_rag(): - return "RAG" +class _LLMOutput(BaseModel): + """定义LLM工具调用的输出""" + + message: str = Field(description="大模型输出的文字信息") +async def call_llm_stream(params: Dict[str, Any] = {}): + _encoder = tiktoken.get_encoding("cl100k_base") + prompt = "你是EulerCopilot,我们向你问了一个问题,需要你完成这个问题,我们会给出对应的信息" + question = params.get("question", "") + content = f"问题:{question}\n" + "信息:" + str(params.get("chunk_list", "")) + message = params.get("messages", [{"role":"system","content":prompt},{'role':'user','content':content}]) + sum = 0 + async for chunk in ReasoningLLM().call(messages=message): + sum = sum + len(_encoder.encode(chunk)) + chunk = chunk.replace('\n\n','') + output = { + "event": "text.add", + "content": chunk, + "input_tokens": len(_encoder.encode(question)), + "output_tokens": sum , + } + yield json.dumps(output,ensure_ascii=False) -async def call_llm(): - return "LLM" @router.post("/mock/chat", dependencies=[Depends(verify_csrf_token), Depends(verify_user)]) async def chat( diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 98f72b59cb8a9d09679627413d6d6e17d3ffd618..12f5b93654bc63797fff0e2b390d6b08fd4b90fb 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -53,7 +53,7 @@ class AppLoader: async for flow_file in flow_path.rglob("*.yaml"): if flow_file.stem not in flow_ids: LOGGER.warning(f"[AppLoader] 工作流 {flow_file} 不在元数据中") - flow = await flow_loader.load(flow_file) + flow = await flow_loader.load(app_id, flow_file.stem) if not flow: err = f"[AppLoader] 工作流 {flow_file} 加载失败" LOGGER.error(err) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 3140feb4217ca63f6469dc77dc7ddece193b30c3..2baf03e6ea07aeefb9fdb8ddf5d9793c2dcb6890 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -5,21 +5,24 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. from typing import Optional import aiofiles +from fastapi.encoders import jsonable_encoder import yaml from anyio import Path from apps.common.config import config from apps.constants import APP_DIR, FLOW_DIR, LOGGER from apps.entities.enum_var import EdgeType -from apps.entities.flow import Flow +from apps.entities.flow import Flow, FlowConfig from apps.manager.node import NodeManager +from apps.models.mongo import MongoDB class FlowLoader: """工作流加载器""" - async def load(self, flow_path: Path) -> Optional[Flow]: + async def load(self, app_id, flow_id) -> Optional[Flow]: """从文件系统中加载【单个】工作流""" + flow_path = Path(config["SEMANTICS_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" async with aiofiles.open(flow_path, encoding="utf-8") as f: flow_yaml = yaml.safe_load(await f.read()) @@ -55,11 +58,11 @@ class FlowLoader: if key == "start": step["name"] = "开始" step["description"] = "开始节点" - step["type"] = "none" + step["type"] = "start" elif key == "end": step["name"] = "结束" step["description"] = "结束节点" - step["type"] = "none" + step["type"] = "end" else: step["type"] = await NodeManager.get_node_call_id(step["node"]) step["name"] = await NodeManager.get_node_name(step["node"]) if "name" not in step or step["name"] == "" else step["name"] @@ -70,12 +73,15 @@ class FlowLoader: except Exception as e: LOGGER.error(f"Invalid flow format: {e}") return None + await self._updata_db(FlowConfig(flow_config=flow, flow_id=flow_id)) + return flow async def save(self, app_id: str, flow_id: str, flow: Flow) -> None: """保存工作流""" - flow_path = Path(config["SERVICE_DIR"]) / APP_DIR / app_id / FLOW_DIR / f"{flow_id}.yaml" + await self._updata_db(FlowConfig(flow_config=flow, flow_id=flow_id)) + flow_path = Path(config["SEMANTICS_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" if not await flow_path.parent.exists(): await flow_path.parent.mkdir(parents=True) if not await flow_path.exists(): @@ -91,7 +97,10 @@ class FlowLoader: "description": step.description, "node": step.node, "params": step.params, - "pos": step.pos, + "pos": { + "x":step.pos.x, + "y":step.pos.y, + }, } for step_id, step in flow.steps.items() }, @@ -104,6 +113,7 @@ class FlowLoader: } for edge in flow.edges ], + "debug": flow.debug, } async with aiofiles.open(flow_path, mode="w", encoding="utf-8") as f: @@ -117,11 +127,58 @@ class FlowLoader: if await flow_path.is_file(): try: await flow_path.unlink() - LOGGER.info(f"Successfully deleted flow file: {flow_path}") - return True + LOGGER.info(f"[FlowLoader] Successfully deleted flow file: {flow_path}") except OSError as e: - LOGGER.error(f"Failed to delete flow file {flow_path}: {e}") + LOGGER.error(f"[FlowLoader] Failed to delete flow file {flow_path}: {e}") return False else: - LOGGER.warning(f"Flow file does not exist or is not a file: {flow_path}") + LOGGER.warning(f"[FlowLoader] Flow file does not exist or is not a file: {flow_path}") + return False + + flow_collection = MongoDB.get_collection("flow") + try: + await flow_collection.delete_one({"_id": flow_id}) + except Exception as e: + LOGGER.error(f"[FlowLoader] Failed to delete flow from database: {e}") return False + + + async def _updata_db(self, flow_config: FlowConfig): + """更新数据库""" + try: + flow_collection = MongoDB.get_collection("flow") + flow = flow_config.flow_config + # 查询条件为app_id + if await flow_collection.find_one({"_id": flow_config.flow_id}) is None: + # 创建应用时需写入完整数据结构,自动初始化创建时间、flow列表、收藏列表和权限 + await flow_collection.insert_one( + jsonable_encoder( + Flow( + name=flow.name, + description=flow.description, + on_error=flow.on_error, + steps=flow.steps, + edges=flow.edges, + debug=flow.debug, + ), + ), + ) + else: + # 更新应用数据:部分映射 AppMetadata 到 AppPool,其他字段不更新 + await flow_collection.update_one( + {"_id":flow_config.flow_id}, + jsonable_encoder( + Flow( + name=flow.name, + description=flow.description, + on_error=flow.on_error, + steps=flow.steps, + edges=flow.edges, + debug=flow.debug, + ), + ), + ) + except Exception as e: + err=f"[FlowLoader] Failed to update flow in database: {e}" + LOGGER.error(err) + raise ValueError(err) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index d9c7a9a17a10774d628d6183a04e9ca6b4f9f22a..6ea55901c78688d0142b2405230a00c4f4484a73 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -26,8 +26,8 @@ from apps.manager.document import DocumentManager from apps.manager.record import RecordManager from apps.manager.task import TaskManager from apps.manager.user import UserManager -from apps.scheduler.executor import Executor from apps.scheduler.scheduler.context import generate_facts, get_context +from apps.scheduler.scheduler.flow import Flow, FlowChooser from apps.scheduler.scheduler.message import ( push_document_message, push_init_message, @@ -70,6 +70,8 @@ class Scheduler: """运行调度器""" try: # 根据用户的请求,返回插件ID列表,选择Flow + flow_chooser = FlowChooser(task_id=self._task_id, question=post_body.question,user_selected=post_body.app) + user_selected_flow = flow_chooser.choose_flow() # 获取当前问答可供关联的文档 docs, doc_ids = await self._get_docs(user_sub, post_body) # 获取上下文;最多20轮 @@ -90,7 +92,7 @@ class Scheduler: ) # 如果是智能问答,直接执行 - if not post_body.app or post_body.app.app_id == "": + if not user_selected_flow: await push_init_message(self._task_id, self._queue, post_body, is_flow=False) await asyncio.sleep(0.1) for doc in docs: @@ -108,6 +110,7 @@ class Scheduler: conversation=context, facts=facts, ) + need_recommend = await self.run_executor(session_id, post_body, background, user_selected_flow) # 记忆提取 self._facts = await generate_facts(self._task_id, post_body.question) @@ -139,6 +142,7 @@ class Scheduler: app_data=selected_flow, background=background, ) + # print("begin_to_run") # 执行Executor # flow_exec = Executor() diff --git a/apps/utils/flow.py b/apps/utils/flow.py index a8613f0ee6326f15bb6c70a4c361dbd7c2b9f289..f4e51939c47c598b2ef87941b2bb730655b8cd24 100644 --- a/apps/utils/flow.py +++ b/apps/utils/flow.py @@ -32,9 +32,9 @@ class FlowService: node_to_branches[node.node_id].add('') new_edges_items = [] for edge in flow_item.edges: - if edge.source_node not in node_to_branches.keys(): + if edge.source_node not in node_to_branches: continue - if edge.target_node not in node_to_branches.keys(): + if edge.target_node not in node_to_branches: continue if edge.branch_id not in node_to_branches[edge.source_node]: continue