深蓝色
deepin
2025-08-06 13:18 端云协同了
Reply Like 1 View the author
端云协同了
太酷啦
端云协同了
其实我挺想所有都在本地运行的,但我用的本地大模型似乎用不了集成的mcp
其实我挺想所有都在本地运行的,但我用的本地大模型似乎用不了集成的mcp
应该很快本地小模型能力也够了
其实我挺想所有都在本地运行的,但我用的本地大模型似乎用不了集成的mcp
端侧也需要大参数量的模型才能支撑起智能体复杂的任务分析、规划与工具调用的工作。
太牛了,玩MCP的乐趣瞬间就感觉到了。
太牛了,玩MCP的乐趣瞬间就感觉到了。
哈哈是挺好玩的,找ai写集成ai的东西来玩,而且python能做好多有趣的东西
动手能力太强了老哥👍
第一版python文件,目前依赖安装无法使用
import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, Set
# 导入 MCP 核心库
from mcp.server.fastmcp import FastMCP, Context
# 导入 LangChain 和 Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field
# --- 日志配置 ---
logger = logging.getLogger(__name__)
# --- MCP 服务器定义 ---
mcp = FastMCP(
name="SkyShadowHero Task Execution Server",
)
# --- Pydantic 返回模型 ---
class ExecutionStage(BaseModel):
code_generation: str
dependency_installation: str
execution: str
class ExecutionResult(BaseModel):
stages: ExecutionStage
code: str
output: str
error: str
work_dir: str
returncode: int
class ServerInfo(BaseModel):
name: str
model: Optional[str]
status: str
class CommandOutput(BaseModel):
status: str
result: ExecutionResult
server: ServerInfo
# --- 配置管理 ---
class MCPConfig:
_instance = None
config_path = Path(__file__).parent / "mcp_config.json"
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.load_config()
return cls._instance
def load_config(self):
if not self.config_path.exists():
self.config = {
"mcpServers": {
"default": {
"model": "deepseek-coder-v2:16b",
"task_templates": {
"file_operations": {
"prompt": (
"你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
"## 用户指令:\n"
"{task}\n\n"
"## 代码生成规范 (必须严格遵守):\n"
"1. **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
"2. **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
"3. **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
"4. **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
"5. **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
"6. **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
"现在,请根据用户指令生成代码。"
),
}
}
}
}
}
self.save_config()
else:
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = json.load(f)
def save_config(self):
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=2, ensure_ascii=False)
def get_server(self, name: str) -> Optional[Dict[str, Any]]:
return self.config.get("mcpServers", {}).get(name)
class TaskWorkflow:
def __init__(self):
self.config = MCPConfig().config
self.llm_cache = {}
self.standard_libs = self._get_standard_libs()
script_dir = Path(__file__).parent.resolve()
self.shared_work_dir = script_dir / "mcp_tasks"
self.shared_work_dir.mkdir(exist_ok=True)
logger.info("正在初始化并检查共享虚拟环境...")
try:
self.venv_path = self.shared_work_dir / "venv"
# _create_virtual_env 会返回可执行文件的绝对路径
self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
except Exception as e:
logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")
def _get_standard_libs(self) -> Set[str]:
common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
if sys.version_info >= (3, 10):
try:
from sys import stdlib_module_names
return set(stdlib_module_names)
except ImportError:
return common_libs
return common_libs
async def get_llm(self, model_name: str) -> Ollama:
if model_name not in self.llm_cache:
logger.info(f"正在加载模型: {model_name}")
self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
return self.llm_cache[model_name]
def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
"""
【已更新】创建虚拟环境,并返回 Python 和 Pip 可执行文件的绝对路径。
"""
venv_path = Path(venv_path_str)
if sys.platform == "win32":
python_exe = venv_path / 'Scripts' / 'python.exe'
pip_exe = venv_path / 'Scripts' / 'pip.exe'
else:
python_exe = venv_path / 'bin' / 'python'
pip_exe = venv_path / 'bin' / 'pip'
if not python_exe.exists() or not pip_exe.exists():
logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
try:
subprocess.run(
[sys.executable, "-m", "venv", venv_path_str],
check=True, capture_output=True, text=True, timeout=120
)
except subprocess.CalledProcessError as e:
logger.error(f"创建虚拟环境失败: {e.stderr}")
raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
if not python_exe.exists() or not pip_exe.exists():
raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
logger.info("虚拟环境验证成功。")
return str(python_exe), str(pip_exe)
def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
return final_code.strip(), required_deps
def _install_dependencies(self, deps: Set[str]):
"""
【需要改进】使用 Pip 的绝对路径来安装依赖。
"""
if not deps:
logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
return
deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
if not deps_to_install:
logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
return
# 这里需要改进
command = [self.pip_executable, "install", *deps_to_install]
logger.info(f"执行依赖安装命令: {' '.join(command)}")
result = subprocess.run(
command,
cwd=str(self.shared_work_dir),
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
)
if result.returncode != 0:
error_message = f"依赖安装失败: {result.stderr}"
logger.error(error_message)
raise RuntimeError(error_message)
logger.info(f"依赖 {list(deps_to_install)} 安装成功。")
def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
"""
【已更新】使用 Python 的绝对路径来执行代码。
"""
script_name = "generated_script.py"
code_path = os.path.join(task_work_dir, script_name)
with open(code_path, "w", encoding="utf-8") as f:
f.write(code_to_execute)
logger.info(f"最终执行的脚本已保存至: {code_path}")
command = [self.python_executable, script_name]
logger.info(f"执行代码命令: {' '.join(command)}")
result = subprocess.run(
command,
cwd=task_work_dir,
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
)
return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
result = {
"stages": {
"code_generation": "pending",
"dependency_installation": "pending",
"execution": "pending"
},
"code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
}
try:
timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
task_work_dir = self.shared_work_dir / timestamp
task_work_dir.mkdir(exist_ok=True)
result["work_dir"] = str(task_work_dir)
await ctx.info(f"任务工作目录已创建: {task_work_dir}")
server_config = self.config.get("mcpServers", {}).get(server_name)
if not server_config: raise ValueError(f"服务器 '{server_name}' 未配置")
template = server_config["task_templates"]["file_operations"]
prompt = template["prompt"].format(task=instruction)
await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
llm = await self.get_llm(server_config['model'])
generated_code = await llm.ainvoke(prompt)
result["stages"]["code_generation"] = "success"
await ctx.info("代码生成成功。")
pure_code, dependencies = self._post_process_code(generated_code)
result["code"] = pure_code
result["stages"]["dependency_installation"] = "pending"
await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
self._install_dependencies(dependencies)
result["stages"]["dependency_installation"] = "success"
await ctx.info("所有依赖已就绪。")
result["stages"]["execution"] = "pending"
await ctx.info("正在执行生成的代码...")
exec_result = self._execute_code(pure_code, str(task_work_dir))
result.update(exec_result)
is_successful = "任务成功完成" in exec_result.get("output", "")
result["stages"]["execution"] = "success" if is_successful else "failed"
final_status = "success" if is_successful else "failed"
return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}
except Exception as e:
current_stage = next((s for s, status in result["stages"].items() if status == "pending"), "unknown")
result["stages"][current_stage] = "failed"
error_message = f"在 '{current_stage}' 阶段失败: {e}"
result["error"] = error_message
logger.error(error_message, exc_info=True)
await ctx.error(error_message)
for stage, status in result["stages"].items():
if status == "pending":
result["stages"][stage] = "skipped"
return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}
# --- 一个单例的工作流执行器 ---
workflow_executor = TaskWorkflow()
# --- MCP 工具定义 ---
@mcp.tool()
async def execute_natural_command(
instruction: str = Field(
description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
),
server: str = Field(default="default", description="要使用的服务器配置名称。"),
ctx: Context = Field(exclude=True)
) -> CommandOutput:
"""
当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
非常适合处理创建文件、修改文件、删除文件、压缩文件、移动目录、运行脚本等任务。
"""
try:
await ctx.info(f"收到指令,开始处理: '{instruction}'")
# 调用单例执行器的方法,并传入当前的 ctx
result_dict = await workflow_executor.run_workflow(
instruction=instruction,
server_name=server,
ctx=ctx
)
await ctx.info("任务流程执行完毕。")
return CommandOutput.model_validate(result_dict)
except Exception as e:
await ctx.error(f"执行工具时发生严重错误: {e}")
return CommandOutput(
status="failed",
result=ExecutionResult(
stages=ExecutionStage(
code_generation="failed",
dependency_installation="skipped",
execution="skipped"
),
code="",
output="",
error=f"执行工具时发生顶层错误: {e}",
work_dir="",
returncode=-1
),
server=ServerInfo(name=server, model=None, status="error")
)
# --- 服务器启动 ---
def run():
"""
服务器主入口函数。
"""
try:
subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
logger.info("Ollama 服务已在运行。")
except (subprocess.CalledProcessError, FileNotFoundError):
logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
mcp.run()
if __name__ == "__main__":
run()
第二版python文件,目前能正常安装依赖
import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple
# 导入 MCP 核心库
from mcp.server.fastmcp import FastMCP, Context
# 导入 LangChain 和 Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field
# --- 日志配置 ---
logger = logging.getLogger(__name__)
# --- MCP 服务器定义 ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")
# --- Pydantic 返回模型 ---
class ExecutionStage(BaseModel):
code_generation: str
dependency_installation: str
execution: str
class ExecutionResult(BaseModel):
stages: ExecutionStage
code: str
output: str
error: str
work_dir: str
returncode: int
class ServerInfo(BaseModel):
name: str
model: Optional[str]
status: str
class CommandOutput(BaseModel):
status: str
result: ExecutionResult
server: ServerInfo
# --- 配置管理 ---
class MCPConfig:
_instance = None
config_path = Path(__file__).parent / "mcp_config.json"
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.load_config()
return cls._instance
def load_config(self):
if not self.config_path.exists():
self.config = {
"mcpServers": {
"default": {
"model": "deepseek-coder-v2:16b",
"task_templates": {
"file_operations": {
"prompt": (
"你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
"## 用户指令:\n"
"{task}\n\n"
"## 代码生成规范 (必须严格遵守):\n"
"1. **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
"2. **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
"3. **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
"4. **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
"5. **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
"6. **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
"现在,请根据用户指令生成代码。"
),
}
}
}
}
}
self.save_config()
else:
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = json.load(f)
def save_config(self):
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=2, ensure_ascii=False)
def get_server(self, name: str) -> Optional[Dict[str, Any]]:
return self.config.get("mcpServers", {}).get(name)
# --- 【最终修复架构】核心逻辑封装 ---
class TaskWorkflow:
def __init__(self):
self.config = MCPConfig().config
self.llm_cache = {}
self.standard_libs = self._get_standard_libs()
script_dir = Path(__file__).parent.resolve()
self.shared_work_dir = script_dir / "mcp_tasks"
self.shared_work_dir.mkdir(exist_ok=True)
logger.info("正在初始化并检查共享虚拟环境...")
try:
self.venv_path = self.shared_work_dir / "venv"
self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
except Exception as e:
logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")
def _get_standard_libs(self) -> Set[str]:
common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
if sys.version_info >= (3, 10):
try:
from sys import stdlib_module_names
return set(stdlib_module_names)
except ImportError:
return common_libs
return common_libs
async def get_llm(self, model_name: str) -> Ollama:
if model_name not in self.llm_cache:
logger.info(f"正在加载模型: {model_name}")
self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
return self.llm_cache[model_name]
def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
venv_path = Path(venv_path_str)
if sys.platform == "win32":
python_exe = venv_path / 'Scripts' / 'python.exe'
pip_exe = venv_path / 'Scripts' / 'pip.exe'
else:
python_exe = venv_path / 'bin' / 'python'
pip_exe = venv_path / 'bin' / 'pip'
if not python_exe.exists() or not pip_exe.exists():
logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
try:
subprocess.run(
[sys.executable, "-m", "venv", venv_path_str],
check=True, capture_output=True, text=True, timeout=120
)
except subprocess.CalledProcessError as e:
logger.error(f"创建虚拟环境失败: {e.stderr}")
raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
if not python_exe.exists() or not pip_exe.exists():
raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
logger.info("虚拟环境验证成功。")
return str(python_exe), str(pip_exe)
def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
return final_code.strip(), required_deps
def _install_dependencies(self, deps: Set[str]):
if not deps:
logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
return
deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
if not deps_to_install:
logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
return
command = [self.pip_executable, "install", *deps_to_install]
logger.info(f"执行依赖安装命令: {' '.join(command)}")
result = subprocess.run(
command,
cwd=str(self.shared_work_dir),
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
)
if result.returncode != 0:
error_message = f"依赖安装失败: {result.stderr}"
logger.error(error_message)
raise RuntimeError(error_message)
logger.info(f"依赖 {list(deps_to_install)} 安装成功。")
def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
script_name = "generated_script.py"
code_path = os.path.join(task_work_dir, script_name)
with open(code_path, "w", encoding="utf-8") as f:
f.write(code_to_execute)
logger.info(f"最终执行的脚本已保存至: {code_path}")
command = [self.python_executable, script_name]
logger.info(f"执行代码命令: {' '.join(command)}")
result = subprocess.run(
command,
cwd=task_work_dir,
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
)
return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
def _is_direct_command(self, instruction: str) -> bool:
"""
【新】检查指令是否为直接的 shell 命令。
"""
# 简单但有效的检查:如果指令以常见命令开头,则认为是直接命令
common_commands = ['python', 'pip', 'uv', 'bash', 'sh', 'ls', 'rm', 'cp', 'mv', 'mkdir']
return any(instruction.strip().startswith(cmd) for cmd in common_commands)
def _execute_direct_command(self, command: str, task_work_dir: str) -> Dict[str, Any]:
"""
【新】在虚拟环境的上下文中直接执行 shell 命令。
"""
logger.info(f"检测到直接命令,将在虚拟环境中执行: {command}")
# 为了在虚拟环境中执行,我们构造一个激活并执行的命令
# 这对于需要虚拟环境上下文的命令(如 pip install)至关重要
if sys.platform == "win32":
activate_script = Path(self.python_executable).parent / "activate.bat"
full_command = f'call "{activate_script}" && {command}'
else:
activate_script = Path(self.python_executable).parent / "activate"
full_command = f'. "{activate_script}" && {command}'
result = subprocess.run(
full_command,
cwd=task_work_dir,
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
shell=True, # shell=True 对于 `.` 和 `&&` 是必需的
executable='/bin/bash' if sys.platform != "win32" else None
)
return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
result = {
"stages": {
"code_generation": "pending",
"dependency_installation": "pending",
"execution": "pending"
},
"code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
}
try:
timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
task_work_dir = self.shared_work_dir / timestamp
task_work_dir.mkdir(exist_ok=True)
result["work_dir"] = str(task_work_dir)
await ctx.info(f"任务工作目录已创建: {task_work_dir}")
# --- 指令嗅探和模式切换 ---
if self._is_direct_command(instruction):
await ctx.info("检测到直接命令模式。")
result["stages"]["code_generation"] = "skipped (direct command)"
result["stages"]["dependency_installation"] = "skipped (direct command)"
result["code"] = f"# Direct Command Execution\n{instruction}"
result["stages"]["execution"] = "pending"
await ctx.info(f"正在直接执行命令: {instruction}")
exec_result = self._execute_direct_command(instruction, str(task_work_dir))
result.update(exec_result)
is_successful = exec_result.get("returncode") == 0
result["stages"]["execution"] = "success" if is_successful else "failed"
final_status = "success" if is_successful else "failed"
return {"status": final_status, "result": result, "server": {"name": server_name, "model": "N/A (Direct Command)", "status": "active"}}
# --- 如果不是直接命令,则执行原有的 LLM 工作流 ---
await ctx.info("进入 LLM 代码生成模式。")
server_config = self.config.get("mcpServers", {}).get(server_name)
if not server_config: raise ValueError(f"服务器 '{server_name}' 未配置")
template = server_config["task_templates"]["file_operations"]
prompt = template["prompt"].format(task=instruction)
await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
llm = await self.get_llm(server_config['model'])
generated_code = await llm.ainvoke(prompt)
result["stages"]["code_generation"] = "success"
await ctx.info("代码生成成功。")
pure_code, dependencies = self._post_process_code(generated_code)
result["code"] = pure_code
result["stages"]["dependency_installation"] = "pending"
await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
self._install_dependencies(dependencies)
result["stages"]["dependency_installation"] = "success"
await ctx.info("所有依赖已就绪。")
result["stages"]["execution"] = "pending"
await ctx.info("正在执行生成的代码...")
exec_result = self._execute_code(pure_code, str(task_work_dir))
result.update(exec_result)
is_successful = "任务成功完成" in exec_result.get("output", "")
result["stages"]["execution"] = "success" if is_successful else "failed"
final_status = "success" if is_successful else "failed"
return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}
except Exception as e:
current_stage = next((s for s, status in result["stages"].items() if status == "pending"), "unknown")
result["stages"][current_stage] = "failed"
error_message = f"在 '{current_stage}' 阶段失败: {e}"
result["error"] = error_message
logger.error(error_message, exc_info=True)
await ctx.error(error_message)
for stage, status in result["stages"].items():
if status == "pending":
result["stages"][stage] = "skipped"
return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}
# --- 单例和工具定义 ---
workflow_executor = TaskWorkflow()
@mcp.tool()
async def execute_natural_command(
instruction: str = Field(
description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
),
server: str = Field(default="default", description="要使用的服务器配置名称。"),
ctx: Context = Field(exclude=True)
) -> CommandOutput:
"""
当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
"""
try:
await ctx.info(f"收到指令,开始处理: '{instruction}'")
result_dict = await workflow_executor.run_workflow(
instruction=instruction,
server_name=server,
ctx=ctx
)
await ctx.info("任务流程执行完毕。")
return CommandOutput.model_validate(result_dict)
except Exception as e:
await ctx.error(f"执行工具时发生严重错误: {e}")
return CommandOutput(
status="failed",
result=ExecutionResult(
stages=ExecutionStage(
code_generation="failed",
dependency_installation="skipped",
execution="skipped"
),
code="",
output="",
error=f"执行工具时发生顶层错误: {e}",
work_dir="",
returncode=-1
),
server=ServerInfo(name=server, model=None, status="error")
)
# --- 服务器启动---
def run():
"""
服务器主入口函数。
"""
try:
subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
logger.info("Ollama 服务已在运行。")
except (subprocess.CalledProcessError, FileNotFoundError):
logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
mcp.run()
if __name__ == "__main__":
run()
第三版,使用前先在环境里
pip install mcp langchain-community
依赖安装换成了requirements.txt的方式。使用前请先python运行一下该python然后生成配置文件,在配置文件里修改ollama模型
import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple
# 导入 MCP 核心库
from mcp.server.fastmcp import FastMCP, Context
# 导入 LangChain 和 Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field
# --- 全局日志配置 (用于服务本身) ---
# 这个 logger 用于记录服务启动、关闭等非任务相关的核心日志
global_logger = logging.getLogger("mcp_service")
if not global_logger.handlers:
global_logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
global_logger.addHandler(handler)
# --- MCP 服务器定义 ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")
# --- Pydantic 返回模型 ---
class ExecutionStage(BaseModel):
code_generation: str
dependency_installation: str
execution: str
class ExecutionResult(BaseModel):
stages: ExecutionStage
code: str
output: str
error: str
work_dir: str
returncode: int
class ServerInfo(BaseModel):
name: str
model: Optional[str]
status: str
class CommandOutput(BaseModel):
status: str
result: ExecutionResult
server: ServerInfo
# --- 配置管理 ---
class MCPConfig:
_instance = None
config_path = Path(__file__).parent / "mcp_config.json"
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.load_config()
return cls._instance
def load_config(self):
if not self.config_path.exists():
self.config = {
"mcpServers": {
"default": {
"model": "deepseek-coder-v2:16b",
"task_templates": {
"file_operations": {
"prompt": (
"你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
"## 用户指令:\n"
"{task}\n\n"
"## 代码生成规范 (必须严格遵守):\n"
"1. **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
"2. **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
"3. **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
"4. **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
"5. **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
"6. **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
"现在,请根据用户指令生成代码。"
),
}
}
}
}
}
self.save_config()
else:
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = json.load(f)
def save_config(self):
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=2, ensure_ascii=False)
def get_server(self, name: str) -> Optional[Dict[str, Any]]:
return self.config.get("mcpServers", {}).get(name)
# --- 【最终修复架构】核心逻辑封装 ---
class TaskWorkflow:
def __init__(self):
self.config = MCPConfig().config
self.llm_cache = {}
self.standard_libs = self._get_standard_libs()
script_dir = Path(__file__).parent.resolve()
self.shared_work_dir = script_dir / "mcp_tasks"
self.shared_work_dir.mkdir(exist_ok=True)
global_logger.info("正在初始化并检查共享虚拟环境...")
try:
self.venv_path = self.shared_work_dir / "venv"
self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
global_logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
except Exception as e:
global_logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")
def _get_standard_libs(self) -> Set[str]:
common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
if sys.version_info >= (3, 10):
try:
from sys import stdlib_module_names
return set(stdlib_module_names)
except ImportError:
return common_libs
return common_libs
async def get_llm(self, model_name: str) -> Ollama:
if model_name not in self.llm_cache:
global_logger.info(f"正在加载模型: {model_name}")
self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
return self.llm_cache[model_name]
def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
venv_path = Path(venv_path_str)
if sys.platform == "win32":
python_exe = venv_path / 'Scripts' / 'python.exe'
pip_exe = venv_path / 'Scripts' / 'pip.exe'
else:
python_exe = venv_path / 'bin' / 'python'
pip_exe = venv_path / 'bin' / 'pip'
if not python_exe.exists() or not pip_exe.exists():
global_logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
try:
subprocess.run(
[sys.executable, "-m", "venv", venv_path_str],
check=True, capture_output=True, text=True, timeout=120
)
except subprocess.CalledProcessError as e:
global_logger.error(f"创建虚拟环境失败: {e.stderr}")
raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
if not python_exe.exists() or not pip_exe.exists():
raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
global_logger.info("虚拟环境验证成功。")
return str(python_exe), str(pip_exe)
def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
global_logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
return final_code.strip(), required_deps
def _install_dependencies(self, deps: Set[str], task_work_dir: Path):
if not deps:
global_logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
return
deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
if not deps_to_install:
global_logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
return
# 确保文件名是 requirements.txt
requirements_path = task_work_dir / "requirements.txt"
with open(requirements_path, 'w', encoding='utf-8') as f:
for dep in deps_to_install:
f.write(f"{dep}\n")
global_logger.info(f"已生成依赖文件: {requirements_path}")
command = [self.pip_executable, "install", "-r", str(requirements_path)]
global_logger.info(f"执行依赖安装命令: {' '.join(command)}")
result = subprocess.run(
command,
cwd=str(task_work_dir),
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
)
if result.returncode != 0:
error_message = f"依赖安装失败: {result.stderr}"
global_logger.error(error_message)
raise RuntimeError(error_message)
global_logger.info(f"依赖 {list(deps_to_install)} 安装成功。")
def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
script_name = "generated_script.py"
code_path = os.path.join(task_work_dir, script_name)
with open(code_path, "w", encoding="utf-8") as f:
f.write(code_to_execute)
global_logger.info(f"最终执行的脚本已保存至: {code_path}")
command = [self.python_executable, script_name]
global_logger.info(f"执行代码命令: {' '.join(command)}")
result = subprocess.run(
command,
cwd=task_work_dir,
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
)
return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
def _is_direct_command(self, instruction: str) -> bool:
common_commands = ['python', 'pip', 'uv', 'bash', 'sh', 'ls', 'rm', 'cp', 'mv', 'mkdir']
return any(instruction.strip().startswith(cmd) for cmd in common_commands)
def _execute_direct_command(self, command: str, task_work_dir: str) -> Dict[str, Any]:
global_logger.info(f"检测到直接命令,将在虚拟环境中执行: {command}")
if sys.platform == "win32":
activate_script = Path(self.python_executable).parent / "activate.bat"
full_command = f'call "{activate_script}" && {command}'
else:
activate_script = Path(self.python_executable).parent / "activate"
full_command = f'. "{activate_script}" && {command}'
result = subprocess.run(
full_command,
cwd=task_work_dir,
capture_output=True,
text=True,
timeout=300,
check=False,
encoding='utf-8',
shell=True,
executable='/bin/bash' if sys.platform != "win32" else None
)
return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
result = {
"stages": {
"code_generation": "pending",
"dependency_installation": "pending",
"execution": "pending"
},
"code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
}
try:
timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
task_work_dir = self.shared_work_dir / timestamp
task_work_dir.mkdir(exist_ok=True)
result["work_dir"] = str(task_work_dir)
global_logger.info(f"任务 '{timestamp}' 启动,指令: '{instruction}'")
await ctx.info(f"任务工作目录已创建: {task_work_dir}")
# --- 指令嗅探和模式切换 ---
if self._is_direct_command(instruction):
await ctx.info("检测到直接命令模式。")
result["stages"]["code_generation"] = "skipped (direct command)"
result["stages"]["dependency_installation"] = "skipped (direct command)"
result["code"] = f"# Direct Command Execution\n{instruction}"
result["stages"]["execution"] = "pending"
await ctx.info(f"正在直接执行命令: {instruction}")
exec_result = self._execute_direct_command(instruction, str(task_work_dir))
result.update(exec_result)
is_successful = exec_result.get("returncode") == 0
result["stages"]["execution"] = "success" if is_successful else "failed"
final_status = "success" if is_successful else "failed"
global_logger.info(f"直接命令执行完成。状态: {final_status}。")
return {"status": final_status, "result": result, "server": {"name": server_name, "model": "N/A (Direct Command)", "status": "active"}}
# --- 如果不是直接命令,则执行原有的 LLM 工作流 ---
await ctx.info("进入 LLM 代码生成模式。")
server_config = self.config.get("mcpServers", {}).get(server_name)
if not server_config: raise ValueError(f"服务器 '{server_name}' 未配置")
template = server_config["task_templates"]["file_operations"]
prompt = template["prompt"].format(task=instruction)
await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
llm = await self.get_llm(server_config['model'])
generated_code = await llm.ainvoke(prompt)
result["stages"]["code_generation"] = "success"
await ctx.info("代码生成成功。")
pure_code, dependencies = self._post_process_code(generated_code)
result["code"] = pure_code
result["stages"]["dependency_installation"] = "pending"
await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
self._install_dependencies(dependencies, task_work_dir)
result["stages"]["dependency_installation"] = "success"
await ctx.info("所有依赖已就绪。")
result["stages"]["execution"] = "pending"
await ctx.info("正在执行生成的代码...")
exec_result = self._execute_code(pure_code, str(task_work_dir))
result.update(exec_result)
is_successful = "任务成功完成" in exec_result.get("output", "")
result["stages"]["execution"] = "success" if is_successful else "failed"
final_status = "success" if is_successful else "failed"
global_logger.info(f"代码执行完成。状态: {final_status}。")
return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}
except Exception as e:
current_stage = next((s for s, status in result["stages"].items() if status == "pending"), "unknown")
result["stages"][current_stage] = "failed"
error_message = f"在 '{current_stage}' 阶段失败: {e}"
result["error"] = error_message
global_logger.error(error_message, exc_info=True)
await ctx.error(error_message)
for stage, status in result["stages"].items():
if status == "pending":
result["stages"][stage] = "skipped"
return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}
# --- 单例和工具定义 ---
workflow_executor = TaskWorkflow()
@mcp.tool()
async def do_it_using_python(
instruction: str = Field(
description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
),
server: str = Field(default="default", description="要使用的服务器配置名称。"),
ctx: Context = Field(exclude=True)
) -> CommandOutput:
"""
当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
"""
try:
await ctx.info(f"收到指令,开始处理: '{instruction}'")
result_dict = await workflow_executor.run_workflow(
instruction=instruction,
server_name=server,
ctx=ctx
)
await ctx.info("任务流程执行完毕。")
return CommandOutput.model_validate(result_dict)
except Exception as e:
global_logger.error(f"执行工具时发生严重错误: {e}", exc_info=True)
await ctx.error(f"执行工具时发生严重错误: {e}")
return CommandOutput(
status="failed",
result=ExecutionResult(
stages=ExecutionStage(
code_generation="failed",
dependency_installation="skipped",
execution="skipped"
),
code="",
output="",
error=f"执行工具时发生顶层错误: {e}",
work_dir="",
returncode=-1
),
server=ServerInfo(name=server, model=None, status="error")
)
# --- 服务器启动---
def run():
"""
服务器主入口函数。
"""
try:
subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
global_logger.info("Ollama 服务已在运行。")
except (subprocess.CalledProcessError, FileNotFoundError):
global_logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
mcp.run()
if __name__ == "__main__":
run()
第四版。添加了api模式,并且把生成代码,安装依赖和执行代码分开了,现在需要 pip install fastmcp langchain-community langchain-openai httpx
,使用前先删掉之前的mcp_config.json文件并运行一下python重新生成配置文件,生成的配置文件差不多长这样,其中如果provider为ollama的话,model为必填,目前ollama有点问题,如果要使用ollama的话建议用第三版
"llm_config": {
"provider": "ollama或api",
"model": "deepseek-coder-v2:16b",
"api_key": "YOUR_API_KEY_HERE (if provider is 'api')",
"base_url": "YOUR_API_BASE_URL_HERE (if provider is 'api')"
}
import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple, List
# 导入 MCP 核心库
from mcp.server.fastmcp import FastMCP, Context
# 导入 LangChain 和 Pydantic
from langchain_community.llms import Ollama
from langchain_openai import ChatOpenAI
from langchain_core.language_models.base import BaseLanguageModel
from pydantic import BaseModel, Field
# 导入 httpx 用于创建自定义网络客户端
import httpx
# --- 提示词定义 ---
CODE_GENERATION_PROMPT = """
你是一个顶级的Python自动化专家 。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。
## 用户指令:
{task}
## 代码生成规范 (必须严格遵守):
1. **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。
2. **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。
3. **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。
4. **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。
5. **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print("任务成功完成")`。
6. **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。
现在,请根据用户指令生成代码。
"""
# --- 全局日志配置 ---
global_logger = logging.getLogger("mcp_service")
if not global_logger.handlers:
global_logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
global_logger.addHandler(handler)
# --- MCP 服务器定义 ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")
# --- Pydantic 返回模型 ---
class CodeGenerationResult(BaseModel):
status: str = Field(description="代码生成阶段的状态 ('success' 或 'failed')")
code: str = Field(description="生成或提供的Python代码")
dependencies: List[str] = Field(description="从代码中提取的依赖库列表")
work_dir: str = Field(description="为本次任务创建的工作目录路径")
error: Optional[str] = Field(None, description="如果生成失败,此字段包含错误信息")
class DependencyInstallationResult(BaseModel):
status: str = Field(description="依赖安装阶段的状态 ('success', 'failed', 或 'skipped')")
installed_packages: List[str] = Field(description="成功安装的包列表")
work_dir: str = Field(description="执行安装的工作目录")
output: str = Field(description="pip install 命令的输出")
error: Optional[str] = Field(None, description="如果安装失败,此字段包含错误信息")
class ExecutionResult(BaseModel):
status: str = Field(description="代码执行阶段的状态 ('success' 或 'failed')")
output: str = Field(description="脚本执行的标准输出 (stdout)")
error: str = Field(description="脚本执行的标准错误 (stderr)")
returncode: int = Field(description="脚本执行的返回码")
work_dir: str = Field(description="执行代码的工作目录")
# --- 配置管理 (MCPConfig) ---
class MCPConfig:
_instance = None
config_path = Path(__file__).parent / "mcp_config.json"
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.load_config()
return cls._instance
def load_config(self):
if not self.config_path.exists():
self.config = {
"llm_config": {
"provider": "ollama",
"model": "deepseek-coder-v2:16b",
"api_key": "YOUR_API_KEY_HERE (if provider is 'api')",
"base_url": "YOUR_API_BASE_URL_HERE (if provider is 'api')"
}
}
self.save_config()
else:
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = json.load(f)
def save_config(self):
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=4, ensure_ascii=False)
def get_llm_config(self) -> Optional[Dict[str, Any]]:
return self.config.get("llm_config")
# --- 核心逻辑封装 (TaskWorkflow) ---
class TaskWorkflow:
def __init__(self):
self.llm_config = MCPConfig().get_llm_config()
self.llm_instance: Optional[BaseLanguageModel] = None
self.standard_libs = self._get_standard_libs()
script_dir = Path(__file__).parent.resolve()
self.shared_work_dir = script_dir / "mcp_tasks"
self.shared_work_dir.mkdir(exist_ok=True)
global_logger.info("正在初始化并检查共享虚拟环境...")
try:
self.venv_path = self.shared_work_dir / "venv"
self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
global_logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
except Exception as e:
global_logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")
def _get_standard_libs(self) -> Set[str]:
common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
if sys.version_info >= (3, 10):
try:
from sys import stdlib_module_names
return set(stdlib_module_names)
except ImportError:
return common_libs
return common_libs
def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
venv_path = Path(venv_path_str)
if sys.platform == "win32":
python_exe = venv_path / 'Scripts' / 'python.exe'
pip_exe = venv_path / 'Scripts' / 'pip.exe'
else:
python_exe = venv_path / 'bin' / 'python'
pip_exe = venv_path / 'bin' / 'pip'
if not python_exe.exists() or not pip_exe.exists():
global_logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
try:
subprocess.run([sys.executable, "-m", "venv", venv_path_str], check=True, capture_output=True, text=True, timeout=120)
except subprocess.CalledProcessError as e:
global_logger.error(f"创建虚拟环境失败: {e.stderr}")
raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
if not python_exe.exists() or not pip_exe.exists():
raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
global_logger.info("虚拟环境验证成功。")
return str(python_exe), str(pip_exe)
def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
global_logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
return final_code.strip(), required_deps
def _create_task_work_dir(self) -> Path:
timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
task_work_dir = self.shared_work_dir / timestamp
task_work_dir.mkdir(exist_ok=True)
global_logger.info(f"任务工作目录已创建: {task_work_dir}")
return task_work_dir
async def get_llm(self) -> BaseLanguageModel:
"""
根据单一的LLM配置获取并缓存LLM实例。
通过创建自定义httpx客户端来禁用系统代理 。
"""
if self.llm_instance:
return self.llm_instance
if not self.llm_config:
raise ValueError("LLM 配置 'llm_config' 在 mcp_config.json 中未找到。")
provider = self.llm_config.get("provider")
model_name = self.llm_config.get("model")
global_logger.info(f"正在加载模型: {model_name} (Provider: {provider})")
# ⭐️ 关键修复:创建同步的 httpx.Client 实例 ,而不是异步的
# trust_env=False 会让 httpx 忽略所有代理环境变量 (HTTP_PROXY, HTTPS_PROXY等 )
sync_client = httpx.Client(trust_env=False )
llm: BaseLanguageModel
if provider == "ollama":
if not model_name: raise ValueError("当 provider 是 'ollama' 时, 必须在配置中提供 'model'。")
llm = Ollama(
model=model_name,
temperature=0.1,
top_p=0.9,
timeout=300,
# 传入正确的同步客户端实例
http_client=sync_client
)
elif provider == "api":
api_key = self.llm_config.get("api_key")
base_url = self.llm_config.get("base_url")
if not model_name: raise ValueError("当 provider 是 'api' 时, 必须在配置中提供 'model'。")
if not api_key or "YOUR_" in api_key:
raise ValueError("当 provider 是 'api' 时, 必须在 mcp_config.json 中设置有效的 'api_key'。")
if not base_url or "YOUR_" in base_url:
raise ValueError("当 provider 是 'api' 时, 必须在 mcp_config.json 中设置有效的 'base_url'。")
llm = ChatOpenAI(
model=model_name,
api_key=api_key,
base_url=base_url,
temperature=0.1,
max_retries=2,
timeout=300,
# 传入正确的同步客户端实例
http_client=sync_client
)
else:
raise ValueError(f"不支持的 provider: '{provider}'。请在 mcp_config.json 中选择 'ollama' 或 'api'。")
self.llm_instance = llm
return llm
# --- 单例和工具定义 ---
workflow_executor = TaskWorkflow()
@mcp.tool()
async def generate_code(
instruction: str = Field(description="用户的自然语言指令,用于生成Python代码。"),
ctx: Context = Field(exclude=True)
) -> CodeGenerationResult:
"""
第一步:根据用户的自然语言指令,使用配置好的LLM生成可执行的Python代码。
此工具会创建一个工作目录,生成代码,并从中提取依赖项。
"""
await ctx.info(f"收到代码生成指令: '{instruction}'")
try:
task_work_dir = workflow_executor._create_task_work_dir()
await ctx.info(f"工作目录已创建: {task_work_dir}")
llm = await workflow_executor.get_llm()
prompt = CODE_GENERATION_PROMPT.format(task=instruction)
model_name = workflow_executor.llm_config.get('model')
await ctx.info(f"正在使用模型 '{model_name}' 生成代码...")
response = await llm.ainvoke(prompt)
generated_code = response.content if hasattr(response, 'content') else str(response)
await ctx.info("代码生成成功。")
pure_code, dependencies = workflow_executor._post_process_code(generated_code)
code_path = task_work_dir / "generated_script.py"
with open(code_path, "w", encoding="utf-8") as f:
f.write(pure_code)
global_logger.info(f"生成的代码已保存至: {code_path}")
return CodeGenerationResult(
status="success",
code=pure_code,
dependencies=list(dependencies),
work_dir=str(task_work_dir)
)
except Exception as e:
error_message = f"代码生成失败: {e}"
global_logger.error(error_message, exc_info=True)
await ctx.error(error_message)
return CodeGenerationResult(
status="failed", code="", dependencies=[], work_dir="", error=error_message
)
@mcp.tool()
async def install_dependencies(dependencies: List[str] = Field(description="需要安装的Python包列表。"), work_dir: str = Field(description="必须提供由 'generate_code' 工具返回的工作目录路径。"), ctx: Context = Field(exclude=True)) -> DependencyInstallationResult:
"""
第二步:在指定工作目录的虚拟环境中安装Python依赖项。
如果依赖列表为空,则跳过此步骤。
"""
await ctx.info(f"收到依赖安装指令: {dependencies} in '{work_dir}'")
task_work_dir = Path(work_dir)
if not task_work_dir.exists():
return DependencyInstallationResult(status="failed", installed_packages=[], work_dir=work_dir, output="", error="工作目录不存在!")
if not dependencies:
await ctx.info("依赖列表为空,跳过安装。")
return DependencyInstallationResult(status="skipped", installed_packages=[], work_dir=work_dir, output="No dependencies to install.")
deps_to_install = {dep for dep in dependencies if dep.lower() not in workflow_executor.standard_libs}
if not deps_to_install:
await ctx.info(f"所有声明的依赖 {dependencies} 均为标准库,无需安装。")
return DependencyInstallationResult(status="skipped", installed_packages=[], work_dir=work_dir, output="All dependencies are standard libraries.")
try:
requirements_path = task_work_dir / "requirements.txt"
with open(requirements_path, 'w', encoding='utf-8') as f:
for dep in deps_to_install:
f.write(f"{dep}\n")
command = [workflow_executor.pip_executable, "install", "-r", str(requirements_path)]
await ctx.info(f"执行依赖安装命令: {' '.join(command)}")
result = subprocess.run(command, cwd=str(task_work_dir), capture_output=True, text=True, timeout=300, check=False, encoding='utf-8')
if result.returncode != 0:
error_message = f"依赖安装失败: {result.stderr}"
global_logger.error(error_message)
await ctx.error(error_message)
return DependencyInstallationResult(status="failed", installed_packages=[], work_dir=work_dir, output=result.stdout, error=result.stderr)
await ctx.info(f"依赖 {list(deps_to_install)} 安装成功。")
return DependencyInstallationResult(status="success", installed_packages=list(deps_to_install), work_dir=work_dir, output=result.stdout)
except Exception as e:
error_message = f"安装依赖时发生意外错误: {e}"
global_logger.error(error_message, exc_info=True)
await ctx.error(error_message)
return DependencyInstallationResult(status="failed", installed_packages=[], work_dir=work_dir, output="", error=error_message)
@mcp.tool()
async def execute_code(work_dir: str = Field(description="必须提供由 'generate_code' 工具返回的、包含 'generated_script.py' 的工作目录路径。"), ctx: Context = Field(exclude=True)) -> ExecutionResult:
"""
第三步:在指定工作目录的虚拟环境中执行 'generated_script.py' 文件。
这个工具应该在代码生成和依赖安装(如果需要)之后被调用。
"""
await ctx.info(f"收到代码执行指令 in '{work_dir}'")
task_work_dir = Path(work_dir)
script_path = task_work_dir / "generated_script.py"
if not script_path.exists():
error_msg = f"执行失败: 在工作目录 '{work_dir}' 中未找到 'generated_script.py'。"
await ctx.error(error_msg)
return ExecutionResult(status="failed", output="", error=error_msg, returncode=-1, work_dir=work_dir)
try:
command = [workflow_executor.python_executable, str(script_path)]
await ctx.info(f"执行代码命令: {' '.join(command)}")
result = subprocess.run(command, cwd=str(task_work_dir), capture_output=True, text=True, timeout=300, check=False, encoding='utf-8')
is_successful = "任务成功完成" in result.stdout
final_status = "success" if is_successful and result.returncode == 0 else "failed"
await ctx.info(f"代码执行完成。状态: {final_status}。")
return ExecutionResult(status=final_status, output=result.stdout, error=result.stderr, returncode=result.returncode, work_dir=work_dir)
except Exception as e:
error_message = f"执行代码时发生意外错误: {e}"
global_logger.error(error_message, exc_info=True)
await ctx.error(error_message)
return ExecutionResult(status="failed", output="", error=error_message, returncode=-1, work_dir=work_dir)
# --- 服务器启动---
def run():
"""
服务器主入口函数。
"""
try:
subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
global_logger.info("Ollama 服务已在运行。")
except (subprocess.CalledProcessError, FileNotFoundError):
global_logger.warning("Ollama服务可能未运行。如果需要使用 'ollama' provider,请确保Ollama已安装并正在运行。")
mcp.run()
if __name__ == "__main__":
run()
Popular Ranking
ChangePopular Events
More
一个MCP
最近看到了UOS AI更新了mcp,逛论坛还发现MCP的活动,那就用AI做了一个mcp应用,接入了UOS AI
功能很简单,就是我发送要求,他就把要求发给本地ollama的模型,我用的deepseek-coder-v2:16b,然后它就会生成python代码以实现这个功能,并安装相应依赖,最后自动执行。
这下有什么要求,只要python能完成,就能交给UOSAI了!
【工具名】:MCP+Ollama+Python全能工具
【应用场景】:生活娱乐
办公效率 / 设计创作 / 开发工具之类的也行【功能与使用方法】:
复制评论区楼主发的python代码并命名为main.py(可能会更新),创建环境,安装相应依赖,然后在UOS AI里导入
这个需要本地的ollama模型来生成代码,所以请配置好ollama和在此python生成的配置文件里配置好模型
如果有bug的话找llm修一修
如果不想用ollama,想用其他的api的话也找llm改改
下面的GIF【5倍速,超低画质(里面第一个bilibili小图标是蓝色的,生成的是粉色的)】是一个示例
还有图片
svg图片换色
废话文字生成
抓取bing美图到本地
将MiSans字体中的天影大侠提取出来绘制为白色图片并添加黑色边框
(凑合吧)
运行时会创建一个文件夹放生成的py,同时会使用一个共用的env,
但env安装依赖还没有做好,目前依赖能安装了,deepseek在执行命令发现依赖问题时会尝试pip install xxx,把这个指令单独弄了出来识别并执行目前会在工作文件夹里生成py、log和requirements.txt文件,运行基本正常
这个帖子也是抛砖引玉,展现UOS Ai用上MCP后的可能的高级玩法
【推荐指数】:★★★★
(因为好玩且配置简单)
顺便分享一下llm构建mcp的教程
使用 LLM 构建 MCP