A Proof of Concept of vLLM at the Edge with MCP calling
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

217 lines
7.4 KiB

#!/usr/bin/env python3
"""
MCP Client - Integrates vLLM with MCP weather server
"""
import asyncio
import json
import os
from typing import Optional
from contextlib import asynccontextmanager
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
class MCPWeatherClient:
    """Bridge between a vLLM OpenAI-compatible endpoint and an MCP weather server.

    The client sends chat-completions requests (with tool definitions) to vLLM,
    and when the model emits tool calls it forwards them to the MCP server via
    ``ClientSession.call_tool``, feeding the results back into the conversation.
    """

    def __init__(self, vllm_url: str = "http://127.0.0.1:8000/v1", model: str = "qwen"):
        """Create a client pointed at a vLLM server.

        Args:
            vllm_url: Base URL of the vLLM OpenAI-compatible API.
            model: Model name as served by vLLM.
        """
        self.vllm_url = vllm_url
        self.model = model
        self.openai_client = OpenAI(
            api_key="EMPTY",  # vLLM ignores the key, but the SDK requires one
            base_url=vllm_url,
        )
        # Live MCP session; only valid inside a connect_to_mcp() context.
        self.mcp_session: Optional[ClientSession] = None
        self.available_tools = []

    @asynccontextmanager
    async def connect_to_mcp(self, server_script_path: str):
        """Spawn the MCP server over stdio and yield an initialized session.

        Args:
            server_script_path: Path to the MCP server script to launch.

        Yields:
            The initialized ``ClientSession``.
        """
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None,
        )
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                self.mcp_session = session
                # Reset the tool list so reconnecting does not accumulate
                # duplicate entries (the list used to be append-only).
                self.available_tools = []
                tools_list = await session.list_tools()
                print("\nConnected to MCP server. Available tools:")
                for tool in tools_list.tools:
                    print(f" - {tool.name}: {tool.description}")
                    self.available_tools.append(tool)
                try:
                    yield session
                finally:
                    # The session is closed once the context exits; clear the
                    # reference so a stale session can't be used accidentally.
                    self.mcp_session = None

    def mcp_tools_to_openai_format(self):
        """Convert the cached MCP tool descriptors to OpenAI function-call format.

        Returns:
            A list of ``{"type": "function", "function": {...}}`` dicts, with an
            empty object schema when a tool declares no input schema.
        """
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description or "",
                    "parameters": tool.inputSchema if tool.inputSchema else {
                        "type": "object",
                        "properties": {},
                        "required": [],
                    },
                },
            }
            for tool in self.available_tools
        ]

    async def call_mcp_tool(self, tool_name: str, arguments: dict):
        """Invoke an MCP tool and return the raw MCP result object.

        Raises:
            RuntimeError: If called outside a connect_to_mcp() context.
        """
        if not self.mcp_session:
            raise RuntimeError("MCP session not initialized")
        result = await self.mcp_session.call_tool(tool_name, arguments)
        return result

    async def chat(self, user_message: str, max_iterations: int = 5):
        """Run a chat interaction with the model, handling tool calls via MCP.

        Args:
            user_message: The user's question/message.
            max_iterations: Maximum number of model turns to prevent infinite
                tool-call loops.

        Returns:
            The model's final text response, or ``None`` if the model produced
            no content or the iteration limit was reached.
        """
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant that can get weather information for cities. When asked about weather, use the get_weather tool.",
            },
            {
                "role": "user",
                "content": user_message,
            },
        ]
        tools = self.mcp_tools_to_openai_format()
        print(f"\nUser: {user_message}\n")

        for iteration in range(max_iterations):
            # NOTE: the OpenAI client is synchronous, so this call blocks the
            # event loop for the duration of the request.
            response = self.openai_client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=tools if tools else None,
                tool_choice="auto" if tools else None,
            )
            assistant_message = response.choices[0].message

            # Record the assistant turn.  Only include "tool_calls" when there
            # are any: strict OpenAI-compatible servers reject an empty array.
            assistant_entry = {
                "role": "assistant",
                "content": assistant_message.content,
            }
            if assistant_message.tool_calls:
                assistant_entry["tool_calls"] = [
                    {
                        "id": tc.id,
                        "type": tc.type,
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments,
                        },
                    }
                    for tc in assistant_message.tool_calls
                ]
            messages.append(assistant_entry)

            if assistant_message.tool_calls:
                print(f"Assistant wants to call {len(assistant_message.tool_calls)} tool(s):\n")
                for tool_call in assistant_message.tool_calls:
                    function_name = tool_call.function.name
                    try:
                        function_args = json.loads(tool_call.function.arguments)
                    except json.JSONDecodeError as exc:
                        # The model emitted malformed JSON; report the failure
                        # back as the tool result instead of crashing mid-chat.
                        print(f" Invalid tool arguments: {exc}\n")
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": f"Error: invalid JSON arguments: {exc}",
                        })
                        continue
                    print(f" Calling tool: {function_name}")
                    print(f" Arguments: {function_args}")

                    mcp_result = await self.call_mcp_tool(function_name, function_args)

                    if mcp_result.content:
                        # MCP results carry a list of content parts; text parts
                        # expose .text, anything else is stringified.
                        result_text = ""
                        for content in mcp_result.content:
                            if hasattr(content, 'text'):
                                result_text += content.text
                            else:
                                result_text += str(content)
                        print(f" Result: {result_text}\n")
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": result_text,
                        })
                    else:
                        print(f" Result: No content returned\n")
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": "No result",
                        })
                # Loop again so the model can use the tool results.
                continue

            # No tool calls: this is the final response.
            if assistant_message.content:
                print(f"Assistant: {assistant_message.content}\n")
                return assistant_message.content
            else:
                print("Assistant: (no response)")
                return None

        print("\nReached maximum iterations")
        return None
async def main():
    """Locate the MCP weather server script, connect, and answer one question."""
    import sys

    # The weather server lives in ../mcp-server relative to this script.
    here = os.path.dirname(os.path.abspath(__file__))
    mcp_server_path = os.path.join(os.path.dirname(here), "mcp-server", "weather_server.py")
    if not os.path.exists(mcp_server_path):
        print(f"Error: MCP server script not found at {mcp_server_path}")
        sys.exit(1)

    client = MCPWeatherClient()

    # Any command-line arguments override the default example question.
    question = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else "What's the weather like in Paris?"

    async with client.connect_to_mcp(mcp_server_path):
        await client.chat(question)
# Script entry point: run the async client under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())