Azure OpenAI 中的计算机使用(预览版)

通过本文了解如何在 Azure OpenAI 中使用 Computer Use。 计算机使用是一种专用 AI 工具,它使用专用模型,可以通过其 UI 与计算机系统和应用程序交互来执行任务。 利用计算机,你可以创建一个能够处理复杂任务并通过解释视觉元素来根据屏幕内容做出决策的代理。

计算机使用可提供:

  • 自治导航:例如,打开应用程序、单击按钮、填写表单和导航多页工作流。
  • 动态适应:解释 UI 更改并相应地调整动作。
  • 跨应用程序任务执行:跨基于 Web 的应用程序和桌面应用程序运行。
  • 自然语言界面:用户可以以纯语言描述任务,计算机使用模型确定要执行的正确的 UI 交互。

请求访问

若要访问 computer-use-preview 模型,需要注册,并根据Microsoft的资格条件授予访问权限。 有权访问其他受限访问模型的客户仍需要请求对此模型的访问权限。

请求访问:计算机使用预览版受限访问模型应用程序

授予访问权限后,需要为模型创建部署。

区域支持

计算机使用在以下区域中可用:

  • eastus2
  • swedencentral
  • southindia

使用Responses API向电脑使用模型发送API调用

计算机使用工具通过 响应 API 进行访问。 该工具在连续循环中运行,发送诸如输入文本或执行单击之类的操作。 代码在计算机上执行这些作,并将结果的屏幕截图发送到模型。

这样,代码将使用计算机界面模拟人工的作,而模型使用屏幕截图来了解环境状态并建议下一步作。

以下示例演示了基本的 API 调用。

注释

需要一个 Azure OpenAI 资源,并在computer-use-preview受支持的区域中部署模型。

若要发送请求,需要安装以下 Python 包。

pip install openai
pip install azure-identity
import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI

#from openai import OpenAI
token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")

client = AzureOpenAI(  
  base_url = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/",  
  azure_ad_token_provider=token_provider,
  api_version="preview"
)

response = client.responses.create(
    model="computer-use-preview", # set this to your model deployment name
    tools=[{
        "type": "computer_use_preview",
        "display_width": 1024,
        "display_height": 768,
        "environment": "browser" # other possible values: "mac", "windows", "ubuntu"
    }],
    input=[
        {
            "role": "user",
            "content": "Check the latest AI news on bing.com."
        }
    ],
    truncation="auto"
)

print(response.output)

输出

[
    ResponseComputerToolCall(
        id='cu_67d841873c1081908bfc88b90a8555e0', 
        action=ActionScreenshot(type='screenshot'), 
        call_id='call_wwEnfFDqQr1Z4Edk62Fyo7Nh', 
        pending_safety_checks=[], 
        status='completed', 
        type='computer_call'
    )
]

发送初始 API 请求后,您可以执行一个循环,在您的应用程序代码中执行指定的操作,并在每次迭代时发送一个屏幕截图,以便模型评估环境的更新状态。


## response.output is the previous response from the model
computer_calls = [item for item in response.output if item.type == "computer_call"]
if not computer_calls:
    print("No computer call found. Output from model:")
    for item in response.output:
        print(item)

computer_call = computer_calls[0]
last_call_id = computer_call.call_id
action = computer_call.action

# Your application would now perform the action suggested by the model
# And create a screenshot of the updated state of the environment before sending another response

response_2 = client.responses.create(
    model="computer-use-preview",
    previous_response_id=response.id,
    tools=[{
        "type": "computer_use_preview",
        "display_width": 1024,
        "display_height": 768,
        "environment": "browser" # other possible values: "mac", "windows", "ubuntu"
    }],
    input=[
        {
            "call_id": last_call_id,
            "type": "computer_call_output",
            "output": {
                "type": "input_image",
                # Image should be in base64
                "image_url": f"data:image/png;base64,{<base64_string>}"
            }
        }
    ],
    truncation="auto"
)

了解 Computer Use 集成

