次の方法で共有


Databricks のモデル コンテキスト プロトコル (MCP)

このページでは、Databricks で MCP を使用する方法について説明します。 MCP は、AI エージェントをツール、リソース、プロンプト、およびその他のコンテキスト情報に接続するオープン ソース標準です。

MCP の主な利点は標準化です。 ツールを一度作成すれば、自分で作ったエージェントでも、サードパーティのエージェントでも、いかなるエージェントでも使用できます。 同様に、チームまたは組織外から、他のユーザーが開発したツールを使用できます。

Databricks には、次の MCP オプションが用意されています。

  • マネージド MCP サーバー: Databricks には、エージェントが Unity カタログ内のデータとアクセス ツールに対してクエリを実行できる、すぐに使用できるサーバーがあります。 Unity カタログのアクセス許可は常に適用されるため、エージェントとユーザーは、許可されているツールとデータにのみアクセスできます。

  • カスタム MCP サーバー: 独自の MCP サーバーを Databricks アプリ として安全にホストし、独自のサーバーを持ち込んだり、サードパーティの MCP サーバーを実行したりします。

マネージド MCP サーバー

Von Bedeutung

この機能は ベータ版です

Databricks には、エージェントからエンタープライズ データへの接続を簡略化するために、次のマネージド MCP サーバーが用意されています。 これらのサーバーは、すぐに使用でき、Databricks によってホストおよび管理されます。

MCP サーバー 説明 URL
ベクトル検索 エージェントが、指定された Unity カタログ スキーマ内の Databricks Vector Search インデックス に対してクエリを実行できるようにします。 https://<your-workspace-hostname>/api/2.0/mcp/vector-search/{catalog_name}/{schema_name}
Unity カタログ関数 エージェントが指定された Unity カタログ スキーマで Unity カタログ関数 を実行できるようにします。 https://<your-workspace-hostname>/api/2.0/mcp/functions/{catalog_name}/{schema_name}
ジーニー空間 エージェントが指定された Genie 空間 にクエリを実行して、構造化データ (Unity カタログのテーブル) から分析情報を取得できるようにします https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

エージェントに複数のサーバー URL を指定して、複数のツールとデータに接続できます。 たとえば、次の URL を指定して、エージェントがカスタマー サポート チケット、課金テーブルにクエリを実行し、課金関連の関数を実行できるようにします。

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support

    • prod.customer_support スキーマのベクター検索インデックスを使用して、エージェントが非構造化データを検索できるようにします
  • https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

    • エージェントが、 prod.billing スキーマに接続されている Genie 空間を使用して構造化データを検索できるようにします
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing

    • で管理されている Unity カタログ関数 (カスタム Python または SQL データ取得 UDF) をエージェントで実行できます。 prod.billing

Databricks アプリを使用して MCP サーバーをホストする

独自のカスタムまたはサードパーティの MCP サーバーを Databricks アプリとしてホストすることもできます。 これは、組織内の他のユーザーと展開して共有する MCP サーバーが既にある場合や、ツールのソースとしてサードパーティの MCP サーバーを実行する場合に便利です。

Databricks アプリとしてホストされる MCP サーバーは、 ストリーミング可能な HTTP トランスポートなどの HTTP 互換トランスポートを実装する必要があります。

ヒント

独自の MCP サーバー を記述し、Databricks アプリとしてデプロイする例については、カスタム MCP サーバー リポジトリを参照してください。

既存の Python MCP サーバーを Databricks アプリとしてホストするには、次の手順に従います。

  1. サーバーのルート ディレクトリに requirements.txt を追加し、サーバーの Python 依存関係を指定します。

    Python MCP サーバーでは、多くの場合、パッケージ管理に uv が使用されます。 uvを使用する場合は、uvを追加すると、追加の依存関係のインストールが処理されます。

  2. サーバーを実行する CLI コマンドを指定する app.yaml を追加します。

    既定では、Databricks アプリはポート 8000 でリッスンします。 サーバーが別のポートでリッスンしている場合は、 ファイルのapp.yamlを使用して設定します。

    app.yaml例:

    command: [
        'uv',
        'run',
        'your-server-name',
        ..., # optionally include additional parameters here
      ]
    
  3. MCP サーバーをホストする Databricks アプリを作成します。

    databricks apps create mcp-my-custom-server
    
  4. ソース コードを Databricks にアップロードし、 app.yaml ファイルを含むディレクトリから次のコマンドを実行してアプリをデプロイします。

    DATABRICKS_USERNAME=$(databricks current-user me | jq -r .userName)
    databricks sync . "/Users/$DATABRICKS_USERNAME/mcp-my-custom-server"
    databricks apps deploy mcp-my-custom-server --source-code-path "/Workspace/Users/$DATABRICKS_USERNAME/mcp-my-custom-server"
    

