この記事では、Azure OpenAI でコンピューターの使用を操作する方法について説明します。 Computer Use は、コンピューター システムやアプリケーションと UI を介してやり取りすることでタスクを実行できる特殊なモデルを使用する特殊な AI ツールです。 Computer Use を使用すると、ビジュアル要素を解釈し、画面上のコンテンツに基づいてアクションを実行することで、複雑なタスクを処理し、意思決定を行うことができるエージェントを作成できます。
コンピューターの使用は次の機能を提供します。
- 自律的ナビゲーション: 例えば、アプリケーションを開いたり、ボタンをクリックしたり、フォームに入力したり、複数ページのワークフローを移動したりします。
- 動的適応: UI の変更を解釈し、それに応じてアクションを調整します。
- アプリケーション間のタスク実行: Webベースおよびデスクトップアプリケーションをまたいで動作します。
- 自然言語インターフェイス: ユーザーは自然な言語でタスクを記述でき、Computer Use モデルが適切なUI操作を決定して実行します。
アクセスを要求する
computer-use-preview
モデルにアクセスするには、登録が必要であり、Microsoft の適格性基準に基づいてアクセス権が付与されます。 他の制限付きアクセス モデルにアクセスできるお客様は、引き続きこのモデルへのアクセスを要求する必要があります。
アクセスの要求: computer-use-preview 制限付きアクセス モデルの申請
アクセス権が付与されたら、モデルのデプロイを作成する必要があります。
地域のサポート
コンピューターの使用は、次のリージョンで利用できます。
eastus2
swedencentral
southindia
応答 API を使用した Computer Use モデルへの API 呼び出しの送信
コンピュータ使用ツールには、応答APIを介してアクセスします。 このツールは、テキストの入力やクリックの実行などのアクションを送信する連続ループで動作します。 コードはコンピューターでこれらのアクションを実行し、結果のスクリーンショットをモデルに送信します。
このようにして、コードはコンピューター インターフェイスを使用して人間のアクションをシミュレートしますが、モデルではスクリーンショットを使用して環境の状態を理解し、次のアクションを提案します。
次の例は、基本的な API 呼び出しを示しています。
要求を送信するには、次の Python パッケージをインストールする必要があります。
pip install openai
pip install azure-identity
import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI
#from openai import OpenAI
token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")
client = AzureOpenAI(
base_url = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/",
azure_ad_token_provider=token_provider,
api_version="preview"
)
response = client.responses.create(
model="computer-use-preview", # set this to your model deployment name
tools=[{
"type": "computer_use_preview",
"display_width": 1024,
"display_height": 768,
"environment": "browser" # other possible values: "mac", "windows", "ubuntu"
}],
input=[
{
"role": "user",
"content": "Check the latest AI news on bing.com."
}
],
truncation="auto"
)
print(response.output)
アウトプット
[
ResponseComputerToolCall(
id='cu_67d841873c1081908bfc88b90a8555e0',
action=ActionScreenshot(type='screenshot'),
call_id='call_wwEnfFDqQr1Z4Edk62Fyo7Nh',
pending_safety_checks=[],
status='completed',
type='computer_call'
)
]
最初の API 要求が送信されたら、指定されたアクションがアプリケーション コードで実行されるループを実行し、各ターンのスクリーンショットを送信して、モデルが環境の更新された状態を評価できるようにします。
## response.output is the previous response from the model
computer_calls = [item for item in response.output if item.type == "computer_call"]
if not computer_calls:
print("No computer call found. Output from model:")
for item in response.output:
print(item)
computer_call = computer_calls[0]
last_call_id = computer_call.call_id
action = computer_call.action
# Your application would now perform the action suggested by the model
# And create a screenshot of the updated state of the environment before sending another response
response_2 = client.responses.create(
model="computer-use-preview",
previous_response_id=response.id,
tools=[{
"type": "computer_use_preview",
"display_width": 1024,
"display_height": 768,
"environment": "browser" # other possible values: "mac", "windows", "ubuntu"
}],
input=[
{
"call_id": last_call_id,
"type": "computer_call_output",
"output": {
"type": "input_image",
# Image should be in base64
"image_url": f"data:image/png;base64,{<base64_string>}"
}
}
],
truncation="auto"
)
コンピューターの使用の統合について
コンピューター使用ツールを使用する場合は、通常、次の操作を実行してアプリケーションに統合します。
- コンピューター使用ツールの呼び出し、表示サイズと環境を含む要求をモデルに送信します。 また、最初の API 要求に環境の初期状態のスクリーンショットを含めることもできます。
- モデルから応答を受け取ります。 応答に
action
項目がある場合、それらの項目には、指定した目標に向けて進行するための推奨アクションが含まれます。 たとえば、アクションをscreenshot
して、モデルが更新されたスクリーンショットで現在の状態を評価したり、マウスを移動する場所を示す X/Y 座標でclick
したりできます。 - コンピューターまたはブラウザー環境でアプリケーション コードを使用してアクションを実行します。
- アクションを実行した後、環境の更新された状態をスクリーンショットとしてキャプチャします。
- 更新された状態を
computer_call_output
として新しい要求を送信し、モデルがアクションの要求を停止するか、あなたが停止することを決定するまで、このループを繰り返します。
会話履歴の処理
previous_response_id
パラメーターを使用して、現在の要求を前の応答にリンクできます。 会話履歴を管理しない場合は、このパラメーターを使用することをお勧めします。
このパラメーターを使用しない場合は、前の要求の応答出力で返されるすべての項目を入力配列に含める必要があります。 これには、存在する場合の理由項目が含まれます。
安全性チェック
API には、迅速な挿入とモデルの間違いから保護するための安全性チェックがあります。 これらのチェックには次のものが含まれます。
- 悪意のある命令検出: システムはスクリーンショット画像を評価し、モデルの動作を変更する可能性のある敵対的なコンテンツが含まれているかどうかを確認します。
- 無関係なドメイン検出: システムによって
current_url
(指定されている場合) が評価され、会話履歴を考慮して現在のドメインが関連するものと見なされるかどうかが確認されます。 - 機密性の高いドメインの検出: システムは
current_url
(指定されている場合) をチェックし、ユーザーが機密ドメインに存在することが検出されると警告を発生させます。
上記のチェックの 1 つ以上がトリガーされると、モデルが次の computer_call
を返したときに、 pending_safety_checks
パラメーターを使用して安全性チェックが発生します。
"output": [
{
"type": "reasoning",
"id": "rs_67cb...",
"summary": [
{
"type": "summary_text",
"text": "Exploring 'File' menu option."
}
]
},
{
"type": "computer_call",
"id": "cu_67cb...",
"call_id": "call_nEJ...",
"action": {
"type": "click",
"button": "left",
"x": 135,
"y": 193
},
"pending_safety_checks": [
{
"id": "cu_sc_67cb...",
"code": "malicious_instructions",
"message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
}
],
"status": "completed"
}
]
続行するには、次の要求で安全性チェックを acknowledged_safety_checks
に戻す必要があります。
"input":[
{
"type": "computer_call_output",
"call_id": "<call_id>",
"acknowledged_safety_checks": [
{
"id": "<safety_check_id>",
"code": "malicious_instructions",
"message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
}
],
"output": {
"type": "computer_screenshot",
"image_url": "<image_url>"
}
}
],
安全チェックの取り扱い
pending_safety_checks
が返されるすべての場合、適切なモデルの動作と精度を確認するために、アクションをエンド ユーザーに引き渡す必要があります。
malicious_instructions
とirrelevant_domain
: エンド ユーザーは、モデルアクションを確認し、モデルが意図したとおりに動作していることを確認する必要があります。sensitive_domain
: エンド ユーザーがこれらのサイトでモデル アクションを積極的に監視していることを確かめます。 この "ウォッチ モード" の正確な実装はアプリケーションによって異なりますが、たとえば、サイトでユーザーのインプレッション データを収集して、アプリケーションにアクティブなエンド ユーザーエンゲージメントがあることを確認できます。
Playwright の統合
このセクションでは、Azure OpenAI の computer-use-preview
モデルと Playwright を統合して基本的なブラウザー操作を自動化する簡単なサンプル スクリプトを提供します。 モデルと Playwright を組み合わせることで、モデルはブラウザー画面を表示し、意思決定を行い、Web サイトのクリック、入力、ナビゲーションなどのアクションを実行できます。 このコード例を実行するときは注意が必要です。 このコードはローカルで実行するように設計されていますが、テスト環境でのみ実行する必要があります。 人間を使用して決定を確認し、モデルに機密データへのアクセス権を付与しません。
まず、 Playwright 用の Python ライブラリをインストールする必要があります。
pip install playwright
パッケージがインストールされたら、また実行する必要があります
playwright install
インポートと構成
まず、必要なライブラリをインポートし、構成パラメーターを定義します。 asyncio
を使用しているため、このコードは Jupyter ノートブックの外部で実行します。 最初にコードをチャンクで説明し、その使用方法を示します。
import os
import asyncio
import base64
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from playwright.async_api import async_playwright, TimeoutError
token_provider = get_bearer_token_provider(
DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
# Configuration
BASE_URL = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/"
MODEL = "computer-use-preview" # Set to model deployment name
DISPLAY_WIDTH = 1024
DISPLAY_HEIGHT = 768
API_VERSION = "preview" #Use this API version or later
ITERATIONS = 5 # Max number of iterations before returning control to human supervisor
ブラウザー操作のキー マッピング
次に、モデルがプレイライトに渡す必要がある特殊なキーのマッピングを設定します。 最終的に、モデルはアクション自体を実行することはなく、コマンドの表現を渡し、それらのコマンドを実行して選択した環境で実行できる最終的な統合レイヤーを提供する必要があります。
これは、考えられるキー マッピングの完全な一覧ではありません。 必要に応じて、この一覧を展開できます。 この辞書は、Playwright とのモデルの統合に特化しています。 モデルを代替ライブラリと統合して、オペレーティング システムのキーボード/マウスへの API アクセスを提供する場合は、そのライブラリに固有のマッピングを提供する必要があります。
# Key mapping for special keys in Playwright
KEY_MAPPING = {
"/": "Slash", "\\": "Backslash", "alt": "Alt", "arrowdown": "ArrowDown",
"arrowleft": "ArrowLeft", "arrowright": "ArrowRight", "arrowup": "ArrowUp",
"backspace": "Backspace", "ctrl": "Control", "delete": "Delete",
"enter": "Enter", "esc": "Escape", "shift": "Shift", "space": " ",
"tab": "Tab", "win": "Meta", "cmd": "Meta", "super": "Meta", "option": "Alt"
}
このディクショナリは、ユーザーフレンドリなキー名を、Playwright のキーボード API で想定される形式に変換します。
座標検証関数
モデルから渡されたマウス アクションがブラウザー ウィンドウの境界内に留まるようにするには、次のユーティリティ関数を追加します。
def validate_coordinates(x, y):
"""Ensure coordinates are within display bounds."""
return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))
この単純なユーティリティは、座標をウィンドウの寸法にクランプすることで、範囲外のエラーを防ぎます。
アクション処理
ブラウザーの自動化の中核となるのは、さまざまな種類のユーザー操作を処理し、ブラウザー内のアクションに変換するアクション ハンドラーです。
async def handle_action(page, action):
"""Handle different action types from the model."""
action_type = action.type
if action_type == "drag":
print("Drag action is not supported in this implementation. Skipping.")
return
elif action_type == "click":
button = getattr(action, "button", "left")
# Validate coordinates
x, y = validate_coordinates(action.x, action.y)
print(f"\tAction: click at ({x}, {y}) with button '{button}'")
if button == "back":
await page.go_back()
elif button == "forward":
await page.go_forward()
elif button == "wheel":
await page.mouse.wheel(x, y)
else:
button_type = {"left": "left", "right": "right", "middle": "middle"}.get(button, "left")
await page.mouse.click(x, y, button=button_type)
try:
await page.wait_for_load_state("domcontentloaded", timeout=3000)
except TimeoutError:
pass
elif action_type == "double_click":
# Validate coordinates
x, y = validate_coordinates(action.x, action.y)
print(f"\tAction: double click at ({x}, {y})")
await page.mouse.dblclick(x, y)
elif action_type == "scroll":
scroll_x = getattr(action, "scroll_x", 0)
scroll_y = getattr(action, "scroll_y", 0)
# Validate coordinates
x, y = validate_coordinates(action.x, action.y)
print(f"\tAction: scroll at ({x}, {y}) with offsets ({scroll_x}, {scroll_y})")
await page.mouse.move(x, y)
await page.evaluate(f"window.scrollBy({{left: {scroll_x}, top: {scroll_y}, behavior: 'smooth'}});")
elif action_type == "keypress":
keys = getattr(action, "keys", [])
print(f"\tAction: keypress {keys}")
mapped_keys = [KEY_MAPPING.get(key.lower(), key) for key in keys]
if len(mapped_keys) > 1:
# For key combinations (like Ctrl+C)
for key in mapped_keys:
await page.keyboard.down(key)
await asyncio.sleep(0.1)
for key in reversed(mapped_keys):
await page.keyboard.up(key)
else:
for key in mapped_keys:
await page.keyboard.press(key)
elif action_type == "type":
text = getattr(action, "text", "")
print(f"\tAction: type text: {text}")
await page.keyboard.type(text, delay=20)
elif action_type == "wait":
ms = getattr(action, "ms", 1000)
print(f"\tAction: wait {ms}ms")
await asyncio.sleep(ms / 1000)
elif action_type == "screenshot":
print("\tAction: screenshot")
else:
print(f"\tUnrecognized action: {action_type}")
この関数は、さまざまな種類のアクションの処理を試みます。 computer-use-preview
によって生成されるコマンドと、アクションを実行する Playwright ライブラリの間で変換する必要があります。 詳細については、 ComputerAction
のリファレンス ドキュメントを参照してください。
スクリーンショット キャプチャ
モデルがモデルと対話している内容を確認できるようにするには、スクリーンショットをキャプチャする方法が必要です。 このコードでは、Playwright を使用してスクリーンショットをキャプチャし、ブラウザー ウィンドウのコンテンツのみにビューを制限しています。 このスクリーンショットには、ブラウザー GUI の URL バーやその他の側面は含まれません。 メイン ブラウザー ウィンドウの外側にモデルを表示する必要がある場合は、独自のスクリーンショット関数を作成してモデルを拡張できます。
async def take_screenshot(page):
"""Take a screenshot and return base64 encoding with caching for failures."""
global last_successful_screenshot
try:
screenshot_bytes = await page.screenshot(full_page=False)
last_successful_screenshot = base64.b64encode(screenshot_bytes).decode("utf-8")
return last_successful_screenshot
except Exception as e:
print(f"Screenshot failed: {e}")
print(f"Using cached screenshot from previous successful capture")
if last_successful_screenshot:
return last_successful_screenshot
この関数は、現在のブラウザーの状態をイメージとしてキャプチャし、モデルに送信する準備ができている base64 でエンコードされた文字列として返します。 実行しようとしたコマンドが成功したかどうかをモデルが確認できるように、各ステップの後にループで常にこれを行います。これにより、スクリーンショットの内容に基づいて調整できるようになります。 モデルでスクリーンショットを撮る必要があるかどうかを判断することもできますが、わかりやすくするために、イテレーションごとに強制的にスクリーンショットを作成します。
モデル応答処理
この関数は、モデルの応答を処理し、要求されたアクションを実行します。
async def process_model_response(client, response, page, max_iterations=ITERATIONS):
"""Process the model's response and execute actions."""
for iteration in range(max_iterations):
if not hasattr(response, 'output') or not response.output:
print("No output from model.")
break
# Safely access response id
response_id = getattr(response, 'id', 'unknown')
print(f"\nIteration {iteration + 1} - Response ID: {response_id}\n")
# Print text responses and reasoning
for item in response.output:
# Handle text output
if hasattr(item, 'type') and item.type == "text":
print(f"\nModel message: {item.text}\n")
# Handle reasoning output
if hasattr(item, 'type') and item.type == "reasoning":
# Extract meaningful content from the reasoning
meaningful_content = []
if hasattr(item, 'summary') and item.summary:
for summary in item.summary:
# Handle different potential formats of summary content
if isinstance(summary, str) and summary.strip():
meaningful_content.append(summary)
elif hasattr(summary, 'text') and summary.text.strip():
meaningful_content.append(summary.text)
# Only print reasoning section if there's actual content
if meaningful_content:
print("=== Model Reasoning ===")
for idx, content in enumerate(meaningful_content, 1):
print(f"{content}")
print("=====================\n")
# Extract computer calls
computer_calls = [item for item in response.output
if hasattr(item, 'type') and item.type == "computer_call"]
if not computer_calls:
print("No computer call found in response. Reverting control to human operator")
break
computer_call = computer_calls[0]
if not hasattr(computer_call, 'call_id') or not hasattr(computer_call, 'action'):
print("Computer call is missing required attributes.")
break
call_id = computer_call.call_id
action = computer_call.action
# Handle safety checks
acknowledged_checks = []
if hasattr(computer_call, 'pending_safety_checks') and computer_call.pending_safety_checks:
pending_checks = computer_call.pending_safety_checks
print("\nSafety checks required:")
for check in pending_checks:
print(f"- {check.code}: {check.message}")
if input("\nDo you want to proceed? (y/n): ").lower() != 'y':
print("Operation cancelled by user.")
break
acknowledged_checks = pending_checks
# Execute the action
try:
await page.bring_to_front()
await handle_action(page, action)
# Check if a new page was created after the action
if action.type in ["click"]:
await asyncio.sleep(1.5)
# Get all pages in the context
all_pages = page.context.pages
# If we have multiple pages, check if there's a newer one
if len(all_pages) > 1:
newest_page = all_pages[-1] # Last page is usually the newest
if newest_page != page and newest_page.url not in ["about:blank", ""]:
print(f"\tSwitching to new tab: {newest_page.url}")
page = newest_page # Update our page reference
elif action.type != "wait":
await asyncio.sleep(0.5)
except Exception as e:
print(f"Error handling action {action.type}: {e}")
import traceback
traceback.print_exc()
# Take a screenshot after the action
screenshot_base64 = await take_screenshot(page)
print("\tNew screenshot taken")
# Prepare input for the next request
input_content = [{
"type": "computer_call_output",
"call_id": call_id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}"
}
}]
# Add acknowledged safety checks if any
if acknowledged_checks:
acknowledged_checks_dicts = []
for check in acknowledged_checks:
acknowledged_checks_dicts.append({
"id": check.id,
"code": check.code,
"message": check.message
})
input_content[0]["acknowledged_safety_checks"] = acknowledged_checks_dicts
# Add current URL for context
try:
current_url = page.url
if current_url and current_url != "about:blank":
input_content[0]["current_url"] = current_url
print(f"\tCurrent URL: {current_url}")
except Exception as e:
print(f"Error getting URL: {e}")
# Send the screenshot back for the next step
try:
response = client.responses.create(
model=MODEL,
previous_response_id=response_id,
tools=[{
"type": "computer_use_preview",
"display_width": DISPLAY_WIDTH,
"display_height": DISPLAY_HEIGHT,
"environment": "browser"
}],
input=input_content,
truncation="auto"
)
print("\tModel processing screenshot")
except Exception as e:
print(f"Error in API call: {e}")
import traceback
traceback.print_exc()
break
if iteration >= max_iterations - 1:
print("Reached maximum number of iterations. Stopping.")
このセクションでは、次のコードを追加しました。
- モデルからテキストと推論を抽出して表示します。
- コンピューター アクションの呼び出しを処理します。
- ユーザーの確認が必要な潜在的な安全性チェックを処理します。
- 要求されたアクションを実行します。
- 新しいスクリーンショットをキャプチャします。
- 更新された状態をモデルに戻し、
ComputerTool
を定義します。 - 複数のイテレーションに対してこのプロセスを繰り返します。
メイン関数
main 関数はプロセス全体を調整します。
# Initialize OpenAI client
client = AzureOpenAI(
base_url=BASE_URL,
azure_ad_token_provider=token_provider,
api_version=API_VERSION
)
# Initialize Playwright
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(
headless=False,
args=[f"--window-size={DISPLAY_WIDTH},{DISPLAY_HEIGHT}", "--disable-extensions"]
)
context = await browser.new_context(
viewport={"width": DISPLAY_WIDTH, "height": DISPLAY_HEIGHT},
accept_downloads=True
)
page = await context.new_page()
# Navigate to starting page
await page.goto("https://www.bing.com", wait_until="domcontentloaded")
print("Browser initialized to Bing.com")
# Main interaction loop
try:
while True:
print("\n" + "="*50)
user_input = input("Enter a task to perform (or 'exit' to quit): ")
if user_input.lower() in ('exit', 'quit'):
break
if not user_input.strip():
continue
# Take initial screenshot
screenshot_base64 = await take_screenshot(page)
print("\nTake initial screenshot")
# Initial request to the model
response = client.responses.create(
model=MODEL,
tools=[{
"type": "computer_use_preview",
"display_width": DISPLAY_WIDTH,
"display_height": DISPLAY_HEIGHT,
"environment": "browser"
}],
instructions = "You are an AI agent with the ability to control a browser. You can control the keyboard and mouse. You take a screenshot after each action to check if your action was successful. Once you have completed the requested task you should stop running and pass back control to your human operator.",
input=[{
"role": "user",
"content": [{
"type": "input_text",
"text": user_input
}, {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}"
}]
}],
reasoning={"generate_summary": "concise"},
truncation="auto"
)
print("\nSending model initial screenshot and instructions")
# Process model actions
await process_model_response(client, response, page)
except Exception as e:
print(f"An error occurred: {e}")
import traceback
traceback.print_exc()
finally:
# Close browser
await context.close()
await browser.close()
print("Browser closed.")
if __name__ == "__main__":
asyncio.run(main())
主要な機能
- AzureOpenAI クライアントを初期化します。
- Playwright ブラウザーを設定します。
- Bing.com から開始します。
- ユーザー タスクを受け入れるループを入力します。
- 初期状態をキャプチャします。
- タスクとスクリーンショットをモデルに送信します。
- モデルの応答を処理します。
- ユーザーが終了するまで繰り返します。
- ブラウザーが正しく閉じられるようにします。
完全なスクリプト
注意事項
このコードは試験的であり、デモンストレーションのみを目的としています。 これは、応答 API と computer-use-preview
モデルの基本的なフローを示すことを目的としています。 このコードはローカル コンピューターで実行できますが、機密データにアクセスできない低い特権の仮想マシンでこのコードを実行することを強くお勧めします。 このコードは、基本的なテストのみを目的としています。
import os
import asyncio
import base64
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from playwright.async_api import async_playwright, TimeoutError
token_provider = get_bearer_token_provider(
DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
# Configuration
BASE_URL = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/"
MODEL = "computer-use-preview"
DISPLAY_WIDTH = 1024
DISPLAY_HEIGHT = 768
API_VERSION = "preview"
ITERATIONS = 5 # Max number of iterations before forcing the model to return control to the human supervisor
# Key mapping for special keys in Playwright
KEY_MAPPING = {
"/": "Slash", "\\": "Backslash", "alt": "Alt", "arrowdown": "ArrowDown",
"arrowleft": "ArrowLeft", "arrowright": "ArrowRight", "arrowup": "ArrowUp",
"backspace": "Backspace", "ctrl": "Control", "delete": "Delete",
"enter": "Enter", "esc": "Escape", "shift": "Shift", "space": " ",
"tab": "Tab", "win": "Meta", "cmd": "Meta", "super": "Meta", "option": "Alt"
}
def validate_coordinates(x, y):
"""Ensure coordinates are within display bounds."""
return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))
async def handle_action(page, action):
"""Handle different action types from the model."""
action_type = action.type
if action_type == "drag":
print("Drag action is not supported in this implementation. Skipping.")
return
elif action_type == "click":
button = getattr(action, "button", "left")
# Validate coordinates
x, y = validate_coordinates(action.x, action.y)
print(f"\tAction: click at ({x}, {y}) with button '{button}'")
if button == "back":
await page.go_back()
elif button == "forward":
await page.go_forward()
elif button == "wheel":
await page.mouse.wheel(x, y)
else:
button_type = {"left": "left", "right": "right", "middle": "middle"}.get(button, "left")
await page.mouse.click(x, y, button=button_type)
try:
await page.wait_for_load_state("domcontentloaded", timeout=3000)
except TimeoutError:
pass
elif action_type == "double_click":
# Validate coordinates
x, y = validate_coordinates(action.x, action.y)
print(f"\tAction: double click at ({x}, {y})")
await page.mouse.dblclick(x, y)
elif action_type == "scroll":
scroll_x = getattr(action, "scroll_x", 0)
scroll_y = getattr(action, "scroll_y", 0)
# Validate coordinates
x, y = validate_coordinates(action.x, action.y)
print(f"\tAction: scroll at ({x}, {y}) with offsets ({scroll_x}, {scroll_y})")
await page.mouse.move(x, y)
await page.evaluate(f"window.scrollBy({{left: {scroll_x}, top: {scroll_y}, behavior: 'smooth'}});")
elif action_type == "keypress":
keys = getattr(action, "keys", [])
print(f"\tAction: keypress {keys}")
mapped_keys = [KEY_MAPPING.get(key.lower(), key) for key in keys]
if len(mapped_keys) > 1:
# For key combinations (like Ctrl+C)
for key in mapped_keys:
await page.keyboard.down(key)
await asyncio.sleep(0.1)
for key in reversed(mapped_keys):
await page.keyboard.up(key)
else:
for key in mapped_keys:
await page.keyboard.press(key)
elif action_type == "type":
text = getattr(action, "text", "")
print(f"\tAction: type text: {text}")
await page.keyboard.type(text, delay=20)
elif action_type == "wait":
ms = getattr(action, "ms", 1000)
print(f"\tAction: wait {ms}ms")
await asyncio.sleep(ms / 1000)
elif action_type == "screenshot":
print("\tAction: screenshot")
else:
print(f"\tUnrecognized action: {action_type}")
async def take_screenshot(page):
"""Take a screenshot and return base64 encoding with caching for failures."""
global last_successful_screenshot
try:
screenshot_bytes = await page.screenshot(full_page=False)
last_successful_screenshot = base64.b64encode(screenshot_bytes).decode("utf-8")
return last_successful_screenshot
except Exception as e:
print(f"Screenshot failed: {e}")
print(f"Using cached screenshot from previous successful capture")
if last_successful_screenshot:
return last_successful_screenshot
async def process_model_response(client, response, page, max_iterations=ITERATIONS):
"""Process the model's response and execute actions."""
for iteration in range(max_iterations):
if not hasattr(response, 'output') or not response.output:
print("No output from model.")
break
# Safely access response id
response_id = getattr(response, 'id', 'unknown')
print(f"\nIteration {iteration + 1} - Response ID: {response_id}\n")
# Print text responses and reasoning
for item in response.output:
# Handle text output
if hasattr(item, 'type') and item.type == "text":
print(f"\nModel message: {item.text}\n")
# Handle reasoning output
if hasattr(item, 'type') and item.type == "reasoning":
# Extract meaningful content from the reasoning
meaningful_content = []
if hasattr(item, 'summary') and item.summary:
for summary in item.summary:
# Handle different potential formats of summary content
if isinstance(summary, str) and summary.strip():
meaningful_content.append(summary)
elif hasattr(summary, 'text') and summary.text.strip():
meaningful_content.append(summary.text)
# Only print reasoning section if there's actual content
if meaningful_content:
print("=== Model Reasoning ===")
for idx, content in enumerate(meaningful_content, 1):
print(f"{content}")
print("=====================\n")
# Extract computer calls
computer_calls = [item for item in response.output
if hasattr(item, 'type') and item.type == "computer_call"]
if not computer_calls:
print("No computer call found in response. Reverting control to human supervisor")
break
computer_call = computer_calls[0]
if not hasattr(computer_call, 'call_id') or not hasattr(computer_call, 'action'):
print("Computer call is missing required attributes.")
break
call_id = computer_call.call_id
action = computer_call.action
# Handle safety checks
acknowledged_checks = []
if hasattr(computer_call, 'pending_safety_checks') and computer_call.pending_safety_checks:
pending_checks = computer_call.pending_safety_checks
print("\nSafety checks required:")
for check in pending_checks:
print(f"- {check.code}: {check.message}")
if input("\nDo you want to proceed? (y/n): ").lower() != 'y':
print("Operation cancelled by user.")
break
acknowledged_checks = pending_checks
# Execute the action
try:
await page.bring_to_front()
await handle_action(page, action)
# Check if a new page was created after the action
if action.type in ["click"]:
await asyncio.sleep(1.5)
# Get all pages in the context
all_pages = page.context.pages
# If we have multiple pages, check if there's a newer one
if len(all_pages) > 1:
newest_page = all_pages[-1] # Last page is usually the newest
if newest_page != page and newest_page.url not in ["about:blank", ""]:
print(f"\tSwitching to new tab: {newest_page.url}")
page = newest_page # Update our page reference
elif action.type != "wait":
await asyncio.sleep(0.5)
except Exception as e:
print(f"Error handling action {action.type}: {e}")
import traceback
traceback.print_exc()
# Take a screenshot after the action
screenshot_base64 = await take_screenshot(page)
print("\tNew screenshot taken")
# Prepare input for the next request
input_content = [{
"type": "computer_call_output",
"call_id": call_id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}"
}
}]
# Add acknowledged safety checks if any
if acknowledged_checks:
acknowledged_checks_dicts = []
for check in acknowledged_checks:
acknowledged_checks_dicts.append({
"id": check.id,
"code": check.code,
"message": check.message
})
input_content[0]["acknowledged_safety_checks"] = acknowledged_checks_dicts
# Add current URL for context
try:
current_url = page.url
if current_url and current_url != "about:blank":
input_content[0]["current_url"] = current_url
print(f"\tCurrent URL: {current_url}")
except Exception as e:
print(f"Error getting URL: {e}")
# Send the screenshot back for the next step
try:
response = client.responses.create(
model=MODEL,
previous_response_id=response_id,
tools=[{
"type": "computer_use_preview",
"display_width": DISPLAY_WIDTH,
"display_height": DISPLAY_HEIGHT,
"environment": "browser"
}],
input=input_content,
truncation="auto"
)
print("\tModel processing screenshot")
except Exception as e:
print(f"Error in API call: {e}")
import traceback
traceback.print_exc()
break
if iteration >= max_iterations - 1:
print("Reached maximum number of iterations. Stopping.")
async def main():
# Initialize OpenAI client
client = AzureOpenAI(
base_url=BASE_URL,
azure_ad_token_provider=token_provider,
api_version=API_VERSION
)
# Initialize Playwright
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(
headless=False,
args=[f"--window-size={DISPLAY_WIDTH},{DISPLAY_HEIGHT}", "--disable-extensions"]
)
context = await browser.new_context(
viewport={"width": DISPLAY_WIDTH, "height": DISPLAY_HEIGHT},
accept_downloads=True
)
page = await context.new_page()
# Navigate to starting page
await page.goto("https://www.bing.com", wait_until="domcontentloaded")
print("Browser initialized to Bing.com")
# Main interaction loop
try:
while True:
print("\n" + "="*50)
user_input = input("Enter a task to perform (or 'exit' to quit): ")
if user_input.lower() in ('exit', 'quit'):
break
if not user_input.strip():
continue
# Take initial screenshot
screenshot_base64 = await take_screenshot(page)
print("\nTake initial screenshot")
# Initial request to the model
response = client.responses.create(
model=MODEL,
tools=[{
"type": "computer_use_preview",
"display_width": DISPLAY_WIDTH,
"display_height": DISPLAY_HEIGHT,
"environment": "browser"
}],
instructions = "You are an AI agent with the ability to control a browser. You can control the keyboard and mouse. You take a screenshot after each action to check if your action was successful. Once you have completed the requested task you should stop running and pass back control to your human supervisor.",
input=[{
"role": "user",
"content": [{
"type": "input_text",
"text": user_input
}, {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}"
}]
}],
reasoning={"generate_summary": "concise"},
truncation="auto"
)
print("\nSending model initial screenshot and instructions")
# Process model actions
await process_model_response(client, response, page)
except Exception as e:
print(f"An error occurred: {e}")
import traceback
traceback.print_exc()
finally:
# Close browser
await context.close()
await browser.close()
print("Browser closed.")
if __name__ == "__main__":
asyncio.run(main())