[Official Events] #MCP Explorer# MCP Scenario Discovery Challenge: new tricks with MCP + Ollama + Python!
SkyShadowHero
deepin
2025-08-06 13:08
Author

An MCP tool

I recently noticed that UOS AI added MCP support, and while browsing the forum I found this MCP event, so I had an AI build an MCP application and hooked it up to UOS AI.

The feature is simple: I send a request, the tool forwards it to a local Ollama model (I use deepseek-coder-v2:16b), which generates Python code for the task, installs the required dependencies, and finally runs the code automatically.

So now, as long as Python can do it, any request can be handed to UOS AI!

[Tool name]: MCP + Ollama + Python all-purpose tool

[Use case]: Life & entertainment

Office productivity / design / development tools would work too.

[Features and usage]:

Copy the Python code I posted in the replies, save it as main.py (it may get updated), create a virtual environment and install the dependencies, then import the following in UOS AI:

{
  "mcpServers": {
    "some-name": {
      "command": "/your/path/.venv/bin/python",
      "args": [
        "/your/path/main.py"
      ]
    }
  }
}

Code generation relies on a local Ollama model, so configure Ollama first and set your model in the config file that this Python script generates.
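For reference, setup could look something like this (the paths and package list are my guesses based on the script's imports; adjust them to your own layout):

```shell
# Create a dedicated virtual environment next to main.py (path is an example)
python3 -m venv .venv
# Install the two libraries the script imports
.venv/bin/pip install mcp langchain-community
# Make sure the model named in the generated config is available locally, e.g.:
# ollama pull deepseek-coder-v2:16b
```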

If you hit a bug, ask an LLM to fix it.

If you'd rather use some other API instead of Ollama, an LLM can help you adapt the code too.

The GIF below (5x speed, very low quality; the first small bilibili icon in it is the original blue one, the generated one is pink) shows an example:

freecompress-rotate-录屏_deepin-terminal_20250806121045.gif

More screenshots:

Recoloring an SVG image

影院20250806122425.jpg

影院20250806130942.jpg

Generating filler text

影院20250806122510.jpg

Fetching Bing wallpapers to local disk

截图_deepin-image-viewer_20250806141235.png

Extracting the characters 天影大侠 from the MiSans font, rendering them as a white image, and adding a black border

(It'll do)

图片.png

At runtime it creates a folder for the generated .py files and uses a shared env. Installing dependencies into that env wasn't finished at first, but it works now: when deepseek hits a missing dependency while running a command, it tries pip install xxx, and that instruction is now recognized and executed separately.

It currently writes the .py, log, and requirements.txt files into the working folder, and runs basically fine.
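The `# REQUIRE:` convention the prompt enforces on the generated code can be shown with a standalone snippet (this mirrors the post-processing step in the posted script):

```python
import re

def split_requires(generated: str):
    """Split '# REQUIRE: pkg' declarations out of LLM-generated code,
    following the convention enforced by the prompt in the posted script."""
    deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", generated))
    body = "\n".join(
        line for line in generated.splitlines()
        if not line.strip().startswith("# REQUIRE:")
    )
    return body.strip(), deps

sample = "# REQUIRE: requests\nimport requests\nprint('任务成功完成')"
body, deps = split_requires(sample)
# deps is {'requests'} and the REQUIRE line is stripped from body
```

The extracted set is then handed to pip, while the cleaned body is what actually gets executed.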

图片.png

This post is just meant to spark ideas and show the kind of advanced play that becomes possible once UOS AI has MCP.

[Rating]: ★★★★

(Because it's fun and easy to set up)

By the way, here's a tutorial on building MCP servers with an LLM:

Building MCP with LLMs

All Replies
深蓝色
deepin
2025-08-06 13:18
#1

Edge-cloud collaboration!

deepin小助手
Super Moderator
OM
2025-08-06 13:22
#2

So cool!

SkyShadowHero
deepin
2025-08-06 13:23
#3
深蓝色

Edge-cloud collaboration!

Actually I'd prefer everything to run locally, but the local model I use doesn't seem to work with the integrated MCP.

芝士堡
deepin
2025-08-06 13:26
#4
SkyShadowHero

Actually I'd prefer everything to run locally, but the local model I use doesn't seem to work with the integrated MCP.

Local small models should be capable enough soon.

深蓝色
deepin
2025-08-06 13:27
#5
SkyShadowHero

Actually I'd prefer everything to run locally, but the local model I use doesn't seem to work with the integrated MCP.

On-device you also need large-parameter models to support an agent's complex task analysis, planning, and tool-calling work.

黑桃老K
deepin
2025-08-06 13:27
#6

Awesome, you immediately feel the fun of playing with MCP.

SkyShadowHero
deepin
2025-08-06 13:39
#7
黑桃老K

Awesome, you immediately feel the fun of playing with MCP.

Haha, it is pretty fun: asking an AI to write AI-integrated stuff to play with, and Python can do so many interesting things.

一头牛
deepin product team
2025-08-06 13:53
#8

Impressive hands-on skills, man 👍

SkyShadowHero
deepin
2025-08-06 13:57
#9

First version of the Python file; dependency installation doesn't work yet.

import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, Set

# Import the MCP core library
from mcp.server.fastmcp import FastMCP, Context

# Import LangChain and Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field

# --- Logging setup ---
logger = logging.getLogger(__name__)

# --- MCP server definition ---
mcp = FastMCP(
    name="SkyShadowHero Task Execution Server",
)

# --- Pydantic response models ---
class ExecutionStage(BaseModel):
    code_generation: str
    dependency_installation: str
    execution: str

class ExecutionResult(BaseModel):
    stages: ExecutionStage
    code: str
    output: str
    error: str
    work_dir: str
    returncode: int

class ServerInfo(BaseModel):
    name: str
    model: Optional[str]
    status: str

class CommandOutput(BaseModel):
    status: str
    result: ExecutionResult
    server: ServerInfo


# --- Configuration management ---
class MCPConfig:
    _instance = None
    config_path = Path(__file__).parent / "mcp_config.json"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.load_config()
        return cls._instance

    def load_config(self):
        if not self.config_path.exists():
            self.config = {
                "mcpServers": {
                    "default": {
                        "model": "deepseek-coder-v2:16b",
                        "task_templates": {
                            "file_operations": {
                                "prompt": (
                                    "你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
                                    "## 用户指令:\n"
                                    "{task}\n\n"
                                    "## 代码生成规范 (必须严格遵守):\n"
                                    "1.  **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
                                    "2.  **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
                                    "3.  **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
                                    "4.  **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
                                    "5.  **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
                                    "6.  **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
                                    "现在,请根据用户指令生成代码。"
                                ),
                            }
                        }
                    }
                }
            }
            self.save_config()
        else:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = json.load(f)

    def save_config(self):
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)

    def get_server(self, name: str) -> Optional[Dict[str, Any]]:
        return self.config.get("mcpServers", {}).get(name)


class TaskWorkflow:
    def __init__(self):
        self.config = MCPConfig().config
        self.llm_cache = {}
        self.standard_libs = self._get_standard_libs()
        script_dir = Path(__file__).parent.resolve()
        self.shared_work_dir = script_dir / "mcp_tasks"
        self.shared_work_dir.mkdir(exist_ok=True)
  
        logger.info("正在初始化并检查共享虚拟环境...")
        try:
            self.venv_path = self.shared_work_dir / "venv"
            # _create_virtual_env returns the absolute paths to the executables
            self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
            logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
        except Exception as e:
            logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
            raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")

    def _get_standard_libs(self) -> Set[str]:
        common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
        if sys.version_info >= (3, 10):
            try:
                from sys import stdlib_module_names
                return set(stdlib_module_names)
            except ImportError:
                return common_libs
        return common_libs

    async def get_llm(self, model_name: str) -> Ollama:
        if model_name not in self.llm_cache:
            logger.info(f"正在加载模型: {model_name}")
            self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
        return self.llm_cache[model_name]

    def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
        """
        [Updated] Create the virtual environment and return the absolute paths to the Python and pip executables.
        """
        venv_path = Path(venv_path_str)
        if sys.platform == "win32":
            python_exe = venv_path / 'Scripts' / 'python.exe'
            pip_exe = venv_path / 'Scripts' / 'pip.exe'
        else:
            python_exe = venv_path / 'bin' / 'python'
            pip_exe = venv_path / 'bin' / 'pip'

        if not python_exe.exists() or not pip_exe.exists():
            logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
            try:
                subprocess.run(
                    [sys.executable, "-m", "venv", venv_path_str],
                    check=True, capture_output=True, text=True, timeout=120
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"创建虚拟环境失败: {e.stderr}")
                raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
  
        if not python_exe.exists() or not pip_exe.exists():
             raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
  
        logger.info("虚拟环境验证成功。")
        return str(python_exe), str(pip_exe)

    def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
        cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
        required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
        final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
        logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
        return final_code.strip(), required_deps

    def _install_dependencies(self, deps: Set[str]):
        """
        [Needs improvement] Install dependencies using pip's absolute path.
        """
        if not deps:
            logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
            return
        deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
        if not deps_to_install:
            logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
            return
  
        # This part still needs improvement
        command = [self.pip_executable, "install", *deps_to_install]
        logger.info(f"执行依赖安装命令: {' '.join(command)}")
  
        result = subprocess.run(
            command,
            cwd=str(self.shared_work_dir),
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
  
        if result.returncode != 0:
            error_message = f"依赖安装失败: {result.stderr}"
            logger.error(error_message)
            raise RuntimeError(error_message)
      
        logger.info(f"依赖 {list(deps_to_install)} 安装成功。")

    def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
        """
        [Updated] Execute the code using the venv Python's absolute path.
        """
        script_name = "generated_script.py"
        code_path = os.path.join(task_work_dir, script_name)
        with open(code_path, "w", encoding="utf-8") as f:
            f.write(code_to_execute)
  
        logger.info(f"最终执行的脚本已保存至: {code_path}")
  

        command = [self.python_executable, script_name]
        logger.info(f"执行代码命令: {' '.join(command)}")
  
        result = subprocess.run(
            command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
  
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}

    async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
        result = {
            "stages": {
                "code_generation": "pending",
                "dependency_installation": "pending",
                "execution": "pending"
            },
            "code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
        }
        try:
            timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
            task_work_dir = self.shared_work_dir / timestamp
            task_work_dir.mkdir(exist_ok=True)
            result["work_dir"] = str(task_work_dir)
            await ctx.info(f"任务工作目录已创建: {task_work_dir}")
      
            server_config = self.config.get("mcpServers", {}).get(server_name)
            if not server_config: raise ValueError(f"服务器 '{server_name}' 未配置")
      
            template = server_config["task_templates"]["file_operations"]
            prompt = template["prompt"].format(task=instruction)
      
            await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
            llm = await self.get_llm(server_config['model'])
            generated_code = await llm.ainvoke(prompt)
            result["stages"]["code_generation"] = "success"
            await ctx.info("代码生成成功。")
      
            pure_code, dependencies = self._post_process_code(generated_code)
            result["code"] = pure_code
      
            result["stages"]["dependency_installation"] = "pending"
            await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
            self._install_dependencies(dependencies)
            result["stages"]["dependency_installation"] = "success"
            await ctx.info("所有依赖已就绪。")
      
            result["stages"]["execution"] = "pending"
            await ctx.info("正在执行生成的代码...")
            exec_result = self._execute_code(pure_code, str(task_work_dir))
      
            result.update(exec_result)
      
            is_successful = "任务成功完成" in exec_result.get("output", "")
            result["stages"]["execution"] = "success" if is_successful else "failed"

            final_status = "success" if is_successful else "failed"
            return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}

        except Exception as e:
            current_stage = next((s for s, status in result["stages"].items() if status == "pending"), "unknown")
            result["stages"][current_stage] = "failed"
            error_message = f"在 '{current_stage}' 阶段失败: {e}"
            result["error"] = error_message
            logger.error(error_message, exc_info=True)
            await ctx.error(error_message)
      
            for stage, status in result["stages"].items():
                if status == "pending":
                    result["stages"][stage] = "skipped"

            return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}

# --- A singleton workflow executor ---
workflow_executor = TaskWorkflow()

# --- MCP tool definition ---
@mcp.tool()
async def execute_natural_command(
    instruction: str = Field(
        description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
    ),
    server: str = Field(default="default", description="要使用的服务器配置名称。"),
    ctx: Context = Field(exclude=True)
) -> CommandOutput:
    """
    当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
    此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
    非常适合处理创建文件、修改文件、删除文件、压缩文件、移动目录、运行脚本等任务。
    """
    try:
        await ctx.info(f"收到指令,开始处理: '{instruction}'")
  
        # Call the singleton executor's method, passing in the current ctx
        result_dict = await workflow_executor.run_workflow(
            instruction=instruction,
            server_name=server,
            ctx=ctx
        )
  
        await ctx.info("任务流程执行完毕。")
  
        return CommandOutput.model_validate(result_dict)
    except Exception as e:
        await ctx.error(f"执行工具时发生严重错误: {e}")
  
        return CommandOutput(
            status="failed",
            result=ExecutionResult(
                stages=ExecutionStage(
                    code_generation="failed",
                    dependency_installation="skipped",
                    execution="skipped"
                ),
                code="",
                output="",
                error=f"执行工具时发生顶层错误: {e}",
                work_dir="",
                returncode=-1
            ),
            server=ServerInfo(name=server, model=None, status="error")
        )


# --- Server startup ---
def run():
    """
    Main entry point for the server.
    """
    try:
        subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        logger.info("Ollama 服务已在运行。")
    except (subprocess.CalledProcessError, FileNotFoundError):
        logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
  
    mcp.run()

if __name__ == "__main__":
    run()
SkyShadowHero
deepin
2025-08-06 14:43
#10

Second version of the Python file; dependency installation now works.

import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple

# Import the MCP core library
from mcp.server.fastmcp import FastMCP, Context

# Import LangChain and Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field

# --- Logging setup ---
logger = logging.getLogger(__name__)

# --- MCP server definition ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")

# --- Pydantic response models ---
class ExecutionStage(BaseModel):
    code_generation: str
    dependency_installation: str
    execution: str

class ExecutionResult(BaseModel):
    stages: ExecutionStage
    code: str
    output: str
    error: str
    work_dir: str
    returncode: int

class ServerInfo(BaseModel):
    name: str
    model: Optional[str]
    status: str

class CommandOutput(BaseModel):
    status: str
    result: ExecutionResult
    server: ServerInfo

# --- Configuration management ---
class MCPConfig:
    _instance = None
    config_path = Path(__file__).parent / "mcp_config.json"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.load_config()
        return cls._instance

    def load_config(self):
        if not self.config_path.exists():
            self.config = {
                "mcpServers": {
                    "default": {
                        "model": "deepseek-coder-v2:16b",
                        "task_templates": {
                            "file_operations": {
                                "prompt": (
                                    "你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
                                    "## 用户指令:\n"
                                    "{task}\n\n"
                                    "## 代码生成规范 (必须严格遵守):\n"
                                    "1.  **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
                                    "2.  **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
                                    "3.  **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
                                    "4.  **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
                                    "5.  **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
                                    "6.  **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
                                    "现在,请根据用户指令生成代码。"
                                ),
                            }
                        }
                    }
                }
            }
            self.save_config()
        else:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = json.load(f)

    def save_config(self):
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)

    def get_server(self, name: str) -> Optional[Dict[str, Any]]:
        return self.config.get("mcpServers", {}).get(name)

# --- [Final fixed architecture] Core logic wrapper ---
class TaskWorkflow:
    def __init__(self):
        self.config = MCPConfig().config
        self.llm_cache = {}
        self.standard_libs = self._get_standard_libs()
        script_dir = Path(__file__).parent.resolve()
        self.shared_work_dir = script_dir / "mcp_tasks"
        self.shared_work_dir.mkdir(exist_ok=True)
  
        logger.info("正在初始化并检查共享虚拟环境...")
        try:
            self.venv_path = self.shared_work_dir / "venv"
            self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
            logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
        except Exception as e:
            logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
            raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")

    def _get_standard_libs(self) -> Set[str]:
        common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
        if sys.version_info >= (3, 10):
            try:
                from sys import stdlib_module_names
                return set(stdlib_module_names)
            except ImportError:
                return common_libs
        return common_libs

    async def get_llm(self, model_name: str) -> Ollama:
        if model_name not in self.llm_cache:
            logger.info(f"正在加载模型: {model_name}")
            self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
        return self.llm_cache[model_name]

    def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
        venv_path = Path(venv_path_str)
        if sys.platform == "win32":
            python_exe = venv_path / 'Scripts' / 'python.exe'
            pip_exe = venv_path / 'Scripts' / 'pip.exe'
        else:
            python_exe = venv_path / 'bin' / 'python'
            pip_exe = venv_path / 'bin' / 'pip'

        if not python_exe.exists() or not pip_exe.exists():
            logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
            try:
                subprocess.run(
                    [sys.executable, "-m", "venv", venv_path_str],
                    check=True, capture_output=True, text=True, timeout=120
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"创建虚拟环境失败: {e.stderr}")
                raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
  
        if not python_exe.exists() or not pip_exe.exists():
             raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
  
        logger.info("虚拟环境验证成功。")
        return str(python_exe), str(pip_exe)

    def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
        cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
        required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
        final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
        logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
        return final_code.strip(), required_deps

    def _install_dependencies(self, deps: Set[str]):
        if not deps:
            logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
            return
        deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
        if not deps_to_install:
            logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
            return
  
        command = [self.pip_executable, "install", *deps_to_install]
        logger.info(f"执行依赖安装命令: {' '.join(command)}")
  
        result = subprocess.run(
            command,
            cwd=str(self.shared_work_dir),
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
  
        if result.returncode != 0:
            error_message = f"依赖安装失败: {result.stderr}"
            logger.error(error_message)
            raise RuntimeError(error_message)
      
        logger.info(f"依赖 {list(deps_to_install)} 安装成功。")

    def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
        script_name = "generated_script.py"
        code_path = os.path.join(task_work_dir, script_name)
        with open(code_path, "w", encoding="utf-8") as f:
            f.write(code_to_execute)
  
        logger.info(f"最终执行的脚本已保存至: {code_path}")
  
        command = [self.python_executable, script_name]
        logger.info(f"执行代码命令: {' '.join(command)}")
  
        result = subprocess.run(
            command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
  
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}

    def _is_direct_command(self, instruction: str) -> bool:
        """
        [New] Check whether the instruction is a direct shell command.
        """
        # Simple but effective check: treat instructions that start with common commands as direct commands
        common_commands = ['python', 'pip', 'uv', 'bash', 'sh', 'ls', 'rm', 'cp', 'mv', 'mkdir']
        return any(instruction.strip().startswith(cmd) for cmd in common_commands)

    def _execute_direct_command(self, command: str, task_work_dir: str) -> Dict[str, Any]:
        """
        [New] Execute a shell command directly in the virtual environment's context.
        """
        logger.info(f"检测到直接命令,将在虚拟环境中执行: {command}")
  
        # To run inside the venv, build a command that activates it first.
        # This matters for commands that need the venv context (e.g. pip install).
        if sys.platform == "win32":
            activate_script = Path(self.python_executable).parent / "activate.bat"
            full_command = f'call "{activate_script}" && {command}'
        else:
            activate_script = Path(self.python_executable).parent / "activate"
            full_command = f'. "{activate_script}" && {command}'

        result = subprocess.run(
            full_command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
            shell=True, # shell=True is required for `.` and `&&`
            executable='/bin/bash' if sys.platform != "win32" else None
        )
  
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}

    async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
        result = {
            "stages": {
                "code_generation": "pending",
                "dependency_installation": "pending",
                "execution": "pending"
            },
            "code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
        }
        try:
            timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
            task_work_dir = self.shared_work_dir / timestamp
            task_work_dir.mkdir(exist_ok=True)
            result["work_dir"] = str(task_work_dir)
            await ctx.info(f"任务工作目录已创建: {task_work_dir}")

            # --- Instruction sniffing and mode switching ---
            if self._is_direct_command(instruction):
                await ctx.info("检测到直接命令模式。")
                result["stages"]["code_generation"] = "skipped (direct command)"
                result["stages"]["dependency_installation"] = "skipped (direct command)"
                result["code"] = f"# Direct Command Execution\n{instruction}"

                result["stages"]["execution"] = "pending"
                await ctx.info(f"正在直接执行命令: {instruction}")
                exec_result = self._execute_direct_command(instruction, str(task_work_dir))
                result.update(exec_result)
          
                is_successful = exec_result.get("returncode") == 0
                result["stages"]["execution"] = "success" if is_successful else "failed"
                final_status = "success" if is_successful else "failed"
          
                return {"status": final_status, "result": result, "server": {"name": server_name, "model": "N/A (Direct Command)", "status": "active"}}
      
            # --- Not a direct command: run the original LLM workflow ---
            await ctx.info("进入 LLM 代码生成模式。")
            server_config = self.config.get("mcpServers", {}).get(server_name)
            if not server_config: raise ValueError(f"服务器 '{server_name}' 未配置")
      
            template = server_config["task_templates"]["file_operations"]
            prompt = template["prompt"].format(task=instruction)
      
            await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
            llm = await self.get_llm(server_config['model'])
            generated_code = await llm.ainvoke(prompt)
            result["stages"]["code_generation"] = "success"
            await ctx.info("代码生成成功。")
      
            pure_code, dependencies = self._post_process_code(generated_code)
            result["code"] = pure_code
      
            result["stages"]["dependency_installation"] = "pending"
            await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
            self._install_dependencies(dependencies)
            result["stages"]["dependency_installation"] = "success"
            await ctx.info("所有依赖已就绪。")
      
            result["stages"]["execution"] = "pending"
            await ctx.info("正在执行生成的代码...")
            exec_result = self._execute_code(pure_code, str(task_work_dir))
      
            result.update(exec_result)
      
            is_successful = "任务成功完成" in exec_result.get("output", "")
            result["stages"]["execution"] = "success" if is_successful else "failed"

            final_status = "success" if is_successful else "failed"
            return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}

        except Exception as e:
            current_stage = next((s for s, status in result["stages"].items() if status == "pending"), "unknown")
            result["stages"][current_stage] = "failed"
            error_message = f"在 '{current_stage}' 阶段失败: {e}"
            result["error"] = error_message
            logger.error(error_message, exc_info=True)
            await ctx.error(error_message)
      
            for stage, status in result["stages"].items():
                if status == "pending":
                    result["stages"][stage] = "skipped"

            return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}

# --- Singleton and tool definition ---
workflow_executor = TaskWorkflow()

@mcp.tool()
async def execute_natural_command(
    instruction: str = Field(
        description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
    ),
    server: str = Field(default="default", description="要使用的服务器配置名称。"),
    ctx: Context = Field(exclude=True)
) -> CommandOutput:
    """
    当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
    此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
    """
    try:
        await ctx.info(f"收到指令,开始处理: '{instruction}'")
  
        result_dict = await workflow_executor.run_workflow(
            instruction=instruction,
            server_name=server,
            ctx=ctx
        )
  
        await ctx.info("任务流程执行完毕。")
  
        return CommandOutput.model_validate(result_dict)
    except Exception as e:
        await ctx.error(f"执行工具时发生严重错误: {e}")
  
        return CommandOutput(
            status="failed",
            result=ExecutionResult(
                stages=ExecutionStage(
                    code_generation="failed",
                    dependency_installation="skipped",
                    execution="skipped"
                ),
                code="",
                output="",
                error=f"执行工具时发生顶层错误: {e}",
                work_dir="",
                returncode=-1
            ),
            server=ServerInfo(name=server, model=None, status="error")
        )

# --- Server startup ---
def run():
    """
    Main entry point for the server.
    """
    try:
        subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        logger.info("Ollama 服务已在运行。")
    except (subprocess.CalledProcessError, FileNotFoundError):
        logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
  
    mcp.run()

if __name__ == "__main__":
    run()

SkyShadowHero
deepin
2025-08-28 15:20
#11

Third version. Before using it, first run this in your environment:

pip install mcp langchain-community

Dependency installation now uses requirements.txt. Before use, run this Python file once to generate the config file, then set your Ollama model in that config file.
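A minimal sketch of that requirements.txt handoff (the function and variable names here are illustrative, not necessarily the ones used in the listing):

```python
from pathlib import Path

def write_requirements(work_dir: Path, deps: set) -> Path:
    """Persist the extracted dependencies as requirements.txt so they can be
    installed in one shot with `pip install -r requirements.txt`."""
    req = work_dir / "requirements.txt"
    req.write_text("\n".join(sorted(deps)) + "\n", encoding="utf-8")
    return req

# afterwards, roughly:
# subprocess.run([pip_executable, "install", "-r", str(req)], check=True)
```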

import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple

# 导入 MCP 核心库
from mcp.server.fastmcp import FastMCP, Context

# 导入 LangChain 和 Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field

# --- 全局日志配置 (用于服务本身) ---
# 这个 logger 用于记录服务启动、关闭等非任务相关的核心日志
global_logger = logging.getLogger("mcp_service")
if not global_logger.handlers:
    global_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    global_logger.addHandler(handler)


# --- MCP 服务器定义 ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")

# --- Pydantic 返回模型 ---
class ExecutionStage(BaseModel):
    code_generation: str
    dependency_installation: str
    execution: str

class ExecutionResult(BaseModel):
    stages: ExecutionStage
    code: str
    output: str
    error: str
    work_dir: str
    returncode: int

class ServerInfo(BaseModel):
    name: str
    model: Optional[str]
    status: str

class CommandOutput(BaseModel):
    status: str
    result: ExecutionResult
    server: ServerInfo

# --- 配置管理 ---
class MCPConfig:
    _instance = None
    config_path = Path(__file__).parent / "mcp_config.json"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.load_config()
        return cls._instance

    def load_config(self):
        if not self.config_path.exists():
            self.config = {
                "mcpServers": {
                    "default": {
                        "model": "deepseek-coder-v2:16b",
                        "task_templates": {
                            "file_operations": {
                                "prompt": (
                                    "你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
                                    "## 用户指令:\n"
                                    "{task}\n\n"
                                    "## 代码生成规范 (必须严格遵守):\n"
                                    "1.  **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
                                    "2.  **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
                                    "3.  **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
                                    "4.  **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
                                    "5.  **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
                                    "6.  **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
                                    "现在,请根据用户指令生成代码。"
                                ),
                            }
                        }
                    }
                }
            }
            self.save_config()
        else:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = json.load(f)

    def save_config(self):
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)

    def get_server(self, name: str) -> Optional[Dict[str, Any]]:
        return self.config.get("mcpServers", {}).get(name)

# --- 【最终修复架构】核心逻辑封装 ---
class TaskWorkflow:
    def __init__(self):
        self.config = MCPConfig().config
        self.llm_cache = {}
        self.standard_libs = self._get_standard_libs()
        script_dir = Path(__file__).parent.resolve()
        self.shared_work_dir = script_dir / "mcp_tasks"
        self.shared_work_dir.mkdir(exist_ok=True)

        global_logger.info("正在初始化并检查共享虚拟环境...")
        try:
            self.venv_path = self.shared_work_dir / "venv"
            self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
            global_logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
        except Exception as e:
            global_logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
            raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")

    def _get_standard_libs(self) -> Set[str]:
        common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
        if sys.version_info >= (3, 10):
            try:
                from sys import stdlib_module_names
                return set(stdlib_module_names)
            except ImportError:
                return common_libs
        return common_libs

    async def get_llm(self, model_name: str) -> Ollama:
        if model_name not in self.llm_cache:
            global_logger.info(f"正在加载模型: {model_name}")
            self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
        return self.llm_cache[model_name]

    def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
        venv_path = Path(venv_path_str)
        if sys.platform == "win32":
            python_exe = venv_path / 'Scripts' / 'python.exe'
            pip_exe = venv_path / 'Scripts' / 'pip.exe'
        else:
            python_exe = venv_path / 'bin' / 'python'
            pip_exe = venv_path / 'bin' / 'pip'

        if not python_exe.exists() or not pip_exe.exists():
            global_logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
            try:
                subprocess.run(
                    [sys.executable, "-m", "venv", venv_path_str],
                    check=True, capture_output=True, text=True, timeout=120
                )
            except subprocess.CalledProcessError as e:
                global_logger.error(f"创建虚拟环境失败: {e.stderr}")
                raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
  
        if not python_exe.exists() or not pip_exe.exists():
             raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
  
        global_logger.info("虚拟环境验证成功。")
        return str(python_exe), str(pip_exe)

    def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
        cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
        required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
        final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
        global_logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
        return final_code.strip(), required_deps

    def _install_dependencies(self, deps: Set[str], task_work_dir: Path):
        if not deps:
            global_logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
            return
  
        deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
        if not deps_to_install:
            global_logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
            return

        # 确保文件名是 requirements.txt
        requirements_path = task_work_dir / "requirements.txt"
        with open(requirements_path, 'w', encoding='utf-8') as f:
            for dep in deps_to_install:
                f.write(f"{dep}\n")
        global_logger.info(f"已生成依赖文件: {requirements_path}")

        command = [self.pip_executable, "install", "-r", str(requirements_path)]
        global_logger.info(f"执行依赖安装命令: {' '.join(command)}")
  
        result = subprocess.run(
            command,
            cwd=str(task_work_dir),
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
  
        if result.returncode != 0:
            error_message = f"依赖安装失败: {result.stderr}"
            global_logger.error(error_message)
            raise RuntimeError(error_message)
  
        global_logger.info(f"依赖 {list(deps_to_install)} 安装成功。")

    def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
        script_name = "generated_script.py"
        code_path = os.path.join(task_work_dir, script_name)
        with open(code_path, "w", encoding="utf-8") as f:
            f.write(code_to_execute)
  
        global_logger.info(f"最终执行的脚本已保存至: {code_path}")
  
        command = [self.python_executable, script_name]
        global_logger.info(f"执行代码命令: {' '.join(command)}")
  
        result = subprocess.run(
            command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
  
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}

    def _is_direct_command(self, instruction: str) -> bool:
        common_commands = ['python', 'pip', 'uv', 'bash', 'sh', 'ls', 'rm', 'cp', 'mv', 'mkdir']
        return any(instruction.strip().startswith(cmd) for cmd in common_commands)

    def _execute_direct_command(self, command: str, task_work_dir: str) -> Dict[str, Any]:
        global_logger.info(f"检测到直接命令,将在虚拟环境中执行: {command}")
  
        if sys.platform == "win32":
            activate_script = Path(self.python_executable).parent / "activate.bat"
            full_command = f'call "{activate_script}" && {command}'
        else:
            activate_script = Path(self.python_executable).parent / "activate"
            full_command = f'. "{activate_script}" && {command}'

        result = subprocess.run(
            full_command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
            shell=True,
            executable='/bin/bash' if sys.platform != "win32" else None
        )
  
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}

    async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
        result = {
            "stages": {
                "code_generation": "pending",
                "dependency_installation": "pending",
                "execution": "pending"
            },
            "code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
        }
  
        try:
            timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
            task_work_dir = self.shared_work_dir / timestamp
            task_work_dir.mkdir(exist_ok=True)
            result["work_dir"] = str(task_work_dir)
  
            global_logger.info(f"任务 '{timestamp}' 启动,指令: '{instruction}'")
            await ctx.info(f"任务工作目录已创建: {task_work_dir}")

            # --- 指令嗅探和模式切换 ---
            if self._is_direct_command(instruction):
                await ctx.info("检测到直接命令模式。")
                result["stages"]["code_generation"] = "skipped (direct command)"
                result["stages"]["dependency_installation"] = "skipped (direct command)"
                result["code"] = f"# Direct Command Execution\n{instruction}"

                result["stages"]["execution"] = "pending"
                await ctx.info(f"正在直接执行命令: {instruction}")
                exec_result = self._execute_direct_command(instruction, str(task_work_dir))
                result.update(exec_result)
  
                is_successful = exec_result.get("returncode") == 0
                result["stages"]["execution"] = "success" if is_successful else "failed"
                final_status = "success" if is_successful else "failed"
      
                global_logger.info(f"直接命令执行完成。状态: {final_status}。")
  
                return {"status": final_status, "result": result, "server": {"name": server_name, "model": "N/A (Direct Command)", "status": "active"}}
  
            # --- 如果不是直接命令,则执行原有的 LLM 工作流 ---
            await ctx.info("进入 LLM 代码生成模式。")
            server_config = self.config.get("mcpServers", {}).get(server_name)
            if not server_config: raise ValueError(f"服务器 '{server_name}' 未配置")
  
            template = server_config["task_templates"]["file_operations"]
            prompt = template["prompt"].format(task=instruction)
  
            await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
            llm = await self.get_llm(server_config['model'])
            generated_code = await llm.ainvoke(prompt)
            result["stages"]["code_generation"] = "success"
            await ctx.info("代码生成成功。")
  
            pure_code, dependencies = self._post_process_code(generated_code)
            result["code"] = pure_code
  
            result["stages"]["dependency_installation"] = "pending"
            await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
            self._install_dependencies(dependencies, task_work_dir)
            result["stages"]["dependency_installation"] = "success"
            await ctx.info("所有依赖已就绪。")
  
            result["stages"]["execution"] = "pending"
            await ctx.info("正在执行生成的代码...")
            exec_result = self._execute_code(pure_code, str(task_work_dir))
  
            result.update(exec_result)
  
            is_successful = "任务成功完成" in exec_result.get("output", "")
            result["stages"]["execution"] = "success" if is_successful else "failed"

            final_status = "success" if is_successful else "failed"
            global_logger.info(f"代码执行完成。状态: {final_status}。")
            return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}

        except Exception as e:
            current_stage = next((s for s, status in result["stages"].items() if status == "pending"), None)
            if current_stage:
                result["stages"][current_stage] = "failed"
            error_message = f"在 '{current_stage or 'unknown'}' 阶段失败: {e}"
            result["error"] = error_message
            global_logger.error(error_message, exc_info=True)
            await ctx.error(error_message)
  
            for stage, status in result["stages"].items():
                if status == "pending":
                    result["stages"][stage] = "skipped"

            return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}

# --- 单例和工具定义 ---
workflow_executor = TaskWorkflow()

@mcp.tool()
async def do_it_using_python(
    instruction: str = Field(
        description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
    ),
    server: str = Field(default="default", description="要使用的服务器配置名称。"),
    ctx: Context = Field(exclude=True)
) -> CommandOutput:
    """
    当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
    此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
    """
    try:
        await ctx.info(f"收到指令,开始处理: '{instruction}'")
  
        result_dict = await workflow_executor.run_workflow(
            instruction=instruction,
            server_name=server,
            ctx=ctx
        )
  
        await ctx.info("任务流程执行完毕。")
  
        return CommandOutput.model_validate(result_dict)
    except Exception as e:
        global_logger.error(f"执行工具时发生严重错误: {e}", exc_info=True)
        await ctx.error(f"执行工具时发生严重错误: {e}")
  
        return CommandOutput(
            status="failed",
            result=ExecutionResult(
                stages=ExecutionStage(
                    code_generation="failed",
                    dependency_installation="skipped",
                    execution="skipped"
                ),
                code="",
                output="",
                error=f"执行工具时发生顶层错误: {e}",
                work_dir="",
                returncode=-1
            ),
            server=ServerInfo(name=server, model=None, status="error")
        )

# --- 服务器启动---
def run():
    """
    服务器主入口函数。
    """
    try:
        subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        global_logger.info("Ollama 服务已在运行。")
    except (subprocess.CalledProcessError, FileNotFoundError):
        global_logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
  
    mcp.run()

if __name__ == "__main__":
    run()
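To see what the `# REQUIRE:` convention does in isolation, this standalone sketch reproduces the post-processing step from `_post_process_code` in the listing above (the sample LLM output is fabricated for illustration):

```python
import re

# Three backticks, built indirectly so this example stays copy-paste safe.
FENCE = "`" * 3

def post_process(generated_code: str):
    """Strip Markdown code fences, collect '# REQUIRE:' declarations, drop them from the code."""
    cleaned = re.sub(FENCE + r"python\n|" + FENCE, "", generated_code).strip()
    deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned))
    code = "\n".join(
        line for line in cleaned.split("\n")
        if not line.strip().startswith("# REQUIRE:")
    )
    return code.strip(), deps

sample = FENCE + "python\n# REQUIRE: requests\nimport requests\nprint('ok')\n" + FENCE
code, deps = post_process(sample)
print(deps)                  # {'requests'}
print(code.splitlines()[0])  # import requests
```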

Reply View the author
SkyShadowHero
deepin
2025-09-09 10:29
#12

Fourth version. Added an API mode and split the workflow into three separate tools: code generation, dependency installation, and code execution. You now need pip install fastmcp langchain-community langchain-openai httpx. Before use, delete the old mcp_config.json and run the script once to regenerate the config file; it looks roughly like the snippet below. If provider is ollama, model is required. The ollama path currently has some issues, so if you want to use Ollama, the third version is recommended.

"llm_config": {
    "provider": "ollama或api",
    "model": "deepseek-coder-v2:16b",
    "api_key": "YOUR_API_KEY_HERE (if provider is 'api')",
    "base_url": "YOUR_API_BASE_URL_HERE (if provider is 'api')"
}
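Before launching, you can sanity-check mcp_config.json with a few lines that mirror the validation `get_llm` performs (standalone sketch; field names match the generated file):

```python
def validate_llm_config(cfg: dict) -> None:
    """Raise ValueError for the same misconfigurations that get_llm() rejects."""
    provider = cfg.get("provider")
    if provider not in ("ollama", "api"):
        raise ValueError(f"不支持的 provider: '{provider}',请选择 'ollama' 或 'api'")
    if not cfg.get("model"):
        raise ValueError("必须在配置中提供 'model'")
    if provider == "api":
        for key in ("api_key", "base_url"):
            value = cfg.get(key, "")
            if not value or "YOUR_" in value:
                raise ValueError(f"必须在 mcp_config.json 中设置有效的 '{key}'")

validate_llm_config({"provider": "ollama", "model": "deepseek-coder-v2:16b"})
print("config ok")  # config ok
```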
import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple, List

# 导入 MCP 核心库
from mcp.server.fastmcp import FastMCP, Context

# 导入 LangChain 和 Pydantic
from langchain_community.llms import Ollama
from langchain_openai import ChatOpenAI
from langchain_core.language_models.base import BaseLanguageModel
from pydantic import BaseModel, Field

# 导入 httpx 用于创建自定义网络客户端
import httpx

# --- 提示词定义 ---
CODE_GENERATION_PROMPT = """
你是一个顶级的Python自动化专家 。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。

## 用户指令:
{task}

## 代码生成规范 (必须严格遵守):
1.  **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。
2.  **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: ` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。
3.  **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。
4.  **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。
5.  **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print("任务成功完成")`。
6.  **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。

现在,请根据用户指令生成代码。
"""

# --- 全局日志配置 ---
global_logger = logging.getLogger("mcp_service")
if not global_logger.handlers:
    global_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    global_logger.addHandler(handler)

# --- MCP 服务器定义 ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")

# --- Pydantic 返回模型 ---
class CodeGenerationResult(BaseModel):
    status: str = Field(description="代码生成阶段的状态 ('success' 或 'failed')")
    code: str = Field(description="生成或提供的Python代码")
    dependencies: List[str] = Field(description="从代码中提取的依赖库列表")
    work_dir: str = Field(description="为本次任务创建的工作目录路径")
    error: Optional[str] = Field(None, description="如果生成失败,此字段包含错误信息")

class DependencyInstallationResult(BaseModel):
    status: str = Field(description="依赖安装阶段的状态 ('success', 'failed', 或 'skipped')")
    installed_packages: List[str] = Field(description="成功安装的包列表")
    work_dir: str = Field(description="执行安装的工作目录")
    output: str = Field(description="pip install 命令的输出")
    error: Optional[str] = Field(None, description="如果安装失败,此字段包含错误信息")

class ExecutionResult(BaseModel):
    status: str = Field(description="代码执行阶段的状态 ('success' 或 'failed')")
    output: str = Field(description="脚本执行的标准输出 (stdout)")
    error: str = Field(description="脚本执行的标准错误 (stderr)")
    returncode: int = Field(description="脚本执行的返回码")
    work_dir: str = Field(description="执行代码的工作目录")

# --- 配置管理 (MCPConfig) ---
class MCPConfig:
    _instance = None
    config_path = Path(__file__).parent / "mcp_config.json"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.load_config()
        return cls._instance

    def load_config(self):
        if not self.config_path.exists():
            self.config = {
                "llm_config": {
                    "provider": "ollama",
                    "model": "deepseek-coder-v2:16b",
                    "api_key": "YOUR_API_KEY_HERE (if provider is 'api')",
                    "base_url": "YOUR_API_BASE_URL_HERE (if provider is 'api')"
                }
            }
            self.save_config()
        else:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = json.load(f)

    def save_config(self):
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=4, ensure_ascii=False)

    def get_llm_config(self) -> Optional[Dict[str, Any]]:
        return self.config.get("llm_config")

# --- 核心逻辑封装 (TaskWorkflow) ---
class TaskWorkflow:
    def __init__(self):
        self.llm_config = MCPConfig().get_llm_config()
        self.llm_instance: Optional[BaseLanguageModel] = None
        self.standard_libs = self._get_standard_libs()
        script_dir = Path(__file__).parent.resolve()
        self.shared_work_dir = script_dir / "mcp_tasks"
        self.shared_work_dir.mkdir(exist_ok=True)

        global_logger.info("正在初始化并检查共享虚拟环境...")
        try:
            self.venv_path = self.shared_work_dir / "venv"
            self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
            global_logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
        except Exception as e:
            global_logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
            raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")

    def _get_standard_libs(self) -> Set[str]:
        common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
        if sys.version_info >= (3, 10):
            try:
                from sys import stdlib_module_names
                return set(stdlib_module_names)
            except ImportError:
                return common_libs
        return common_libs

    def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
        venv_path = Path(venv_path_str)
        if sys.platform == "win32":
            python_exe = venv_path / 'Scripts' / 'python.exe'
            pip_exe = venv_path / 'Scripts' / 'pip.exe'
        else:
            python_exe = venv_path / 'bin' / 'python'
            pip_exe = venv_path / 'bin' / 'pip'
        if not python_exe.exists() or not pip_exe.exists():
            global_logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
            try:
                subprocess.run([sys.executable, "-m", "venv", venv_path_str], check=True, capture_output=True, text=True, timeout=120)
            except subprocess.CalledProcessError as e:
                global_logger.error(f"创建虚拟环境失败: {e.stderr}")
                raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
        if not python_exe.exists() or not pip_exe.exists():
             raise FileNotFoundError(f"虚拟环境创建后,未找到 Python/Pip 可执行文件。")
        global_logger.info("虚拟环境验证成功。")
        return str(python_exe), str(pip_exe)

    def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
        cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
        required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
        final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
        global_logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
        return final_code.strip(), required_deps

    def _create_task_work_dir(self) -> Path:
        timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
        task_work_dir = self.shared_work_dir / timestamp
        task_work_dir.mkdir(exist_ok=True)
        global_logger.info(f"任务工作目录已创建: {task_work_dir}")
        return task_work_dir

    async def get_llm(self) -> BaseLanguageModel:
        """
        根据单一的LLM配置获取并缓存LLM实例。
        通过创建自定义httpx客户端来禁用系统代理 。
        """
        if self.llm_instance:
            return self.llm_instance

        if not self.llm_config:
            raise ValueError("LLM 配置 'llm_config' 在 mcp_config.json 中未找到。")

        provider = self.llm_config.get("provider")
        model_name = self.llm_config.get("model")
  
        global_logger.info(f"正在加载模型: {model_name} (Provider: {provider})")

        # ⭐️ 关键修复:创建同步的 httpx.Client 实例 ,而不是异步的
        # trust_env=False 会让 httpx 忽略所有代理环境变量 (HTTP_PROXY, HTTPS_PROXY等 )
        sync_client = httpx.Client(trust_env=False )

        llm: BaseLanguageModel

        if provider == "ollama":
            if not model_name: raise ValueError("当 provider 是 'ollama' 时, 必须在配置中提供 'model'。")
            llm = Ollama(
                model=model_name,
                temperature=0.1,
                top_p=0.9,
                timeout=300,
                # 传入正确的同步客户端实例
                http_client=sync_client
             )

        elif provider == "api":
            api_key = self.llm_config.get("api_key")
            base_url = self.llm_config.get("base_url")
      
            if not model_name: raise ValueError("当 provider 是 'api' 时, 必须在配置中提供 'model'。")
            if not api_key or "YOUR_" in api_key:
                raise ValueError("当 provider 是 'api' 时, 必须在 mcp_config.json 中设置有效的 'api_key'。")
            if not base_url or "YOUR_" in base_url:
                raise ValueError("当 provider 是 'api' 时, 必须在 mcp_config.json 中设置有效的 'base_url'。")

            llm = ChatOpenAI(
                model=model_name,
                api_key=api_key,
                base_url=base_url,
                temperature=0.1,
                max_retries=2,
                timeout=300,
                # 传入正确的同步客户端实例
                http_client=sync_client
             )
  
        else:
            raise ValueError(f"不支持的 provider: '{provider}'。请在 mcp_config.json 中选择 'ollama' 或 'api'。")

        self.llm_instance = llm
        return llm

# --- 单例和工具定义 ---
workflow_executor = TaskWorkflow()

@mcp.tool()
async def generate_code(
    instruction: str = Field(description="用户的自然语言指令,用于生成Python代码。"),
    ctx: Context = Field(exclude=True)
) -> CodeGenerationResult:
    """
    第一步:根据用户的自然语言指令,使用配置好的LLM生成可执行的Python代码。
    此工具会创建一个工作目录,生成代码,并从中提取依赖项。
    """
    await ctx.info(f"收到代码生成指令: '{instruction}'")
    try:
        task_work_dir = workflow_executor._create_task_work_dir()
        await ctx.info(f"工作目录已创建: {task_work_dir}")

        llm = await workflow_executor.get_llm()
  
        prompt = CODE_GENERATION_PROMPT.format(task=instruction)
  
        model_name = workflow_executor.llm_config.get('model')
        await ctx.info(f"正在使用模型 '{model_name}' 生成代码...")
  
        response = await llm.ainvoke(prompt)
        generated_code = response.content if hasattr(response, 'content') else str(response)

        await ctx.info("代码生成成功。")

        pure_code, dependencies = workflow_executor._post_process_code(generated_code)
  
        code_path = task_work_dir / "generated_script.py"
        with open(code_path, "w", encoding="utf-8") as f:
            f.write(pure_code)
        global_logger.info(f"生成的代码已保存至: {code_path}")

        return CodeGenerationResult(
            status="success",
            code=pure_code,
            dependencies=list(dependencies),
            work_dir=str(task_work_dir)
        )
    except Exception as e:
        error_message = f"代码生成失败: {e}"
        global_logger.error(error_message, exc_info=True)
        await ctx.error(error_message)
        return CodeGenerationResult(
            status="failed", code="", dependencies=[], work_dir="", error=error_message
        )

@mcp.tool()
async def install_dependencies(dependencies: List[str] = Field(description="需要安装的Python包列表。"), work_dir: str = Field(description="必须提供由 'generate_code' 工具返回的工作目录路径。"), ctx: Context = Field(exclude=True)) -> DependencyInstallationResult:
    """
    第二步:在指定工作目录的虚拟环境中安装Python依赖项。
    如果依赖列表为空,则跳过此步骤。
    """
    await ctx.info(f"收到依赖安装指令: {dependencies} in '{work_dir}'")
    task_work_dir = Path(work_dir)
    if not task_work_dir.exists():
        return DependencyInstallationResult(status="failed", installed_packages=[], work_dir=work_dir, output="", error="工作目录不存在!")
    if not dependencies:
        await ctx.info("依赖列表为空,跳过安装。")
        return DependencyInstallationResult(status="skipped", installed_packages=[], work_dir=work_dir, output="No dependencies to install.")
    deps_to_install = {dep for dep in dependencies if dep.lower() not in workflow_executor.standard_libs}
    if not deps_to_install:
        await ctx.info(f"所有声明的依赖 {dependencies} 均为标准库,无需安装。")
        return DependencyInstallationResult(status="skipped", installed_packages=[], work_dir=work_dir, output="All dependencies are standard libraries.")
    try:
        requirements_path = task_work_dir / "requirements.txt"
        with open(requirements_path, 'w', encoding='utf-8') as f:
            for dep in deps_to_install:
                f.write(f"{dep}\n")
        command = [workflow_executor.pip_executable, "install", "-r", str(requirements_path)]
        await ctx.info(f"执行依赖安装命令: {' '.join(command)}")
        result = subprocess.run(command, cwd=str(task_work_dir), capture_output=True, text=True, timeout=300, check=False, encoding='utf-8')
        if result.returncode != 0:
            error_message = f"依赖安装失败: {result.stderr}"
            global_logger.error(error_message)
            await ctx.error(error_message)
            return DependencyInstallationResult(status="failed", installed_packages=[], work_dir=work_dir, output=result.stdout, error=result.stderr)
        await ctx.info(f"依赖 {list(deps_to_install)} 安装成功。")
        return DependencyInstallationResult(status="success", installed_packages=list(deps_to_install), work_dir=work_dir, output=result.stdout)
    except Exception as e:
        error_message = f"安装依赖时发生意外错误: {e}"
        global_logger.error(error_message, exc_info=True)
        await ctx.error(error_message)
        return DependencyInstallationResult(status="failed", installed_packages=[], work_dir=work_dir, output="", error=error_message)

@mcp.tool()
async def execute_code(work_dir: str = Field(description="必须提供由 'generate_code' 工具返回的、包含 'generated_script.py' 的工作目录路径。"), ctx: Context = Field(exclude=True)) -> ExecutionResult:
    """
    第三步:在指定工作目录的虚拟环境中执行 'generated_script.py' 文件。
    这个工具应该在代码生成和依赖安装(如果需要)之后被调用。
    """
    await ctx.info(f"收到代码执行指令 in '{work_dir}'")
    task_work_dir = Path(work_dir)
    script_path = task_work_dir / "generated_script.py"
    if not script_path.exists():
        error_msg = f"执行失败: 在工作目录 '{work_dir}' 中未找到 'generated_script.py'。"
        await ctx.error(error_msg)
        return ExecutionResult(status="failed", output="", error=error_msg, returncode=-1, work_dir=work_dir)
    try:
        command = [workflow_executor.python_executable, str(script_path)]
        await ctx.info(f"执行代码命令: {' '.join(command)}")
        result = subprocess.run(command, cwd=str(task_work_dir), capture_output=True, text=True, timeout=300, check=False, encoding='utf-8')
        is_successful = "任务成功完成" in result.stdout
        final_status = "success" if is_successful and result.returncode == 0 else "failed"
        await ctx.info(f"代码执行完成。状态: {final_status}。")
        return ExecutionResult(status=final_status, output=result.stdout, error=result.stderr, returncode=result.returncode, work_dir=work_dir)
    except Exception as e:
        error_message = f"执行代码时发生意外错误: {e}"
        global_logger.error(error_message, exc_info=True)
        await ctx.error(error_message)
        return ExecutionResult(status="failed", output="", error=error_message, returncode=-1, work_dir=work_dir)

# --- 服务器启动---
def run():
    """
    服务器主入口函数。
    """
    try:
        subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        global_logger.info("Ollama 服务已在运行。")
    except (subprocess.CalledProcessError, FileNotFoundError):
        global_logger.warning("Ollama服务可能未运行。如果需要使用 'ollama' provider,请确保Ollama已安装并正在运行。")
  
    mcp.run()

if __name__ == "__main__":
    run()
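The three tools are designed to be called in order by the host (generate_code → install_dependencies → execute_code). The success check in the last step can be reproduced outside MCP in a few lines; this sketch uses the current interpreter and a temp directory instead of the shared venv:

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path

# Write a minimal "generated" script, then run it the same way execute_code does:
# success requires returncode 0 AND the sentinel "任务成功完成" on stdout.
work_dir = Path(tempfile.mkdtemp())
script = work_dir / "generated_script.py"
script.write_text('print("任务成功完成")', encoding="utf-8")

result = subprocess.run(
    [sys.executable, str(script)],
    cwd=str(work_dir),
    capture_output=True, text=True, timeout=60, encoding="utf-8",
    env={**os.environ, "PYTHONIOENCODING": "utf-8"},  # keep child stdout UTF-8 regardless of locale
)
is_successful = "任务成功完成" in result.stdout and result.returncode == 0
print("success" if is_successful else "failed")  # success
```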

Reply View the author