使用计算机使用工具时,通常会执行以下作以将其集成到应用程序中。

  1. 向模型发送请求,其中包括调用计算机使用工具,以及设置显示尺寸和环境。 还可以在第一个 API 请求中包含环境的初始状态的屏幕截图。
  2. 从模型接收响应。 如果响应包含 action 项,则这些内容包含建议的行动,以便向指定目标前进。 例如,操作可能是 screenshot,以便模型可以使用更新的屏幕截图来评估当前状态,或者通过 click 提供X/Y坐标来指示鼠标应移动的位置。
  3. 在计算机或浏览器环境中使用应用程序代码执行该作。
  4. 执行动作后,将环境的更新状态捕捉为屏幕截图。
  5. 发送具有更新状态的 computer_call_output 新请求,并重复这个过程,直到模型停止请求操作或者你决定停止。

处理对话历史记录

可以使用 previous_response_id 参数将当前请求链接到以前的响应。 如果不想管理聊天历史记录,建议使用此参数。

如果不使用此参数,应确保包含输入数组中上一请求的响应输出中返回的所有项。 这包括推理内容(如果存在)。

安全检查

API 具有安全检查来帮助防止提示注入和模型错误。 这些检查包括:

  • 恶意指令检测:系统将评估屏幕截图图像,并检查它是否包含可能会更改模型行为的对抗内容。
  • 无关域检测:系统评估 current_url (如果提供)并检查当前域是否被视为与会话历史记录相关的域。
  • 敏感域检测:系统检查 current_url (如果提供)并在检测到用户位于敏感域时引发警告。

如果触发了上述一个或多个检查,则当模型返回下一个 computer_call 时,将进行安全检查,其中包含 pending_safety_checks 参数。

"output": [
    {
        "type": "reasoning",
        "id": "rs_67cb...",
        "summary": [
            {
                "type": "summary_text",
                "text": "Exploring 'File' menu option."
            }
        ]
    },
    {
        "type": "computer_call",
        "id": "cu_67cb...",
        "call_id": "call_nEJ...",
        "action": {
            "type": "click",
            "button": "left",
            "x": 135,
            "y": 193
        },
        "pending_safety_checks": [
            {
                "id": "cu_sc_67cb...",
                "code": "malicious_instructions",
                "message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
            }
        ],
        "status": "completed"
    }
]

在下一个请求中,你需要将该安全检查作为 acknowledged_safety_checks 传回,以便继续操作。

"input":[
        {
            "type": "computer_call_output",
            "call_id": "<call_id>",
            "acknowledged_safety_checks": [
                {
                    "id": "<safety_check_id>",
                    "code": "malicious_instructions",
                    "message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
                }
            ],
            "output": {
                "type": "computer_screenshot",
                "image_url": "<image_url>"
            }
        }
    ],

安全检查处理流程

在所有返回 pending_safety_checks 的情况中,应将操作移交给终端用户,以确认模型行为和准确性。

  • malicious_instructionsirrelevant_domain:最终用户应查看模型作,并确认模型的行为是否按预期方式运行。
  • sensitive_domain:保证最终用户正在主动监视这些站点上的模型的动作。 此“监视模式”的确切实现可能因应用程序而异,但可能的示例可能是收集网站上的用户印象数据,以确保有活动的最终用户参与应用程序。

Playwright 集成

在本部分中,我们提供了一个简单的示例脚本,该脚本将 Azure OpenAI 的 computer-use-preview 模型与 Playwright 集成,以自动执行基本浏览器交互。 将模型与 Playwright 结合使用可让模型查看浏览器屏幕、做出决策,以及执行点击、输入和导航网站等操作。 运行此示例代码时,应谨慎行事。 此代码旨在在本地运行,但只能在测试环境中执行。 使用人工确认决策,且不要授予模型对敏感数据的访问权限。

与 Playwright 集成的计算机使用预览模型的动画 gif。

首先,需要安装适用于 Playwright 的 Python 库。

pip install playwright

安装软件包后,你还需要运行

playwright install

导入和配置

首先,导入必要的库并定义配置参数。 由于我们正在使用 asyncio,因此将在 Jupyter 笔记本之外执行此代码。 我们将先在区块中演练代码,然后演示如何使用该代码。

import os
import asyncio
import base64
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from playwright.async_api import async_playwright, TimeoutError

token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)


# Configuration

BASE_URL = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/"
MODEL = "computer-use-preview" # Set to model deployment name
DISPLAY_WIDTH = 1024
DISPLAY_HEIGHT = 768
API_VERSION = "preview" #Use this API version or later
ITERATIONS = 5 # Max number of iterations before returning control to human supervisor