MCP を使用してエージェントをビルドする

このセクションでは、Databricks 上の MCP サーバーに接続するカスタム コード エージェントを記述する方法について説明します。

Von Bedeutung

次のコード スニペットを使用するには、マネージド MCP サーバー ベータ に登録する必要があります。

Databricks 上の MCP サーバーへの接続は、他のリモート MCP サーバーと似ています。 MCP Python SDK などの標準 SDK を使用してサーバーに接続できます。 主な違いは、Databricks MCP サーバーは既定でセキュリティで保護されており、クライアントに認証を指定する必要がある点です。 databricks-mcp Python ライブラリは、カスタム エージェント コードでの認証を簡略化するのに役立ちます。

エージェント コードを開発する最も簡単な方法は、エージェント コードをローカルで実行し、ワークスペースに対して認証することです。 Databricks MCP サーバーに接続する AI エージェントを構築するには、次の手順に従います。

  1. OAuth を使用してワークスペースに対する認証を行います。 ローカル ターミナルで次を実行します。

    databricks auth login --host https://<your-workspace-hostname>
    
  2. Python 3.12 以上のローカル環境があることを確認してから、依存関係をインストールします。

    pip install -U databricks-mcp "mcp>=1.9" "databricks-sdk[openai]" "mlflow>=3.1.0" "databricks-agents>=1.0.0"
    
  3. 次のスニペットを実行して、MCP サーバーへの接続を検証します。 このスニペットでは、Unity カタログ ツールが一覧表示され、 組み込みの Python コード インタープリター ツールが実行されます。 この snipet を実行するには、ワークスペースでサーバーレス コンピューティングを有効にする必要があります

    import asyncio
    
    from mcp.client.streamable_http import streamablehttp_client
    from mcp.client.session import ClientSession
    from databricks_mcp import DatabricksOAuthClientProvider
    from databricks.sdk import WorkspaceClient
    
    # TODO: Update to the Databricks CLI profile name you specified when
    # configuring authentication to the workspace.
    databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
    assert (
        databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
    ), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
    workspace_client = WorkspaceClient(profile=databricks_cli_profile)
    workspace_hostname = workspace_client.config.host
    mcp_server_url = f"{workspace_hostname}/api/2.0/mcp/functions/system/ai"
    
    
    # This snippet below uses the Unity Catalog functions MCP server to expose built-in
    # AI tools under `system.ai`, like the `system.ai.python_exec` code interpreter tool
    async def test_connect_to_server():
        async with streamablehttp_client(
            f"{mcp_server_url}", auth=DatabricksOAuthClientProvider(workspace_client)
        ) as (read_stream, write_stream, _), ClientSession(
            read_stream, write_stream
        ) as session:
            # List and call tools from the MCP server
            await session.initialize()
            tools = await session.list_tools()
            print(
                f"Discovered tools {[t.name for t in tools.tools]} "
                f"from MCP server {mcp_server_url}"
            )
            result = await session.call_tool(
                "system__ai__python_exec", {"code": "print('Hello, world!')"}
            )
            print(
                f"Called system__ai__python_exec tool and got result " f"{result.content}"
            )
    
    
    if __name__ == "__main__":
        asyncio.run(test_connect_to_server())
    
  4. 上記のスニペットを基にして、ツールを使用する基本的なシングルターン エージェントを定義できます。 エージェント コードを mcp_agent.py という名前のファイルとしてローカルに保存して、以降のセクションでデプロイできるようにします。

    from contextlib import asynccontextmanager
    import json
    import uuid
    import asyncio
    from typing import Any, Callable, List
    from pydantic import BaseModel
    
    import mlflow
    from mlflow.pyfunc import ResponsesAgent
    from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse
    
    from databricks_mcp import DatabricksOAuthClientProvider
    from databricks.sdk import WorkspaceClient
    from mcp.client.session import ClientSession
    from mcp.client.streamable_http import streamablehttp_client
    
    # 1) CONFIGURE YOUR ENDPOINTS/PROFILE
    LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
    SYSTEM_PROMPT = "You are a helpful assistant."
    DATABRICKS_CLI_PROFILE = "YOUR_DATABRICKS_CLI_PROFILE"
    assert (
        DATABRICKS_CLI_PROFILE != "YOUR_DATABRICKS_CLI_PROFILE"
    ), "Set DATABRICKS_CLI_PROFILE to the Databricks CLI profile name you specified when configuring authentication to the workspace"
    workspace_client = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
    host = workspace_client.config.host
    # Add more MCP server URLs here if desired, e.g
    # f"{host}/api/2.0/mcp/vector-search/prod/billing"
    # to include vector search indexes under the prod.billing schema, or
    # f"{host}/api/2.0/mcp/genie/<genie_space_id>"
    # to include a Genie space
    MCP_SERVER_URLS = [
        f"{host}/api/2.0/mcp/functions/system/ai",
    ]
    
    
    # 2) HELPER: convert between ResponsesAgent “message dict” and ChatCompletions format
    def _to_chat_messages(msg: dict[str, Any]) -> List[dict]:
        """
        Take a single ResponsesAgent‐style dict and turn it into one or more
        ChatCompletions‐compatible dict entries.
        """
        msg_type = msg.get("type")
        if msg_type == "function_call":
            return [
                {
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [
                        {
                            "id": msg["call_id"],
                            "type": "function",
                            "function": {
                                "name": msg["name"],
                                "arguments": msg["arguments"],
                            },
                        }
                    ],
                }
            ]
        elif msg_type == "message" and isinstance(msg["content"], list):
            return [
                {
                    "role": "assistant" if msg["role"] == "assistant" else msg["role"],
                    "content": content["text"],
                }
                for content in msg["content"]
            ]
        elif msg_type == "function_call_output":
            return [
                {
                    "role": "tool",
                    "content": msg["output"],
                    "tool_call_id": msg["tool_call_id"],
                }
            ]
        else:
            # fallback for plain {"role": ..., "content": "..."} or similar
            return [
                {
                    k: v
                    for k, v in msg.items()
                    if k in ("role", "content", "name", "tool_calls", "tool_call_id")
                }
            ]
    
    
    # 3) “MCP SESSION” + TOOL‐INVOCATION LOGIC
    @asynccontextmanager
    async def _mcp_session(server_url: str, ws: WorkspaceClient):
        async with streamablehttp_client(
            url=server_url, auth=DatabricksOAuthClientProvider(ws)
        ) as (reader, writer, _):
            async with ClientSession(reader, writer) as session:
                await session.initialize()
                yield session
    
    
    def _list_tools(server_url: str, ws: WorkspaceClient):
        async def inner():
            async with _mcp_session(server_url, ws) as sess:
                return await sess.list_tools()
    
        return asyncio.run(inner())
    
    
    def _make_exec_fn(
        server_url: str, tool_name: str, ws: WorkspaceClient
    ) -> Callable[..., str]:
        def exec_fn(**kwargs):
            async def call_it():
                async with _mcp_session(server_url, ws) as sess:
                    resp = await sess.call_tool(name=tool_name, arguments=kwargs)
                    return "".join([c.text for c in resp.content])
    
            return asyncio.run(call_it())
    
        return exec_fn
    
    
    class ToolInfo(BaseModel):
        name: str
        spec: dict
        exec_fn: Callable
    
    
    def _fetch_tool_infos(ws: WorkspaceClient, server_url: str) -> List[ToolInfo]:
        print(f"Listing tools from MCP server {server_url}")
        infos: List[ToolInfo] = []
        mcp_tools = _list_tools(server_url, ws).tools
        for t in mcp_tools:
            schema = t.inputSchema.copy()
            if "properties" not in schema:
                schema["properties"] = {}
            spec = {
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description,
                    "parameters": schema,
                },
            }
            infos.append(
                ToolInfo(
                    name=t.name, spec=spec, exec_fn=_make_exec_fn(server_url, t.name, ws)
                )
            )
        return infos
    
    
    # 4) “SINGLE‐TURN” AGENT CLASS
    class SingleTurnMCPAgent(ResponsesAgent):
        def _call_llm(self, history: List[dict], ws: WorkspaceClient, tool_infos):
            """
            Send current history → LLM, returning the raw response dict.
            """
            client = ws.serving_endpoints.get_open_ai_client()
            flat_msgs = []
            for msg in history:
                flat_msgs.extend(_to_chat_messages(msg))
    
            return client.chat.completions.create(
                model=LLM_ENDPOINT_NAME,
                messages=flat_msgs,
                tools=[ti.spec for ti in tool_infos],
            )
    
        def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
            ws = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
    
            # 1) build initial history: system + user
            history: List[dict] = [{"role": "system", "content": SYSTEM_PROMPT}]
            for inp in request.input:
                history.append(inp.model_dump())
    
            # 2) call LLM once
            tool_infos = [
                tool_info
                for mcp_server_url in MCP_SERVER_URLS
                for tool_info in _fetch_tool_infos(ws, mcp_server_url)
            ]
            tools_dict = {tool_info.name: tool_info for tool_info in tool_infos}
            llm_resp = self._call_llm(history, ws, tool_infos)
            raw_choice = llm_resp.choices[0].message.to_dict()
            raw_choice["id"] = uuid.uuid4().hex
            history.append(raw_choice)
    
            tool_calls = raw_choice.get("tool_calls") or []
            if tool_calls:
                # (we only support a single tool in this “single‐turn” example)
                fc = tool_calls[0]
                name = fc["function"]["name"]
                args = json.loads(fc["function"]["arguments"])
                try:
                    tool_info = tools_dict[name]
                    result = tool_info.exec_fn(**args)
                except Exception as e:
                    result = f"Error invoking {name}: {e}"
    
                # 4) append the “tool” output
                history.append(
                    {
                        "type": "function_call_output",
                        "role": "tool",
                        "id": uuid.uuid4().hex,
                        "tool_call_id": fc["id"],
                        "output": result,
                    }
                )
    
                # 5) call LLM a second time and treat that reply as final
                followup = (
                    self._call_llm(history, ws, tool_infos=[]).choices[0].message.to_dict()
                )
                followup["id"] = uuid.uuid4().hex
    
                assistant_text = followup.get("content", "")
                return ResponsesAgentResponse(
                    output=[
                        {
                            "id": uuid.uuid4().hex,
                            "type": "message",
                            "role": "assistant",
                            "content": [{"type": "output_text", "text": assistant_text}],
                        }
                    ],
                    custom_outputs=request.custom_inputs,
                )
    
            # 6) if no tool_calls at all, return the assistant’s original reply
            assistant_text = raw_choice.get("content", "")
            return ResponsesAgentResponse(
                output=[
                    {
                        "id": uuid.uuid4().hex,
                        "type": "message",
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": assistant_text}],
                    }
                ],
                custom_outputs=request.custom_inputs,
            )
    
    
    mlflow.models.set_model(SingleTurnMCPAgent())
    
    if __name__ == "__main__":
        req = ResponsesAgentRequest(
            input=[{"role": "user", "content": "What's the 100th Fibonacci number?"}]
        )
        resp = SingleTurnMCPAgent().predict(req)
        for item in resp.output:
            print(item)
    

