
[MCP-03] A Complete MCP and LLM Interaction Flow

2025-08-19 10:32:01

Preface

The earlier posts, from [Spring AI MCP hands-on trial] through [[01] The JSON-RPC 2.0 protocol] and [[02] Notes on SSE and Streamable HTTP], roughly explained why MCP came about and touched on some of the technical details. But MCP mainly serves LLMs, so how do MCP and an LLM actually interact? This post briefly summarizes the interaction flow.

Overall Flow

[Sequence diagrams: the end-to-end interaction between the user, AI Agent (MCP Client), LLM, and MCP Server]

Some notes:
1) The user asks the AI Agent to "compute 3*111". Here the AI Agent acts as the MCP Client: it first pulls the metadata of every MCP Server registered with the AI Agent, then sends the user query together with the information about all MCP Servers and MCP Tools to the LLM.
2) The LLM reasons over the query and the MCP Server information, picks the MCP Server and MCP Tool best suited to the user's problem, and returns its choice to the AI Agent (MCP Client). In effect, the LLM tells the AI Agent: "use the multiply MCP Tool in the Calculate_McpServer MCP Server; it can solve the user's problem."
3) The AI Agent (MCP Client) now knows which MCP Tool on which MCP Server to use and calls it directly to get the result: the multiply MCP Tool on the Calculate_McpServer MCP Server.
4) Calculate_McpServer returns the result (the product) to the AI Agent (MCP Client).
5) The AI Agent (MCP Client) sends the user's question together with the result from Calculate_McpServer back to the LLM, so the LLM can judge, given the question and the answer, whether another round of the loop above is needed; if not, it consolidates the content for the final reply.
6) The LLM returns the consolidated content to the AI Agent (MCP Client), which in turn returns it to the user.
Note that not all models support function calling. A universal alternative is to go through the system prompt: the overall idea is much the same as function calling, except that the system prompt requires the LLM to return the MCP Server name, Tool name, and args in a specified format. For the function-calling case, a sample of the LLM's structured reply is sketched below.
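For concreteness, with an OpenAI-compatible function-calling API the "use this tool" reply in step 2 comes back as a structured tool_calls entry rather than prose. A sketch of that assistant message (illustrative values; the exact envelope varies by provider):

{
  "role": "assistant",
  "content": null,
  "tool_calls": [
    {
      "id": "call_0",
      "type": "function",
      "function": {
        "name": "multiply",
        "arguments": "{\"a\": 3, \"b\": 111}"
      }
    }
  ]
}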

Demo

Below, the Calculate_McpServer from the sequence diagram above is implemented as a demo (in Python), to try having the LLM complete basic arithmetic tasks through MCP Tools. Since HTTP+SSE has been officially deprecated, the demo uses only the streamable HTTP transport.

MCPServer

from mcp.server.fastmcp import FastMCP
import logging
 
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
 
MCP_SERVER_NAME = "Calculate_McpServer"
 
logger = logging.getLogger(MCP_SERVER_NAME)
# Initialize the FastMCP server
mcp = FastMCP(name=MCP_SERVER_NAME, instructions="Basic arithmetic operations")
 
@mcp.tool(name="add", description="對兩個數字進行加法")
def add(a: float, b: float) -> float:
    """
    Add two numbers.
 
    Parameters:
    - a (float): First number (required)
    - b (float): Second number (required)
 
    Returns:
    - float: The result of a + b
    """
    return a + b
 
@mcp.tool(name="subtract", description="對兩個數字進行減法")
def subtract(a: float, b: float) -> float:
    """
    Subtract two numbers.
 
    Parameters:
    - a (float): The number to subtract from (required)
    - b (float): The number to subtract (required)
 
    Returns:
    - float: The result of a - b
    """
    return a - b
 
@mcp.tool(name="multiply", description="對兩個數字進行乘法")
def multiply(a: float, b: float) -> float:
    """
    Multiply two numbers.
 
    Parameters:
    - a (float): First number (required)
    - b (float): Second number (required)
 
    Returns:
    - float: The result of a * b
    """
    return a * b
 
@mcp.tool(name="divide", description="對兩個數字進行除法")
def divide(a: float, b: float) -> float:
    """
    Divide two numbers.
 
    Parameters:
    - a (float): Numerator (required)
    - b (float): Denominator (required, must not be zero)
 
    Returns:
    - float: The result of a / b
    """
    if b == 0:
        raise ValueError("Division by zero is not allowed")
    return a / b
 
if __name__ == "__main__":
    mcp.settings.host = "0.0.0.0"
    mcp.settings.port = 8000
    mcp.settings.log_level = "INFO"
 
    # stateless_http and json_response both default to False
    # stateless_http:
    # controls whether the SSE channel is opened and whether client sessions are managed
    # json_response:
    # controls whether POST responses are plain JSON or an SSE event stream
    # (not the SSE channel itself, just the SSE event data format)
    # mcp.settings.json_response = True
    # mcp.settings.stateless_http = True
     
    # Initialize and run the server
    print("Starting MCPServer...")
    # mcp.run(transport='sse')
    mcp.run(transport="streamable-http")
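Once the server is running, a quick way to sanity-check it is to connect over the same transport and list its tools. A minimal sketch, assuming the server above is listening locally on port 8000:

import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    # Connect to the demo server and list the four arithmetic tools
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])  # expect: add, subtract, multiply, divide

asyncio.run(main())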

MCPClient

import asyncio
from contextlib import AsyncExitStack
from typing import List, Optional

from mcp import ClientSession, Tool
from mcp.client.streamable_http import streamablehttp_client
from utils import logger
 
"""
官方MCP client demo:github.com/modelcontextprotocol/quickstart-resources/blob/main/mcp-client-python/client.py
 """
 
# Configure logging
logger = logger.setup_logging()
 
 
def convert_tools(tools: List[Tool]):
    """
    將MCP Server的list tools獲取到的工具列表,轉換為OpenAI API的可用工具列表
    """
    ret = []
    for tool in tools:
        parameters = {
            "type": "object",
            "properties": {},
            "required": (
                tool.inputSchema["required"] if "required" in tool.inputSchema else []
            ),
        }
        properties = tool.inputSchema["properties"]
        for param_name in properties:
            if "type" in properties[param_name]:
                param_type = properties[param_name]["type"]
            elif (
                "anyOf" in properties[param_name]
                and len(properties[param_name]["anyOf"]) > 0
            ):
                param_type = properties[param_name]["anyOf"][0]["type"]
            else:
                param_type = "string"
            parameters["properties"][param_name] = {
                "type": param_type,
                "description": properties[param_name].get("description", ""),
            }
 
        ret.append(
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": parameters,
                },
            }
        )
    return ret
 
class MCPClient:
    def __init__(self):
        # Initialize the session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.stream_client = None
 
    async def connect_to_server(self, url):
        """Connect to an MCP server using Streamable HTTP.
        Args:
            url: Streamable HTTP 地址
        """
        mcp_timeout = 300
        logger.info(f"mcp_timeout: {mcp_timeout}")
 
        # Use streamablehttp_client instead of sse_client
        http_transport = await self.exit_stack.enter_async_context(
            streamablehttp_client(url=url, timeout=mcp_timeout)
        )
        self.streamable_http, self.write, _ = http_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.streamable_http, self.write)
        )
 
        logger.info(f"connect_to_server successful, streamable http url: {url}")
        await self.session.initialize()
 
    async def cleanup(self):
        await self.exit_stack.aclose()
 
    @classmethod
    def list_tools(cls, mcp_server_url, reqid):
        """
        列出指定MCP Server上可用的工具
        Args:
            mcp_server_url: MCP Server的URL
        Returns:
            可用的工具列表
        """
        async def async_task():
            client = cls()
            await client.connect_to_server(mcp_server_url)
            response = await client.session.list_tools()
            mcp_tools = response.tools
            available_tools = convert_tools(mcp_tools)
            logger.info(f"reqid:{reqid},tool size:{len(available_tools)},tool name:{",".join([tool["function"]["name"] for tool in available_tools])}")
            # logger.info([tool["function"]["name"] for tool in available_tools])
            await client.cleanup()
            return available_tools
 
        available_tools = asyncio.run(async_task())
        return available_tools
 
    @classmethod
    def call_tool(cls, mcp_server_url: str, tool_name: str, tool_args: dict, reqid):
        """
        調用指定MCP Server上的工具
        Args:
            mcp_server_url: MCP Server的URL
            tool_name: 工具名稱
            tool_args: 工具參數
        Returns:
            工具調用結果
        """
        async def async_task():
            client = cls()
            await client.connect_to_server(mcp_server_url)
            logger.info(f"reqid;{reqid}, mcp client, tool_name: {tool_name}, tool_args: {tool_args}")
            result = await client.session.call_tool(tool_name, tool_args)
            logger.info(f"reqid;{reqid}, mcp client, tool_args: {tool_args}, result: {result}")
            await client.cleanup()
            return result
 
        result = asyncio.run(async_task())
        return result
 
if __name__ == "__main__":
    mcpserver_url = "127.0.0.1:8000/mcp"  # 修改為stream端點
    # MCPClient.list_tools(mcpserver_url, "mock reqid")
    MCPClient.call_tool(mcpserver_url,"multiply",{"a": 3, "b": 111},"mock reqid")
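For reference, convert_tools turns each MCP tool schema into an OpenAI-style function spec. For the multiply tool above, the converted entry looks roughly like this (exact field values depend on the generated input schema):

{
  "type": "function",
  "function": {
    "name": "multiply",
    "description": "Multiply two numbers",
    "parameters": {
      "type": "object",
      "properties": {
        "a": {"type": "number", "description": ""},
        "b": {"type": "number", "description": ""}
      },
      "required": ["a", "b"]
    }
  }
}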

LLMChat2MCP_FunctionCall

from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
 
logger = logger.setup_logging()
 
# load environment variables from .env
load_dotenv()
 
API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
 
 
def chat_loop(mcp_server_url):
    """Run an interactive chat loop"""
    logger.info("\nMCP Client Started!")
    logger.info("Type your queries or 'quit' to exit.")
 
    while True:
        try:
            query = input("\nQuery: ").strip()
            if not query:  # skip empty input
                continue
            if query.lower() == "quit":
                break
            random_uuid = uuid.uuid4()
            response = process_query(mcp_server_url, random_uuid, query)
            logger.info(response)
        except Exception as e:
            logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")
 
 
def process_query(mcp_server_url, reqid, query):
    """
    處理查詢,使用OpenAI API和MCP工具
    Args:
        mcp_server_url: MCP Server的URL
        reqid: 請求ID
        query: 用戶查詢
    Returns:
        處理結果
    """
    available_tools = MCPClient.list_tools(mcp_server_url, reqid)
    openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
 
    messages = [{"role": "user", "content": query}]
 
    try:
        current_response = openai.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            tools=available_tools,
            stream=False,
        )
 
        final_text = []
 
        if current_response.choices[0].message.content:
            final_result = current_response.choices[0].message.content
            final_text.append(final_result)
            # logger.info("AI:" + final_result)
 
        # Process the returned content
        content = current_response.choices[0]
        # logger.info(
        #     "OpenAI Response JSON:\n%s",
        #     json.dumps(current_response.model_dump(), indent=4),
        # )
        if content.finish_reason == "tool_calls":
            # If a tool is needed, parse the tool call
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            callInfoStr = f"[Calling tool {tool_name} with args {tool_args}]"
            logger.info(callInfoStr)
            # Execute the tool
            result = MCPClient.call_tool(mcp_server_url, tool_name, tool_args, reqid)
            final_text.append(callInfoStr)
 
            # Store the result in the message history
            # Check that result and result.content exist
            tool_response = ""
            if result and hasattr(result, "content") and result.content:
                tool_response = result.content[0].text
            else:
                tool_response = "Tool returned empty or invalid response"
 
            messages.append(content.message.model_dump())
            messages.append(
                {
                    "role": "tool",
                    "content": tool_response,
                    "tool_call_id": tool_call.id,
                }
            )
 
            # Feed the result back to the LLM to generate the final response
            current_response = openai.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                tools=available_tools,
                stream=False,
            )
            final_result = current_response.choices[0].message.content
            final_text.append(final_result)
        return "\n".join(final_text)
    except Exception as e:
        logger.error(
            f"process_query Error processing query: {e}\n{traceback.format_exc()}"
        )
        return None
 
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="mcp client call tool")
    parser.add_argument(
        "--url", required=True, help="Full URL, e.g. localhost:8114/sse"
    )
    args = parser.parse_args()
    # query = "你是誰?"
    # result = process_query("localhost:8114/sse", "mock reqid", query)
    # logger.info(f"Query result: {result}")
    chat_loop(args.url)
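This script (and the system-prompt variant below) reads its OpenAI-compatible endpoint settings from a .env file via load_dotenv. An illustrative .env with placeholder values (any OpenAI-compatible endpoint works):

API_KEY=sk-your-api-key
BASE_URL=https://your-openai-compatible-endpoint/v1
MODEL_NAME=your-model-name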

LLMChat2MCP_SystemPrompt

from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
import re
 
logger = logger.setup_logging()
 
# Load environment variables
load_dotenv()
 
API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
MAX_TOOL_CALLS = 3  # Max number of tool calls, to prevent infinite loops
 
 
def build_system_prompt(available_tools):
    """構建系統提示詞,描述所有可用的MCP工具"""
    tools_desc = []
 
    # Parse the tool list and generate descriptions
    for tool in available_tools:
        func = tool["function"]
        name = func["name"]
        desc = func["description"]
        params = func["parameters"]["properties"]
 
        # Generate parameter descriptions
        param_desc = []
        for param_name, param_info in params.items():
            param_desc.append(
                f"{param_name} ({param_info.get('type', 'string')}): "
                f"{param_info.get('description', 'No description')}"
            )
 
        tools_desc.append(
            f"Tool name: {name}\n"
            f"Description: {desc}\n"
            f"Parameters: {', '.join(param_desc)}\n"
            "---"
        )

    # Build the full system prompt
    return (
        "You are an intelligent assistant and can use the tools below to solve problems. "
        "When a user request requires a tool, respond strictly in the following format:\n"
        'TOOL_CALL: {"tool": "tool_name", "arguments": {"param1": "value1", ...}}\n\n'
        "Available tools:\n" + "\n".join(tools_desc) + "\n\n" + "Important rules:\n"
        "1. Only call a tool when one is needed\n"
        "2. A tool-call response must carry the TOOL_CALL: prefix\n"
        "3. Do not explain the tool call; output only the JSON\n"
        "4. If no tool is needed, answer the user's question directly"
    )
 
 
def extract_tool_call(response_content):
    """Extract tool-call information from the LLM response"""
    # Match the TOOL_CALL: {...} format with a regular expression
    pattern = r'TOOL_CALL:\s*(\{.*\})'
    match = re.search(pattern, response_content, re.DOTALL)
    if not match:
        return None
    json_str = match.group(1)
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        logger.error(f"JSON parse error: {json_str}")
        return None
 
 
def process_query(mcp_server_url, reqid, query):
    """
    處理查詢,使用OpenAI API和MCP工具(系統提示詞方式)
    """
    # Fetch the list of available tools
    available_tools = MCPClient.list_tools(mcp_server_url, reqid)
 
    # Build the system prompt
    system_prompt = build_system_prompt(available_tools)
 
    openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]
 
    tool_calls_count = 0
    final_response = None
 
    while tool_calls_count < MAX_TOOL_CALLS:
        try:
            # Call the OpenAI API
            response = openai.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                stream=False,
            )
 
            # Get the response content
            response_content = response.choices[0].message.content
            # logger.info(f"Raw LLM response: {response_content}")

            # Check whether a tool call is needed
            tool_call = extract_tool_call(response_content)
 
            if tool_call:
                # Handle the tool call
                tool_name = tool_call["tool"]
                tool_args = tool_call["arguments"]

                call_info = f"[Calling tool {tool_name} with args: {tool_args}]"
                logger.info(call_info)

                # Execute the tool call
                result = MCPClient.call_tool(
                    mcp_server_url, tool_name, tool_args, reqid
                )

                # Handle the tool response
                tool_response = ""
                if result and hasattr(result, "content") and result.content:
                    tool_response = result.content[0].text
                else:
                    tool_response = "Tool returned an empty or invalid response"

                logger.info(f"Tool response: {tool_response}")

                # Append to the message history
                messages.append({"role": "assistant", "content": response_content})
                messages.append(
                    {
                        "role": "user",
                        "content": f"Tool call result: {tool_response}\n\nPlease answer the user's question based on this result",
                    }
                )

                tool_calls_count += 1
            else:
                # No tool call; return the result directly
                final_response = response_content
                break
 
        except Exception as e:
            logger.error(f"Processing error: {e}\n{traceback.format_exc()}")
            final_response = "Error while processing the query"
            break
 
    # If the maximum number of tool calls was reached without a final response
    if not final_response:
        final_response = "Maximum number of tool calls reached. Last response: " + response_content
 
    return final_response
 
 
def chat_loop(mcp_server_url):
    """Run an interactive chat loop"""
    logger.info("\nMCP Client Started!")
    logger.info("Type your queries or 'quit' to exit.")
 
    while True:
        try:
            query = input("\nQuery: ").strip()
            if not query:  # skip empty input
                continue
            if query.lower() == "quit":
                break
            random_uuid = uuid.uuid4()
            response = process_query(mcp_server_url, random_uuid, query)
            logger.info(response)
        except Exception as e:
            logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")
 
 
# chat_loop and __main__ are unchanged (same as the FunctionCall script above)
if __name__ == "__main__":
    # tool_call = extract_tool_call('TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 77}}')
    # logger.info(tool_call)
    parser = argparse.ArgumentParser(description="mcp client call tool")
    parser.add_argument(
        "--url", required=True, help="Full URL, e.g. localhost:8114/sse"
    )
    args = parser.parse_args()
    # query = "你是誰?"
    # result = process_query("localhost:8114/sse", "mock reqid", query)
    # logger.info(f"Query result: {result}")
    chat_loop(args.url)
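As a quick illustration of the TOOL_CALL convention this script expects from the model, the extract_tool_call helper above parses replies like this:

reply = 'TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 111}}'
print(extract_tool_call(reply))  # -> {'tool': 'multiply', 'arguments': {'a': 3, 'b': 111}}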

[Screenshots: sample runs of the two chat scripts]

Summary

1. The MCP HTTP+SSE transport has been officially deprecated; going forward, Streamable HTTP is the transport to use.
2. MCP Streamable HTTP exposes two important settings, stateless_http and json_response, to fine-tune handling for different AI scenarios (a configuration sketch follows below).
3. Not all models support function calling. A universal alternative is the system prompt: the overall idea is much the same as function calling, except that the system prompt requires the LLM to return the MCP Server name, Tool name, and args in a specified format.
4. As the code above shows, throughout the AI Agent's call flow the LLM only does the reasoning; the actual tool call is made by the AI Agent. The LLM tells the AI Agent which MCP Server, which Tool, and which arguments to use; the AI Agent then calls the MCP Server through the MCP Client, feeds the returned result back to the LLM for the next round of reasoning, and the loop repeats until the LLM decides the task is complete.
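For point 2, a minimal sketch of toggling those two FastMCP settings (both default to False, as noted in the server demo above):

# Stateless mode with plain-JSON POST responses
mcp.settings.stateless_http = True   # no SSE channel, no per-client session management
mcp.settings.json_response = True    # respond to POSTs with plain JSON instead of SSE-formatted events
mcp.run(transport="streamable-http")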