浏览器交互的键映射

接下来,我们为模型可能需要传递给 Playwright 的特殊键设置映射。 最终,该模型永远不会执行操作本身,它会传递命令的表示形式,并且你必须提供最终集成层,这些集成层可以采用这些命令并在所选环境中执行它们。

此处并未详尽列出所有可能的键映射。 可以根据需要展开此列表。 此词典专用于将模型与 Playwright 集成。 如果将模型与备用库集成以提供对操作系统键盘/鼠标的 API 访问权限,则需要提供特定于该库的映射。

# Key mapping for special keys in Playwright
KEY_MAPPING = {
    "/": "Slash", "\\": "Backslash", "alt": "Alt", "arrowdown": "ArrowDown",
    "arrowleft": "ArrowLeft", "arrowright": "ArrowRight", "arrowup": "ArrowUp",
    "backspace": "Backspace", "ctrl": "Control", "delete": "Delete", 
    "enter": "Enter", "esc": "Escape", "shift": "Shift", "space": " ",
    "tab": "Tab", "win": "Meta", "cmd": "Meta", "super": "Meta", "option": "Alt"
}

此字典将用户友好的键名称转换为符合 Playwright 键盘 API 预期的格式。

坐标验证函数

为了确保从模型传递的任何鼠标作都保留在浏览器窗口边界内,我们将添加以下实用工具函数:

def validate_coordinates(x, y):
    """Ensure coordinates are within display bounds."""
    return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))

这个简单的实用工具尝试通过将坐标固定到窗口尺寸来防止超出边界的错误。

操作处理

浏览器自动化的核心是处理各种类型的用户交互并将其转换为浏览器中的操作的处理程序。

async def handle_action(page, action):
    """Handle different action types from the model."""
    action_type = action.type
    
    if action_type == "drag":
        print("Drag action is not supported in this implementation. Skipping.")
        return
        
    elif action_type == "click":
        button = getattr(action, "button", "left")
        # Validate coordinates
        x, y = validate_coordinates(action.x, action.y)
        
        print(f"\tAction: click at ({x}, {y}) with button '{button}'")
        
        if button == "back":
            await page.go_back()
        elif button == "forward":
            await page.go_forward()
        elif button == "wheel":
            await page.mouse.wheel(x, y)
        else:
            button_type = {"left": "left", "right": "right", "middle": "middle"}.get(button, "left")
            await page.mouse.click(x, y, button=button_type)
            try:
                await page.wait_for_load_state("domcontentloaded", timeout=3000)
            except TimeoutError:
                pass
        
    elif action_type == "double_click":
        # Validate coordinates
        x, y = validate_coordinates(action.x, action.y)
        
        print(f"\tAction: double click at ({x}, {y})")
        await page.mouse.dblclick(x, y)
        
    elif action_type == "scroll":
        scroll_x = getattr(action, "scroll_x", 0)
        scroll_y = getattr(action, "scroll_y", 0)
        # Validate coordinates
        x, y = validate_coordinates(action.x, action.y)
        
        print(f"\tAction: scroll at ({x}, {y}) with offsets ({scroll_x}, {scroll_y})")
        await page.mouse.move(x, y)
        await page.evaluate(f"window.scrollBy({{left: {scroll_x}, top: {scroll_y}, behavior: 'smooth'}});")
        
    elif action_type == "keypress":
        keys = getattr(action, "keys", [])
        print(f"\tAction: keypress {keys}")
        mapped_keys = [KEY_MAPPING.get(key.lower(), key) for key in keys]
        
        if len(mapped_keys) > 1:
            # For key combinations (like Ctrl+C)
            for key in mapped_keys:
                await page.keyboard.down(key)
            await asyncio.sleep(0.1)
            for key in reversed(mapped_keys):
                await page.keyboard.up(key)
        else:
            for key in mapped_keys:
                await page.keyboard.press(key)
                
    elif action_type == "type":
        text = getattr(action, "text", "")
        print(f"\tAction: type text: {text}")
        await page.keyboard.type(text, delay=20)
        
    elif action_type == "wait":
        ms = getattr(action, "ms", 1000)
        print(f"\tAction: wait {ms}ms")
        await asyncio.sleep(ms / 1000)
        
    elif action_type == "screenshot":
        print("\tAction: screenshot")
        
    else:
        print(f"\tUnrecognized action: {action_type}")

