mirror of https://github.com/home-assistant/core
109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
"""The Model Context Protocol Server implementation.
|
|
|
|
The Model Context Protocol python sdk defines a Server API that provides the
|
|
MCP message handling logic and error handling. The server implementation provided
|
|
here is independent of the lower level transport protocol.
|
|
|
|
See https://modelcontextprotocol.io/docs/concepts/architecture#implementation-example
|
|
"""
|
|
|
|
from collections.abc import Callable, Sequence
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
from mcp import types
|
|
from mcp.server import Server
|
|
import voluptuous as vol
|
|
from voluptuous_openapi import convert
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers import llm
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _format_tool(
|
|
tool: llm.Tool, custom_serializer: Callable[[Any], Any] | None
|
|
) -> types.Tool:
|
|
"""Format tool specification."""
|
|
input_schema = convert(tool.parameters, custom_serializer=custom_serializer)
|
|
return types.Tool(
|
|
name=tool.name,
|
|
description=tool.description or "",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": input_schema["properties"],
|
|
},
|
|
)
|
|
|
|
|
|
async def create_server(
|
|
hass: HomeAssistant, llm_api_id: str, llm_context: llm.LLMContext
|
|
) -> Server:
|
|
"""Create a new Model Context Protocol Server.
|
|
|
|
A Model Context Protocol Server object is associated with a single session.
|
|
The MCP SDK handles the details of the protocol.
|
|
"""
|
|
|
|
server = Server("home-assistant")
|
|
|
|
@server.list_prompts() # type: ignore[no-untyped-call, misc]
|
|
async def handle_list_prompts() -> list[types.Prompt]:
|
|
llm_api = await llm.async_get_api(hass, llm_api_id, llm_context)
|
|
return [
|
|
types.Prompt(
|
|
name=llm_api.api.name,
|
|
description=f"Default prompt for the Home Assistant LLM API {llm_api.api.name}",
|
|
)
|
|
]
|
|
|
|
@server.get_prompt() # type: ignore[no-untyped-call, misc]
|
|
async def handle_get_prompt(
|
|
name: str, arguments: dict[str, str] | None
|
|
) -> types.GetPromptResult:
|
|
llm_api = await llm.async_get_api(hass, llm_api_id, llm_context)
|
|
if name != llm_api.api.name:
|
|
raise ValueError(f"Unknown prompt: {name}")
|
|
|
|
return types.GetPromptResult(
|
|
description=f"Default prompt for the Home Assistant LLM API {llm_api.api.name}",
|
|
messages=[
|
|
types.PromptMessage(
|
|
role="assistant",
|
|
content=types.TextContent(
|
|
type="text",
|
|
text=llm_api.api_prompt,
|
|
),
|
|
)
|
|
],
|
|
)
|
|
|
|
@server.list_tools() # type: ignore[no-untyped-call, misc]
|
|
async def list_tools() -> list[types.Tool]:
|
|
"""List available time tools."""
|
|
llm_api = await llm.async_get_api(hass, llm_api_id, llm_context)
|
|
return [_format_tool(tool, llm_api.custom_serializer) for tool in llm_api.tools]
|
|
|
|
@server.call_tool() # type: ignore[no-untyped-call, misc]
|
|
async def call_tool(name: str, arguments: dict) -> Sequence[types.TextContent]:
|
|
"""Handle calling tools."""
|
|
llm_api = await llm.async_get_api(hass, llm_api_id, llm_context)
|
|
tool_input = llm.ToolInput(tool_name=name, tool_args=arguments)
|
|
_LOGGER.debug("Tool call: %s(%s)", tool_input.tool_name, tool_input.tool_args)
|
|
|
|
try:
|
|
tool_response = await llm_api.async_call_tool(tool_input)
|
|
except (HomeAssistantError, vol.Invalid) as e:
|
|
raise HomeAssistantError(f"Error calling tool: {e}") from e
|
|
return [
|
|
types.TextContent(
|
|
type="text",
|
|
text=json.dumps(tool_response),
|
|
)
|
|
]
|
|
|
|
return server
|