diff --git a/mcp_center/servers/remote_info/src/server.py b/mcp_center/servers/remote_info/src/server.py index 3f9721b13fe4c359bc62d27df8fde4f30ac89fee..43be2c1122aaa9758ddb58851efa7704b20d7e10 100644 --- a/mcp_center/servers/remote_info/src/server.py +++ b/mcp_center/servers/remote_info/src/server.py @@ -742,6 +742,554 @@ def get_os_info_tool(host: Union[str, None] = None) -> str: pass +@mcp.tool( + name="get_network_info_tool" + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else "get_network_info_tool", + description=''' + 获取网络接口信息 + 1. 输入值如下: + - host: 远程主机名称或IP地址,若不提供则表示获取本机的网络接口信息 + 2. 返回值为包含网络接口信息的字典列表,每个字典包含以下键 + - interface: 接口名称 + - ip_address: IP地址 + - netmask: 子网掩码 + - mac_address: MAC地址 + - is_up: 接口是否启用(布尔值) + ''' + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else + ''' + Get network interface information. + 1. Input values are as follows: + - host: Remote host name or IP address. If not provided, it means to get + the network interface information of the local machine. + 2. The return value is a list of dictionaries containing network interface information, + each dictionary contains the following keys: + - interface: Interface name + - ip_address: IP address + - netmask: Netmask + - mac_address: MAC address + - is_up: Whether the interface is up (boolean) + ''' +) +def get_network_info_tool(host: Union[str, None] = None) -> List[Dict[str, Any]]: + """获取网络接口信息""" + if host is None: + # 获取本地网络接口信息 + try: + net_if_addrs = psutil.net_if_addrs() + net_if_stats = psutil.net_if_stats() + network_info = [] + for interface, addrs in net_if_addrs.items(): + ip_address = None + netmask = None + mac_address = None + for addr in addrs: + if addr.family == socket.AF_INET: + ip_address = addr.address + netmask = addr.netmask + elif addr.family == psutil.AF_LINK: + mac_address = addr.address + is_up = net_if_stats[interface].isup if interface in net_if_stats else False + network_info.append({ + 'interface': interface, + 'ip_address': ip_address, + 'netmask': netmask, + 'mac_address': mac_address, + 'is_up': is_up + }) + return network_info + except Exception as e: + return {"error": f"获取本地网络接口信息失败: {str(e)}"} + else: + # 查找远程主机配置 + remote_hosts = RemoteInfoConfig().get_config().public_config.remote_hosts + target_host = next( + (h for h in remote_hosts if host == h.name or host == h.host), + None + ) + + if not target_host: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError(f"未找到远程主机: {host}") + else: + raise ValueError(f"Remote host not found: {host}") + + ssh = None + try: + # 建立SSH连接 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect( + hostname=target_host.host, + port=target_host.port, + username=target_host.username, + password=target_host.password, + timeout=10, + banner_timeout=10 + ) + + # 使用ip命令获取网络接口信息,增加兼容性和容错性 + cmd = "ip -o addr show" + stdin, stdout, stderr = ssh.exec_command(cmd, timeout=5) + error = stderr.read().decode().strip() + output = stdout.read().decode().strip() + if error: + raise ValueError(f"Command {cmd} error: {error}") + if not output: + raise ValueError("未能获取网络接口信息") + lines = output.split('\n') + network_info = [] + for line in lines: + parts = line.split() + if len(parts) < 4: + continue + interface = parts[1] + ip_address = None + netmask = None + if parts[2] == 'inet': + ip_address = parts[3].split('/')[0] + netmask = parts[3].split('/')[1] + mac_address = None + is_up = False + # 获取MAC地址和接口状态 + cmd_mac = f"cat /sys/class/net/{interface}/address" + stdin_mac, stdout_mac, stderr_mac = ssh.exec_command(cmd_mac, timeout=5) + mac_output = stdout_mac.read().decode().strip() + if mac_output: + mac_address = mac_output + cmd_state = f"cat /sys/class/net/{interface}/operstate" + stdin_state, stdout_state, stderr_state = ssh.exec_command(cmd_state, timeout=5) + state_output = stdout_state.read().decode().strip() + if state_output == 'up': + is_up = True + network_info.append({ + 'interface': interface, + 'ip_address': ip_address, + 'netmask': netmask, + 'mac_address': mac_address, + 'is_up': is_up + }) + return network_info + except paramiko.AuthenticationException: + raise ValueError("SSH认证失败,请检查用户名和密码") + except paramiko.SSHException as e: + raise ValueError(f"SSH连接错误: {str(e)}") + except Exception as e: + raise ValueError(f"获取远程网络接口信息失败: {str(e)}") + finally: + # 确保SSH连接关闭 + if ssh is not None: + try: + ssh.close() + except Exception: + pass + + +@mcp.tool( + name="write_report_tool" + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else "write_report_tool", + description=''' + 将分析结果写入报告文件 + 1. 输入值如下: + - report: 报告内容字符串 + 2. 返回值为写入报告文件的路径字符串 + ''' + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else + ''' + Write analysis results to a report file. + 1. Input values are as follows: + - report: Report content string + 2. The return value is the path string of the written report file. + ''' +) +def write_report_tool(report: str) -> str: + """将分析结果写入报告文件""" + if not report: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError("报告内容不能为空") + else: + raise ValueError("Report content cannot be empty") + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S%f") + report_path = f"system_report_{timestamp}.txt" + with open(report_path, 'w', encoding='utf-8', errors='ignore') as f: + f.write(report) + real_path = os.path.realpath(report_path) + return real_path + except Exception as e: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError(f"写入报告文件失败: {str(e)}") + else: + raise ValueError(f"Failed to write report file: {str(e)}") + + +@mcp.tool( + name="telnet_test_tool" + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else "telnet_test_tool", + description=''' + 测试Telnet连接 + 1. 输入值如下: + - host: 远程主机名称或IP地址 + - port: 端口号 + 2. 返回值为布尔值,表示连接是否成功 + ''' + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else + ''' + Test Telnet connection. + 1. Input values are as follows: + - host: Remote host name or IP address + - port: Port number + 2. The return value is a boolean indicating whether the connection was successful. + ''' +) +def telnet_test_tool(host: str, port: int) -> bool: + """测试Telnet连接""" + if not host: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError("主机不能为空") + else: + raise ValueError("Host cannot be empty") + if port <= 0 or port > 65535: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError("端口号必须在1到65535之间") + else: + raise ValueError("Port number must be between 1 and 65535") + try: + with telnetlib.Telnet(host, port, timeout=5) as tn: + return True + except Exception as e: + print(f"Telnet连接失败: {str(e)}") + return False + + +@mcp.tool( + name="ping_test_tool" + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else "ping_test_tool", + description=''' + 测试Ping连接 + 1. 输入值如下: + - host: 远程主机名称或IP地址 + 2. 返回值为布尔值,表示连接是否成功 + ''' + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else + ''' + Test Ping connection. + 1. Input values are as follows: + - host: Remote host name or IP address + 2. The return value is a boolean indicating whether the connection was successful. + ''' +) +def ping_test_tool(host: str) -> bool: + """测试Ping连接""" + if not host: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError("主机不能为空") + else: + raise ValueError("Host cannot be empty") + param = '-n' if platform.system().lower() == 'windows' else '-c' + command = ['ping', param, '1', host] + try: + output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True) + return True + except subprocess.CalledProcessError as e: + print(f"Ping连接失败: {str(e)}") + return False + + +@mcp.tool( + name="get_dns_info_tool" + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else "get_dns_info_tool", + description=''' + 获取DNS配置信息 + 1. 输入值如下: + - host: 远程主机名称或IP地址,若不提供则表示 + 获取本机的DNS配置信息 + 2. 返回值为包含DNS配置信息的字典,包含以下 + 键 + - nameservers: DNS服务器列表 + - search: 搜索域列表 + ''' + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else + ''' + Get DNS configuration information. + 1. Input values are as follows: + - host: Remote host name or IP address. If not provided, it means to get + the DNS configuration information of the local machine. + 2. The return value is a dictionary containing DNS configuration information, containing the following + keys: + - nameservers: List of DNS servers + - search: List of search domains + ''' +) +def get_dns_info_tool(host: Union[str, None] = None) -> Dict[str, Any]: + """获取DNS配置信息""" + if host is None: + # 获取本地DNS信息 + try: + dns_info = {'nameservers': [], 'search': []} + with open('/etc/resolv.conf', 'r') as f: + for line in f: + if line.startswith('nameserver'): + dns_info['nameservers'].append(line.split()[1]) + elif line.startswith('search'): + dns_info['search'].extend(line.split()[1:]) + return dns_info + except Exception as e: + return {"error": f"获取本地DNS信息失败: {str(e)}"} + else: + # 查找远程主机配置 + remote_hosts = RemoteInfoConfig().get_config().public_config.remote_hosts + target_host = next( + (h for h in remote_hosts if host == h.name or host == h.host), + None + ) + + if not target_host: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError(f"未找到远程主机: {host}") + else: + raise ValueError(f"Remote host not found: {host}") + + ssh = None + try: + # 建立SSH连接 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect( + hostname=target_host.host, + port=target_host.port, + username=target_host.username, + password=target_host.password, + timeout=10, + banner_timeout=10 + ) + + # 使用cat命令获取DNS信息,增加兼容性和容错性 + cmd = "cat /etc/resolv.conf" + stdin, stdout, stderr = ssh.exec_command(cmd, timeout=5) + error = stderr.read().decode().strip() + output = stdout.read().decode().strip() + if error: + raise ValueError(f"Command {cmd} error: {error}") + if not output: + raise ValueError("未能获取DNS信息") + dns_info = {'nameservers': [], 'search': []} + for line in output.split('\n'): + if line.startswith('nameserver'): + dns_info['nameservers'].append(line.split()[1]) + elif line.startswith('search'): + dns_info['search'].extend(line.split()[1:]) + return dns_info + except paramiko.AuthenticationException: + raise ValueError("SSH认证失败,请检查用户名和密码") + except paramiko.SSHException as e: + raise ValueError(f"SSH连接错误: {str(e)}") + except Exception as e: + raise ValueError(f"获取远程DNS信息失败: {str(e)}") + finally: + if ssh is not None: + try: + ssh.close() + except Exception: + pass + + +@mcp.tool( + name="perf_data_tool" + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else "perf_data_tool", + description=''' + 收集性能数据 + 1. 输入值如下: + - host: 远程主机名称或IP地址,若不提供则表示收集本机的性能数据 + - pid : 进程ID,若不提供则表示收集所有进程的性能数据 + 2. 返回值为包含性能数据的字典,包含以下键 + - cpu_usage: CPU使用率(百分比) + - memory_usage: 内存使用率(百分比) + - io_counters: I/O统计信息(字典) + ''' + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH + else + ''' + Collect performance data. + 1. Input values are as follows: + - host: Remote host name or IP address. If not provided, it means to collect + the performance data of the local machine. + - pid : Process ID. If not provided, it means to collect performance data for all processes. + 2. The return value is a dictionary containing performance data, containing the following + keys: + - cpu_usage: CPU usage (percentage) + - memory_usage: Memory usage (percentage) + - io_counters: I/O statistics (dictionary) + ''' +) +def perf_data_tool(host: Union[str, None] = None, pid: Union[int, None] = None) -> Dict[str, Any]: + """收集性能数据""" + if host is None: + # 获取本地性能数据 + try: + if pid is not None: + # 获取指定进程的性能数据 + proc = psutil.Process(pid) + cpu_usage = proc.cpu_percent(interval=1) + memory_info = proc.memory_info() + memory_usage = proc.memory_percent() + io_counters = proc.io_counters()._asdict() if hasattr(proc, 'io_counters') else {} + else: + # 获取所有进程的性能数据 + cpu_usage = psutil.cpu_percent(interval=1) + memory_info = psutil.virtual_memory() + memory_usage = memory_info.percent + io_counters = {} + performance_data = { + 'cpu_usage': cpu_usage, + 'memory_usage': memory_usage, + 'io_counters': io_counters + } + return performance_data + except psutil.NoSuchProcess: + return {"error": f"进程ID {pid} 不存在"} + except Exception as e: + return {"error": f"获取本地性能数据失败: {str(e)}"} + else: + # 查找远程主机配置 + remote_hosts = RemoteInfoConfig().get_config().public_config.remote_hosts + target_host = next( + (h for h in remote_hosts if host == h.name or host == h.host), + None + ) + + if not target_host: + if RemoteInfoConfig().get_config().public_config.language == LanguageEnum.ZH: + raise ValueError(f"未找到远程主机: {host}") + else: + raise ValueError(f"Remote host not found: {host}") + + ssh = None + try: + # 建立SSH连接 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect( + hostname=target_host.host, + port=target_host.port, + username=target_host.username, + password=target_host.password, + timeout=10, + banner_timeout=10 + ) + + if pid is not None: + # 获取指定进程的性能数据 + cmd_cpu = f"ps -p {pid} -o %cpu --no-headers" + cmd_mem = f"ps -p {pid} -o %mem --no-headers" + cmd_io = f"cat /proc/{pid}/io" + stdin_cpu, stdout_cpu, stderr_cpu = ssh.exec_command(cmd_cpu, timeout=5) + stdin_mem, stdout_mem, stderr_mem = ssh.exec_command(cmd_mem, timeout=5) + stdin_io, stdout_io, stderr_io = ssh.exec_command(cmd_io, timeout=5) + error_cpu = stderr_cpu.read().decode().strip() + error_mem = stderr_mem.read().decode().strip() + error_io = stderr_io.read().decode().strip() + output_cpu = stdout_cpu.read().decode().strip() + output_mem = stdout_mem.read().decode().strip() + output_io = stdout_io.read().decode().strip() + if error_cpu: + raise ValueError(f"Command {cmd_cpu} error: {error_cpu}") + if error_mem: + raise ValueError(f"Command {cmd_mem} error: {error_mem}") + if error_io: + raise ValueError(f"Command {cmd_io} error: {error_io}") + if not output_cpu or not output_mem: + raise ValueError(f"未能获取进程ID {pid} 的性能数据") + cpu_usage = float(output_cpu) + memory_usage = float(output_mem) + io_counters = {} + for line in output_io.split('\n'): + parts = line.split(':') + if len(parts) == 2: + io_counters[parts[0].strip()] = int(parts[1].strip()) + else: + # 获取所有进程的性能数据 + cmd_cpu = "top -b -n2 -d1 | grep 'Cpu(s)' | tail -n1" + cmd_mem = "free -m | grep Mem" + + # 执行命令获取CPU和内存数据 + stdin_cpu, stdout_cpu, stderr_cpu = ssh.exec_command(cmd_cpu, timeout=5) + stdin_mem, stdout_mem, stderr_mem = ssh.exec_command(cmd_mem, timeout=5) + + # 读取命令执行结果和错误信息 + error_cpu = stderr_cpu.read().decode().strip() + error_mem = stderr_mem.read().decode().strip() + output_cpu = stdout_cpu.read().decode().strip() + output_mem = stdout_mem.read().decode().strip() + + # 检查命令执行错误 + if error_cpu: + raise ValueError(f"Command {cmd_cpu} error: {error_cpu}") + if error_mem: + raise ValueError(f"Command {cmd_mem} error: {error_mem}") + + # 检查输出是否为空 + if not output_cpu or not output_mem: + raise ValueError("未能获取系统性能数据") + + # 解析CPU使用率(适配新格式) + cpu_parts = output_cpu.split(',') # 按逗号分割各项指标 + idle_value = None + + for part in cpu_parts: + part = part.strip() # 去除空格 + if part.endswith('id'): # 查找包含空闲时间的项 + # 提取数字部分(如 "95.8 id" 中的 "95.8") + idle_value = part.split()[0] + break + + if idle_value is None: + raise ValueError(f"无法解析CPU空闲时间: {output_cpu}") + + # 计算CPU使用率 + cpu_usage = 100.0 - float(idle_value) + + # 解析内存使用率 + mem_parts = output_mem.split() + if len(mem_parts) < 7: + raise ValueError("内存信息格式异常") + + # 计算内存使用率 + memory_usage = (float(mem_parts[2]) / float(mem_parts[1])) * 100 if float(mem_parts[1]) > 0 else 0 + + # IO计数器(可根据需要补充实现) + io_counters = {} + performance_data = { + 'cpu_usage': cpu_usage, + 'memory_usage': memory_usage, + 'io_counters': io_counters + } + return performance_data + except paramiko.AuthenticationException: + raise ValueError("SSH认证失败,请检查用户名和密码") + except paramiko.SSHException as e: + raise ValueError(f"SSH连接错误: {str(e)}") + except Exception as e: + raise ValueError(f"获取远程性能数据失败: {str(e)}") + finally: + if ssh is not None: + try: + ssh.close() + except Exception: + pass + + if __name__ == "__main__": # Initialize and run the server mcp.run(transport='sse')