此函数尝试处理各种类型的操作。 我们需要在 computer-use-preview 将生成的命令和将执行操作的 Playwright 库之间进行转换。 有关详细信息,请参阅适用于 ComputerAction 的参考文档。

屏幕截图捕获

为了使模型能够查看它与其他模型交互的内容,需要一种方法来捕获屏幕截图。 对于此代码,我们使用 Playwright 捕获屏幕截图,我们将视图限制为仅浏览器窗口中的内容。 屏幕截图不包括 URL 栏或浏览器 GUI 的其他方面。 如果需要模型在主浏览器窗口外查看,可以通过创建自己的屏幕截图函数来增强模型。

async def take_screenshot(page):
    """Take a screenshot and return base64 encoding with caching for failures."""
    global last_successful_screenshot
    
    try:
        screenshot_bytes = await page.screenshot(full_page=False)
        last_successful_screenshot = base64.b64encode(screenshot_bytes).decode("utf-8")
        return last_successful_screenshot
    except Exception as e:
        print(f"Screenshot failed: {e}")
        print(f"Using cached screenshot from previous successful capture")
        if last_successful_screenshot:
            return last_successful_screenshot

此函数将当前浏览器状态捕获为图像,并将其作为 base64 编码的字符串返回,以可供发送到模型。 在每个步骤允许模型查看它尝试执行的命令是否成功后,我们会在循环中执行此作,然后允许它根据屏幕截图的内容进行调整。 我们可以让模型决定是否需要获取屏幕截图,但为简单起见,我们将强制为每个迭代执行屏幕截图。

模型响应处理

此函数会处理模型的响应并执行请求的操作:

async def process_model_response(client, response, page, max_iterations=ITERATIONS):
    """Process the model's response and execute actions."""
    for iteration in range(max_iterations):
        if not hasattr(response, 'output') or not response.output:
            print("No output from model.")
            break
        
        # Safely access response id
        response_id = getattr(response, 'id', 'unknown')
        print(f"\nIteration {iteration + 1} - Response ID: {response_id}\n")
        
        # Print text responses and reasoning
        for item in response.output:
            # Handle text output
            if hasattr(item, 'type') and item.type == "text":
                print(f"\nModel message: {item.text}\n")
                
            # Handle reasoning output
            if hasattr(item, 'type') and item.type == "reasoning":
                # Extract meaningful content from the reasoning
                meaningful_content = []
                
                if hasattr(item, 'summary') and item.summary:
                    for summary in item.summary:
                        # Handle different potential formats of summary content
                        if isinstance(summary, str) and summary.strip():
                            meaningful_content.append(summary)
                        elif hasattr(summary, 'text') and summary.text.strip():
                            meaningful_content.append(summary.text)
                
                # Only print reasoning section if there's actual content
                if meaningful_content:
                    print("=== Model Reasoning ===")
                    for idx, content in enumerate(meaningful_content, 1):
                        print(f"{content}")
                    print("=====================\n")
        
        # Extract computer calls
        computer_calls = [item for item in response.output 
                         if hasattr(item, 'type') and item.type == "computer_call"]
        
        if not computer_calls:
            print("No computer call found in response. Reverting control to human operator")
            break
        
        computer_call = computer_calls[0]
        if not hasattr(computer_call, 'call_id') or not hasattr(computer_call, 'action'):
            print("Computer call is missing required attributes.")
            break
        
        call_id = computer_call.call_id
        action = computer_call.action
        
        # Handle safety checks
        acknowledged_checks = []
        if hasattr(computer_call, 'pending_safety_checks') and computer_call.pending_safety_checks:
            pending_checks = computer_call.pending_safety_checks
            print("\nSafety checks required:")
            for check in pending_checks:
                print(f"- {check.code}: {check.message}")
            
            if input("\nDo you want to proceed? (y/n): ").lower() != 'y':
                print("Operation cancelled by user.")
                break
            
            acknowledged_checks = pending_checks
        
        # Execute the action
        try:
           await page.bring_to_front()
           await handle_action(page, action)
           
           # Check if a new page was created after the action
           if action.type in ["click"]:
               await asyncio.sleep(1.5)
               # Get all pages in the context
               all_pages = page.context.pages
               # If we have multiple pages, check if there's a newer one
               if len(all_pages) > 1:
                   newest_page = all_pages[-1]  # Last page is usually the newest
                   if newest_page != page and newest_page.url not in ["about:blank", ""]:
                       print(f"\tSwitching to new tab: {newest_page.url}")
                       page = newest_page  # Update our page reference
           elif action.type != "wait":
               await asyncio.sleep(0.5)
               
        except Exception as e:
           print(f"Error handling action {action.type}: {e}")
           import traceback
           traceback.print_exc()    

        # Take a screenshot after the action
        screenshot_base64 = await take_screenshot(page)

        print("\tNew screenshot taken")
        
        # Prepare input for the next request
        input_content = [{
            "type": "computer_call_output",
            "call_id": call_id,
            "output": {
                "type": "input_image",
                "image_url": f"data:image/png;base64,{screenshot_base64}"
            }
        }]
        
        # Add acknowledged safety checks if any
        if acknowledged_checks:
            acknowledged_checks_dicts = []
            for check in acknowledged_checks:
                acknowledged_checks_dicts.append({
                    "id": check.id,
                    "code": check.code,
                    "message": check.message
                })
            input_content[0]["acknowledged_safety_checks"] = acknowledged_checks_dicts
        
        # Add current URL for context
        try:
            current_url = page.url
            if current_url and current_url != "about:blank":
                input_content[0]["current_url"] = current_url
                print(f"\tCurrent URL: {current_url}")
        except Exception as e:
            print(f"Error getting URL: {e}")
        
        # Send the screenshot back for the next step
        try:
            response = client.responses.create(
                model=MODEL,
                previous_response_id=response_id,
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": DISPLAY_WIDTH,
                    "display_height": DISPLAY_HEIGHT,
                    "environment": "browser"
                }],
                input=input_content,
                truncation="auto"
            )

            print("\tModel processing screenshot")
        except Exception as e:
            print(f"Error in API call: {e}")
            import traceback
            traceback.print_exc()
            break
    
    if iteration >= max_iterations - 1:
        print("Reached maximum number of iterations. Stopping.")