MCP を使用してエージェントをデプロイする

MCP サーバーに接続するエージェントをデプロイする準備ができたら、 標準のエージェント展開プロセスを使用します。

エージェントがログ記録時にアクセスする必要があるすべてのリソースを指定してください。 たとえば、エージェントが次の MCP サーバー URL を使用している場合です。

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/billing
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing

エージェントが必要とするすべてのベクター検索インデックスをリソースとして prod.customer_support スキーマと prod.billing スキーマに指定し、 prod.billingのすべての Unity カタログ関数を指定する必要があります。

たとえば、上記で定義したエージェントをデプロイするには、エージェント コード定義を mcp_agent.pyに保存したと仮定して、次のスニペットを実行できます。

import os
from databricks.sdk import WorkspaceClient
from databricks import agents
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksVectorSearchIndex
from mcp_agent import LLM_ENDPOINT_NAME

# TODO: Update this to your Databricks CLI profile name
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
        databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)


# Configure MLflow and the Databricks SDK to use your Databricks CLI profile
current_user = workspace_client.current_user.me().user_name
mlflow.set_tracking_uri(f"databricks://{databricks_cli_profile}")
mlflow.set_registry_uri(f"databricks-uc://{databricks_cli_profile}")
mlflow.set_experiment(f"/Users/{current_user}/databricks_docs_example_mcp_agent")
os.environ["DATABRICKS_CONFIG_PROFILE"] = databricks_cli_profile

