Model Context Protocol (MCP) on Databricks

This page explains how to use MCP on Databricks. MCP is an open source standard that connects AI agents to tools, resources, prompts, and other contextual information.

The main benefit of MCP is standardization: you can build a tool once and use it with any agent, whether it's an agent you built or a third-party one. Likewise, you can use tools developed by others, whether built by your own team or from outside your organization.

Databricks provides the following MCP options:

  • Managed MCP servers: Databricks provides ready-to-use servers that let agents query data in Unity Catalog and access tools. Unity Catalog permissions are always enforced, so agents and users can only access the tools and data they are allowed to.

  • Custom MCP servers: Securely host your own MCP server as a Databricks app, whether to bring your own server or to run a third-party MCP server.

Managed MCP servers

Important

This feature is in Beta.

Databricks provides the following managed MCP servers to simplify connecting agents to enterprise data. These servers are available out of the box, and are hosted and maintained by Databricks:

MCP server | Description | URL
Vector Search | Lets agents query Databricks Vector Search indexes in a specified Unity Catalog schema | https://<your-workspace-hostname>/api/2.0/mcp/vector-search/{catalog_name}/{schema_name}
Unity Catalog functions | Lets agents run Unity Catalog functions in a specified Unity Catalog schema | https://<your-workspace-hostname>/api/2.0/mcp/functions/{catalog_name}/{schema_name}
Genie space | Lets agents query a specified Genie space to get insights from structured data (tables in Unity Catalog) | https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

You can give your agent multiple server URLs to connect it to multiple tools and data sources. For example, you could provide the following URLs to let an agent query customer support tickets, query billing tables, and run billing-related functions (the sketch after this list shows one way to collect these URLs in agent code):

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support

    • Lets your agent search unstructured data using vector search indexes in the prod.customer_support schema
  • https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

    • Lets your agent search structured data using a Genie space connected to the prod.billing schema.
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing

    • Lets your agent run Unity Catalog functions (custom Python or SQL data-retrieval UDFs) governed in prod.billing
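
A minimal Python sketch of collecting these URLs for an agent, assuming hypothetical placeholder values for the workspace hostname and Genie space ID (the full agent example later on this page consumes a list like this):

# Hypothetical placeholders; replace with your own values.
host = "https://<your-workspace-hostname>"
genie_space_id = "<genie_space_id>"

MCP_SERVER_URLS = [
    f"{host}/api/2.0/mcp/vector-search/prod/customer_support",  # unstructured ticket search
    f"{host}/api/2.0/mcp/genie/{genie_space_id}",  # structured data via Genie
    f"{host}/api/2.0/mcp/functions/prod/billing",  # custom UC data-retrieval functions
]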

Host MCP servers as Databricks apps

You can also host your own custom or third-party MCP server as a Databricks app. This is useful if you already have an MCP server you want to deploy and share with others in your organization, or if you want to run a third-party MCP server as a source of tools.

MCP servers hosted as Databricks apps must implement an HTTP-compatible transport, such as the streamable HTTP transport.

Tip

For an example of writing your own MCP server and deploying it as a Databricks app, see the custom MCP server repository.

To host an existing Python MCP server as a Databricks app, follow these steps:

  1. Add a requirements.txt to your server's root directory and specify the Python dependencies for your server.

    Python MCP servers typically use uv for package management. If you use uv, add uv and it will handle installing the additional dependencies.
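
    For example, a minimal requirements.txt for a uv-managed server might contain just the package manager itself (if you don't use uv, list your server's packages directly):

    uv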

  2. Add an app.yaml specifying the CLI command to run your server.

    By default, Databricks apps listen on port 8000. If your server listens on a different port, set it in the app.yaml file.

    Example app.yaml:

    command: [
        'uv',
        'run',
        'your-server-name',
        ..., # optionally include additional parameters here
      ]
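
    If your server reads its listening port from an environment variable, one option is to set that variable in app.yaml. The variable name PORT below is an assumption; use whatever name your server actually reads:

    env:
      - name: 'PORT'
        value: '8000'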
    
  3. Create a Databricks app to host your MCP server:

    databricks apps create mcp-my-custom-server
    
  4. Upload the source code to Databricks and deploy the app by running the following commands from the directory containing your app.yaml file:

    DATABRICKS_USERNAME=$(databricks current-user me | jq -r .userName)
    databricks sync . "/Users/$DATABRICKS_USERNAME/mcp-my-custom-server"
    databricks apps deploy mcp-my-custom-server --source-code-path "/Workspace/Users/$DATABRICKS_USERNAME/mcp-my-custom-server"
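
    After the deployment finishes, you can verify the app's status and find its URL with the CLI (using the app name created above):

    databricks apps get mcp-my-custom-server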
    

Build agents using MCP

This section describes how to write custom code agents that connect to MCP servers on Databricks.

Important

You must enroll in the managed MCP servers beta to use the following code snippets.

Connecting to an MCP server on Databricks is similar to connecting to any other remote MCP server. You can connect using standard SDKs, such as the MCP Python SDK. The main difference is that Databricks MCP servers are secure by default and require clients to specify authentication. The databricks-mcp Python library helps simplify authentication in custom agent code.

The simplest way to develop agent code is to run it locally and authenticate to your workspace. Use the following steps to build an AI agent that connects to a Databricks MCP server:

  1. Authenticate to your workspace using OAuth. Run the following in a local terminal:

    databricks auth login --host https://<your-workspace-hostname>
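
    The command prompts you to save your credentials under a Databricks CLI profile name. Note the profile name you choose; the snippets that follow reference it.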
    
  2. Make sure you have a local environment with Python 3.12 or above, then install the dependencies:

    pip install -U databricks-mcp "mcp>=1.9" "databricks-sdk[openai]" "mlflow>=3.1.0" "databricks-agents>=1.0.0"
    
  3. Run the following snippet to validate your connection to the MCP server. The snippet lists Unity Catalog tools and then executes the built-in Python code interpreter tool. Serverless compute must be enabled in your workspace to run this snippet.

    import asyncio
    
    from mcp.client.streamable_http import streamablehttp_client
    from mcp.client.session import ClientSession
    from databricks_mcp import DatabricksOAuthClientProvider
    from databricks.sdk import WorkspaceClient
    
    # TODO: Update to the Databricks CLI profile name you specified when
    # configuring authentication to the workspace.
    databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
    assert (
        databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
    ), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
    workspace_client = WorkspaceClient(profile=databricks_cli_profile)
    workspace_hostname = workspace_client.config.host
    mcp_server_url = f"{workspace_hostname}/api/2.0/mcp/functions/system/ai"
    
    
    # The snippet below uses the Unity Catalog functions MCP server to expose built-in
    # AI tools under `system.ai`, such as the `system.ai.python_exec` code interpreter tool
    async def test_connect_to_server():
        async with streamablehttp_client(
            f"{mcp_server_url}", auth=DatabricksOAuthClientProvider(workspace_client)
        ) as (read_stream, write_stream, _), ClientSession(
            read_stream, write_stream
        ) as session:
            # List and call tools from the MCP server
            await session.initialize()
            tools = await session.list_tools()
            print(
                f"Discovered tools {[t.name for t in tools.tools]} "
                f"from MCP server {mcp_server_url}"
            )
            result = await session.call_tool(
                "system__ai__python_exec", {"code": "print('Hello, world!')"}
            )
            print(
                f"Called system__ai__python_exec tool and got result " f"{result.content}"
            )
    
    
    if __name__ == "__main__":
        asyncio.run(test_connect_to_server())
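
    If the connection succeeds, the snippet prints the discovered tool names (including system__ai__python_exec) followed by the result of running print('Hello, world!') through the code interpreter tool.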
    
  4. You can build on the snippet above to define a basic single-turn agent that uses tools. Save the agent code locally as a file named mcp_agent.py so you can deploy it in the following section:

    from contextlib import asynccontextmanager
    import json
    import uuid
    import asyncio
    from typing import Any, Callable, List
    from pydantic import BaseModel
    
    import mlflow
    from mlflow.pyfunc import ResponsesAgent
    from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse
    
    from databricks_mcp import DatabricksOAuthClientProvider
    from databricks.sdk import WorkspaceClient
    from mcp.client.session import ClientSession
    from mcp.client.streamable_http import streamablehttp_client
    
    # 1) CONFIGURE YOUR ENDPOINTS/PROFILE
    LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
    SYSTEM_PROMPT = "You are a helpful assistant."
    DATABRICKS_CLI_PROFILE = "YOUR_DATABRICKS_CLI_PROFILE"
    assert (
        DATABRICKS_CLI_PROFILE != "YOUR_DATABRICKS_CLI_PROFILE"
    ), "Set DATABRICKS_CLI_PROFILE to the Databricks CLI profile name you specified when configuring authentication to the workspace"
    workspace_client = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
    host = workspace_client.config.host
    # Add more MCP server URLs here if desired, e.g.
    # f"{host}/api/2.0/mcp/vector-search/prod/billing"
    # to include vector search indexes under the prod.billing schema, or
    # f"{host}/api/2.0/mcp/genie/<genie_space_id>"
    # to include a Genie space
    MCP_SERVER_URLS = [
        f"{host}/api/2.0/mcp/functions/system/ai",
    ]
    
    
    # 2) HELPER: convert between ResponsesAgent "message dict" and ChatCompletions format
    def _to_chat_messages(msg: dict[str, Any]) -> List[dict]:
        """
        Take a single ResponsesAgent-style dict and turn it into one or more
        ChatCompletions-compatible dict entries.
        """
        msg_type = msg.get("type")
        if msg_type == "function_call":
            return [
                {
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [
                        {
                            "id": msg["call_id"],
                            "type": "function",
                            "function": {
                                "name": msg["name"],
                                "arguments": msg["arguments"],
                            },
                        }
                    ],
                }
            ]
        elif msg_type == "message" and isinstance(msg["content"], list):
            return [
                {
                    "role": "assistant" if msg["role"] == "assistant" else msg["role"],
                    "content": content["text"],
                }
                for content in msg["content"]
            ]
        elif msg_type == "function_call_output":
            return [
                {
                    "role": "tool",
                    "content": msg["output"],
                    "tool_call_id": msg["tool_call_id"],
                }
            ]
        else:
            # fallback for plain {"role": ..., "content": "..."} or similar
            return [
                {
                    k: v
                    for k, v in msg.items()
                    if k in ("role", "content", "name", "tool_calls", "tool_call_id")
                }
            ]
    
    
    # 3) "MCP SESSION" + TOOL-INVOCATION LOGIC
    @asynccontextmanager
    async def _mcp_session(server_url: str, ws: WorkspaceClient):
        async with streamablehttp_client(
            url=server_url, auth=DatabricksOAuthClientProvider(ws)
        ) as (reader, writer, _):
            async with ClientSession(reader, writer) as session:
                await session.initialize()
                yield session
    
    
    def _list_tools(server_url: str, ws: WorkspaceClient):
        async def inner():
            async with _mcp_session(server_url, ws) as sess:
                return await sess.list_tools()
    
        return asyncio.run(inner())
    
    
    def _make_exec_fn(
        server_url: str, tool_name: str, ws: WorkspaceClient
    ) -> Callable[..., str]:
        def exec_fn(**kwargs):
            async def call_it():
                async with _mcp_session(server_url, ws) as sess:
                    resp = await sess.call_tool(name=tool_name, arguments=kwargs)
                    return "".join([c.text for c in resp.content])
    
            return asyncio.run(call_it())
    
        return exec_fn
    
    
    class ToolInfo(BaseModel):
        name: str
        spec: dict
        exec_fn: Callable
    
    
    def _fetch_tool_infos(ws: WorkspaceClient, server_url: str) -> List[ToolInfo]:
        print(f"Listing tools from MCP server {server_url}")
        infos: List[ToolInfo] = []
        mcp_tools = _list_tools(server_url, ws).tools
        for t in mcp_tools:
            schema = t.inputSchema.copy()
            if "properties" not in schema:
                schema["properties"] = {}
            spec = {
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description,
                    "parameters": schema,
                },
            }
            infos.append(
                ToolInfo(
                    name=t.name, spec=spec, exec_fn=_make_exec_fn(server_url, t.name, ws)
                )
            )
        return infos
    
    
    # 4) "SINGLE-TURN" AGENT CLASS
    class SingleTurnMCPAgent(ResponsesAgent):
        def _call_llm(self, history: List[dict], ws: WorkspaceClient, tool_infos):
            """
            Send the current history to the LLM, returning the raw response dict.
            """
            client = ws.serving_endpoints.get_open_ai_client()
            flat_msgs = []
            for msg in history:
                flat_msgs.extend(_to_chat_messages(msg))
    
            return client.chat.completions.create(
                model=LLM_ENDPOINT_NAME,
                messages=flat_msgs,
                tools=[ti.spec for ti in tool_infos],
            )
    
        def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
            ws = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
    
            # 1) build initial history: system + user
            history: List[dict] = [{"role": "system", "content": SYSTEM_PROMPT}]
            for inp in request.input:
                history.append(inp.model_dump())
    
            # 2) call LLM once
            tool_infos = [
                tool_info
                for mcp_server_url in MCP_SERVER_URLS
                for tool_info in _fetch_tool_infos(ws, mcp_server_url)
            ]
            tools_dict = {tool_info.name: tool_info for tool_info in tool_infos}
            llm_resp = self._call_llm(history, ws, tool_infos)
            raw_choice = llm_resp.choices[0].message.to_dict()
            raw_choice["id"] = uuid.uuid4().hex
            history.append(raw_choice)
    
            tool_calls = raw_choice.get("tool_calls") or []
            if tool_calls:
                # (we only support a single tool in this "single-turn" example)
                fc = tool_calls[0]
                name = fc["function"]["name"]
                args = json.loads(fc["function"]["arguments"])
                try:
                    tool_info = tools_dict[name]
                    result = tool_info.exec_fn(**args)
                except Exception as e:
                    result = f"Error invoking {name}: {e}"
    
                # 4) append the "tool" output
                history.append(
                    {
                        "type": "function_call_output",
                        "role": "tool",
                        "id": uuid.uuid4().hex,
                        "tool_call_id": fc["id"],
                        "output": result,
                    }
                )
    
                # 5) call LLM a second time and treat that reply as final
                followup = (
                    self._call_llm(history, ws, tool_infos=[]).choices[0].message.to_dict()
                )
                followup["id"] = uuid.uuid4().hex
    
                assistant_text = followup.get("content", "")
                return ResponsesAgentResponse(
                    output=[
                        {
                            "id": uuid.uuid4().hex,
                            "type": "message",
                            "role": "assistant",
                            "content": [{"type": "output_text", "text": assistant_text}],
                        }
                    ],
                    custom_outputs=request.custom_inputs,
                )
    
            # 6) if no tool_calls at all, return the assistant's original reply
            assistant_text = raw_choice.get("content", "")
            return ResponsesAgentResponse(
                output=[
                    {
                        "id": uuid.uuid4().hex,
                        "type": "message",
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": assistant_text}],
                    }
                ],
                custom_outputs=request.custom_inputs,
            )
    
    
    mlflow.models.set_model(SingleTurnMCPAgent())
    
    if __name__ == "__main__":
        req = ResponsesAgentRequest(
            input=[{"role": "user", "content": "What's the 100th Fibonacci number?"}]
        )
        resp = SingleTurnMCPAgent().predict(req)
        for item in resp.output:
            print(item)
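
    To smoke-test the agent locally, run python mcp_agent.py: the __main__ block sends a single user message through predict() and prints the agent's reply.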
    

Deploy agents using MCP

When you're ready to deploy an agent that connects to MCP servers, use the standard agent deployment process.

Make sure to specify all the resources your agent needs access to when logging it. For example, if your agent uses the following MCP server URLs:

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/billing
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing

then you must specify as resources all the vector search indexes your agent needs in the prod.customer_support and prod.billing schemas, as well as all the Unity Catalog functions in prod.billing.

For example, to deploy the agent defined above, you can run the following snippet, assuming you saved the agent code definition in mcp_agent.py:

import os
from databricks.sdk import WorkspaceClient
from databricks import agents
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksVectorSearchIndex
from mcp_agent import LLM_ENDPOINT_NAME

# TODO: Update this to your Databricks CLI profile name
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
        databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)


# Configure MLflow and the Databricks SDK to use your Databricks CLI profile
current_user = workspace_client.current_user.me().user_name
mlflow.set_tracking_uri(f"databricks://{databricks_cli_profile}")
mlflow.set_registry_uri(f"databricks-uc://{databricks_cli_profile}")
mlflow.set_experiment(f"/Users/{current_user}/databricks_docs_example_mcp_agent")
os.environ["DATABRICKS_CONFIG_PROFILE"] = databricks_cli_profile

# Log the agent defined in mcp_agent.py
here = os.path.dirname(os.path.abspath(__file__))
agent_script = os.path.join(here, "mcp_agent.py")
resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksFunction("system.ai.python_exec"),
    # --- Uncomment and edit the following lines to specify vector search indices and additional UC functions ---
    # --- if referenced via the MCP_SERVER_URLS in your agent code ---
    # DatabricksVectorSearchIndex(index_name="prod.customer_support.my_index"),
    # DatabricksVectorSearchIndex(index_name="prod.billing.another_index"),
    # DatabricksFunction("prod.billing.my_custom_function"),
    # DatabricksFunction("prod.billing.another_function"),
]

with mlflow.start_run():
    logged_model_info = mlflow.pyfunc.log_model(
        artifact_path="mcp_agent",
        python_model=agent_script,
        resources=resources,
    )

# TODO Specify your UC model name here
UC_MODEL_NAME = "main.default.databricks_docs_mcp_agent"
registered_model = mlflow.register_model(logged_model_info.model_uri, UC_MODEL_NAME)

agents.deploy(
    model_name=UC_MODEL_NAME,
    model_version=registered_model.version,
)
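
Once deployment completes, you can send a test request to the agent's serving endpoint. The following is a minimal sketch using the MLflow deployments client; the endpoint name is hypothetical (it follows the usual agents_<catalog>-<schema>-<model> naming convention), so confirm the actual name from the agents.deploy() output:

from mlflow.deployments import get_deploy_client

# Reuses the DATABRICKS_CONFIG_PROFILE environment variable set earlier in this script.
client = get_deploy_client("databricks")
response = client.predict(
    endpoint="agents_main-default-databricks_docs_mcp_agent",  # hypothetical; check deploy output
    inputs={"input": [{"role": "user", "content": "What's the 100th Fibonacci number?"}]},
)
print(response)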

Compute pricing

Compute pricing for managed MCP servers depends on the underlying MCP workload.

Custom MCP servers are subject to Databricks Apps pricing.