在本部分中,我们添加了以下代码:

  • 从模型中提取和显示文本及推理。
  • 处理计算机操作调用。
  • 处理需要用户确认的潜在安全检查。
  • 执行请求的操作。
  • 捕获新的屏幕截图。
  • 将更新的状态发送回模型并定义 ComputerTool
  • 对多个迭代重复此过程。

主要功能

主函数会协调整个过程:

    # Initialize OpenAI client
    client = AzureOpenAI(
        base_url=BASE_URL,
        azure_ad_token_provider=token_provider,
        api_version=API_VERSION
    )
    
    # Initialize Playwright
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(
            headless=False,
            args=[f"--window-size={DISPLAY_WIDTH},{DISPLAY_HEIGHT}", "--disable-extensions"]
        )
        
        context = await browser.new_context(
            viewport={"width": DISPLAY_WIDTH, "height": DISPLAY_HEIGHT},
            accept_downloads=True
        )
        
        page = await context.new_page()
        
        # Navigate to starting page
        await page.goto("https://www.bing.com", wait_until="domcontentloaded")
        print("Browser initialized to Bing.com")
        
        # Main interaction loop
        try:
            while True:
                print("\n" + "="*50)
                user_input = input("Enter a task to perform (or 'exit' to quit): ")
                
                if user_input.lower() in ('exit', 'quit'):
                    break
                
                if not user_input.strip():
                    continue
                
                # Take initial screenshot
                screenshot_base64 = await take_screenshot(page)
                print("\nTake initial screenshot")
                
                # Initial request to the model
                response = client.responses.create(
                    model=MODEL,
                    tools=[{
                        "type": "computer_use_preview",
                        "display_width": DISPLAY_WIDTH,
                        "display_height": DISPLAY_HEIGHT,
                        "environment": "browser"
                    }],
                    instructions = "You are an AI agent with the ability to control a browser. You can control the keyboard and mouse. You take a screenshot after each action to check if your action was successful. Once you have completed the requested task you should stop running and pass back control to your human operator.",
                    input=[{
                        "role": "user",
                        "content": [{
                            "type": "input_text",
                            "text": user_input
                        }, {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_base64}"
                        }]
                    }],
                    reasoning={"generate_summary": "concise"},
                    truncation="auto"
                )
                print("\nSending model initial screenshot and instructions")

                # Process model actions
                await process_model_response(client, response, page)
                
        except Exception as e:
            print(f"An error occurred: {e}")
            import traceback
            traceback.print_exc()
        
        finally:
            # Close browser
            await context.close()
            await browser.close()
            print("Browser closed.")