# Log the agent defined in mcp_agent.py
here = os.path.dirname(os.path.abspath(__file__))
agent_script = os.path.join(here, "mcp_agent.py")
resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksFunction("system.ai.python_exec"),
    # --- Uncomment and edit the following lines to specify vector search indices and additional UC functions ---
    # --- if referenced via the MCP_SERVER_URLS in your agent code ---
    # DatabricksVectorSearchIndex(index_name="prod.customer_support.my_index"),
    # DatabricksVectorSearchIndex(index_name="prod.billing.another_index"),
    # DatabricksFunction("prod.billing.my_custom_function"),
    # DatabricksFunction("prod.billing.another_function"),
]

with mlflow.start_run():
    logged_model_info = mlflow.pyfunc.log_model(
        artifact_path="mcp_agent",
        python_model=agent_script,
        resources=resources,
    )

# TODO Specify your UC model name here
UC_MODEL_NAME = "main.default.databricks_docs_mcp_agent"
registered_model = mlflow.register_model(logged_model_info.model_uri, UC_MODEL_NAME)

agents.deploy(
    model_name=UC_MODEL_NAME,
    model_version=registered_model.version,
)

コンピューティング価格

マネージド MCP サーバーのコンピューティング価格は、MCP ワークロードによって異なります。

カスタム MCP サーバーには、 Databricks Apps の価格が適用されます