if __name__ == "__main__":
    asyncio.run(main())

主函数:

  • 初始化 AzureOpenAI 客户端。
  • 设置 Playwright 浏览器。
  • 从 Bing.com 开始。
  • 输入一个循环以接受用户任务。
  • 捕获初始状态。
  • 将任务和屏幕截图发送到模型。
  • 处理模型的响应。
  • 重复操作,直到用户退出。
  • 确保浏览器已正确关闭。

完成脚本

谨慎

此代码是实验性代码,仅用于演示目的。 此代码仅用于说明响应 API 和 computer-use-preview 模型之间的基本流程。 虽然你可以在本地计算机上执行此代码,但我们强烈建议在低特权虚拟机上运行此代码,而且设置为无法访问敏感数据。 此代码仅用于基本测试目的。

import os
import asyncio
import base64
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from playwright.async_api import async_playwright, TimeoutError


token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)

# Configuration

BASE_URL = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/"
MODEL = "computer-use-preview"
DISPLAY_WIDTH = 1024
DISPLAY_HEIGHT = 768
API_VERSION = "preview"
ITERATIONS = 5 # Max number of iterations before forcing the model to return control to the human supervisor

# Key mapping for special keys in Playwright
KEY_MAPPING = {
    "/": "Slash", "\\": "Backslash", "alt": "Alt", "arrowdown": "ArrowDown",
    "arrowleft": "ArrowLeft", "arrowright": "ArrowRight", "arrowup": "ArrowUp",
    "backspace": "Backspace", "ctrl": "Control", "delete": "Delete", 
    "enter": "Enter", "esc": "Escape", "shift": "Shift", "space": " ",
    "tab": "Tab", "win": "Meta", "cmd": "Meta", "super": "Meta", "option": "Alt"
}

def validate_coordinates(x, y):
    """Ensure coordinates are within display bounds."""
    return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))

async def handle_action(page, action):
    """Handle different action types from the model."""
    action_type = action.type
    
    if action_type == "drag":
        print("Drag action is not supported in this implementation. Skipping.")
        return
        
    elif action_type == "click":
        button = getattr(action, "button", "left")
        # Validate coordinates
        x, y = validate_coordinates(action.x, action.y)
        
        print(f"\tAction: click at ({x}, {y}) with button '{button}'")
        
        if button == "back":
            await page.go_back()
        elif button == "forward":
            await page.go_forward()
        elif button == "wheel":
            await page.mouse.wheel(x, y)
        else:
            button_type = {"left": "left", "right": "right", "middle": "middle"}.get(button, "left")
            await page.mouse.click(x, y, button=button_type)
            try:
                await page.wait_for_load_state("domcontentloaded", timeout=3000)
            except TimeoutError:
                pass
        
    elif action_type == "double_click":
        # Validate coordinates
        x, y = validate_coordinates(action.x, action.y)
        
        print(f"\tAction: double click at ({x}, {y})")
        await page.mouse.dblclick(x, y)
        
    elif action_type == "scroll":
        scroll_x = getattr(action, "scroll_x", 0)
        scroll_y = getattr(action, "scroll_y", 0)
        # Validate coordinates
        x, y = validate_coordinates(action.x, action.y)
        
        print(f"\tAction: scroll at ({x}, {y}) with offsets ({scroll_x}, {scroll_y})")
        await page.mouse.move(x, y)
        await page.evaluate(f"window.scrollBy({{left: {scroll_x}, top: {scroll_y}, behavior: 'smooth'}});")
        
    elif action_type == "keypress":
        keys = getattr(action, "keys", [])
        print(f"\tAction: keypress {keys}")
        mapped_keys = [KEY_MAPPING.get(key.lower(), key) for key in keys]
        
        if len(mapped_keys) > 1:
            # For key combinations (like Ctrl+C)
            for key in mapped_keys:
                await page.keyboard.down(key)
            await asyncio.sleep(0.1)
            for key in reversed(mapped_keys):
                await page.keyboard.up(key)
        else:
            for key in mapped_keys:
                await page.keyboard.press(key)
                
    elif action_type == "type":
        text = getattr(action, "text", "")
        print(f"\tAction: type text: {text}")
        await page.keyboard.type(text, delay=20)
        
    elif action_type == "wait":
        ms = getattr(action, "ms", 1000)
        print(f"\tAction: wait {ms}ms")
        await asyncio.sleep(ms / 1000)
        
    elif action_type == "screenshot":
        print("\tAction: screenshot")
        
    else:
        print(f"\tUnrecognized action: {action_type}")

async def take_screenshot(page):
    """Take a screenshot and return base64 encoding with caching for failures."""
    global last_successful_screenshot
    
    try:
        screenshot_bytes = await page.screenshot(full_page=False)
        last_successful_screenshot = base64.b64encode(screenshot_bytes).decode("utf-8")
        return last_successful_screenshot
    except Exception as e:
        print(f"Screenshot failed: {e}")
        print(f"Using cached screenshot from previous successful capture")
        if last_successful_screenshot:
            return last_successful_screenshot


async def process_model_response(client, response, page, max_iterations=ITERATIONS):
    """Process the model's response and execute actions."""
    for iteration in range(max_iterations):
        if not hasattr(response, 'output') or not response.output:
            print("No output from model.")
            break
        
        # Safely access response id
        response_id = getattr(response, 'id', 'unknown')
        print(f"\nIteration {iteration + 1} - Response ID: {response_id}\n")
        
        # Print text responses and reasoning
        for item in response.output:
            # Handle text output
            if hasattr(item, 'type') and item.type == "text":
                print(f"\nModel message: {item.text}\n")
                
            # Handle reasoning output
            if hasattr(item, 'type') and item.type == "reasoning":
                # Extract meaningful content from the reasoning
                meaningful_content = []
                
                if hasattr(item, 'summary') and item.summary:
                    for summary in item.summary:
                        # Handle different potential formats of summary content
                        if isinstance(summary, str) and summary.strip():
                            meaningful_content.append(summary)
                        elif hasattr(summary, 'text') and summary.text.strip():
                            meaningful_content.append(summary.text)
                
                # Only print reasoning section if there's actual content
                if meaningful_content:
                    print("=== Model Reasoning ===")
                    for idx, content in enumerate(meaningful_content, 1):
                        print(f"{content}")
                    print("=====================\n")
        
        # Extract computer calls
        computer_calls = [item for item in response.output 
                         if hasattr(item, 'type') and item.type == "computer_call"]
        
        if not computer_calls:
            print("No computer call found in response. Reverting control to human supervisor")
            break
        
        computer_call = computer_calls[0]
        if not hasattr(computer_call, 'call_id') or not hasattr(computer_call, 'action'):
            print("Computer call is missing required attributes.")
            break
        
        call_id = computer_call.call_id
        action = computer_call.action
        
        # Handle safety checks
        acknowledged_checks = []
        if hasattr(computer_call, 'pending_safety_checks') and computer_call.pending_safety_checks:
            pending_checks = computer_call.pending_safety_checks
            print("\nSafety checks required:")
            for check in pending_checks:
                print(f"- {check.code}: {check.message}")
            
            if input("\nDo you want to proceed? (y/n): ").lower() != 'y':
                print("Operation cancelled by user.")
                break
            
            acknowledged_checks = pending_checks
        
        # Execute the action
        try:
           await page.bring_to_front()
           await handle_action(page, action)
           
           # Check if a new page was created after the action
           if action.type in ["click"]:
               await asyncio.sleep(1.5)
               # Get all pages in the context
               all_pages = page.context.pages
               # If we have multiple pages, check if there's a newer one
               if len(all_pages) > 1:
                   newest_page = all_pages[-1]  # Last page is usually the newest
                   if newest_page != page and newest_page.url not in ["about:blank", ""]:
                       print(f"\tSwitching to new tab: {newest_page.url}")
                       page = newest_page  # Update our page reference
           elif action.type != "wait":
               await asyncio.sleep(0.5)
               
        except Exception as e:
           print(f"Error handling action {action.type}: {e}")
           import traceback
           traceback.print_exc()    

        # Take a screenshot after the action
        screenshot_base64 = await take_screenshot(page)

        print("\tNew screenshot taken")
        
        # Prepare input for the next request
        input_content = [{
            "type": "computer_call_output",
            "call_id": call_id,
            "output": {
                "type": "input_image",
                "image_url": f"data:image/png;base64,{screenshot_base64}"
            }
        }]
        
        # Add acknowledged safety checks if any
        if acknowledged_checks:
            acknowledged_checks_dicts = []
            for check in acknowledged_checks:
                acknowledged_checks_dicts.append({
                    "id": check.id,
                    "code": check.code,
                    "message": check.message
                })
            input_content[0]["acknowledged_safety_checks"] = acknowledged_checks_dicts
        
        # Add current URL for context
        try:
            current_url = page.url
            if current_url and current_url != "about:blank":
                input_content[0]["current_url"] = current_url
                print(f"\tCurrent URL: {current_url}")
        except Exception as e:
            print(f"Error getting URL: {e}")
        
        # Send the screenshot back for the next step
        try:
            response = client.responses.create(
                model=MODEL,
                previous_response_id=response_id,
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": DISPLAY_WIDTH,
                    "display_height": DISPLAY_HEIGHT,
                    "environment": "browser"
                }],
                input=input_content,
                truncation="auto"
            )

            print("\tModel processing screenshot")
        except Exception as e:
            print(f"Error in API call: {e}")
            import traceback
            traceback.print_exc()
            break
    
    if iteration >= max_iterations - 1:
        print("Reached maximum number of iterations. Stopping.")
        
async def main():    
    # Initialize OpenAI client
    client = AzureOpenAI(
        base_url=BASE_URL,
        azure_ad_token_provider=token_provider,
        api_version=API_VERSION
    )
    
    # Initialize Playwright
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(
            headless=False,
            args=[f"--window-size={DISPLAY_WIDTH},{DISPLAY_HEIGHT}", "--disable-extensions"]
        )
        
        context = await browser.new_context(
            viewport={"width": DISPLAY_WIDTH, "height": DISPLAY_HEIGHT},
            accept_downloads=True
        )
        
        page = await context.new_page()
        
        # Navigate to starting page
        await page.goto("https://www.bing.com", wait_until="domcontentloaded")
        print("Browser initialized to Bing.com")
        
        # Main interaction loop
        try:
            while True:
                print("\n" + "="*50)
                user_input = input("Enter a task to perform (or 'exit' to quit): ")
                
                if user_input.lower() in ('exit', 'quit'):
                    break
                
                if not user_input.strip():
                    continue
                
                # Take initial screenshot
                screenshot_base64 = await take_screenshot(page)
                print("\nTake initial screenshot")
                
                # Initial request to the model
                response = client.responses.create(
                    model=MODEL,
                    tools=[{
                        "type": "computer_use_preview",
                        "display_width": DISPLAY_WIDTH,
                        "display_height": DISPLAY_HEIGHT,
                        "environment": "browser"
                    }],
                    instructions = "You are an AI agent with the ability to control a browser. You can control the keyboard and mouse. You take a screenshot after each action to check if your action was successful. Once you have completed the requested task you should stop running and pass back control to your human supervisor.",
                    input=[{
                        "role": "user",
                        "content": [{
                            "type": "input_text",
                            "text": user_input
                        }, {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_base64}"
                        }]
                    }],
                    reasoning={"generate_summary": "concise"},
                    truncation="auto"
                )
                print("\nSending model initial screenshot and instructions")

                # Process model actions
                await process_model_response(client, response, page)
                
        except Exception as e:
            print(f"An error occurred: {e}")
            import traceback
            traceback.print_exc()
        
        finally:
            # Close browser
            await context.close()
            await browser.close()
            print("Browser closed.")

if __name__ == "__main__":
    asyncio.run(main())

另请参阅