A2A Protocol

تنفيذ CurrencyAgent باستخدام A2A Python SDK

MILO
Share
تنفيذ CurrencyAgent باستخدام A2A Python SDK

يتحدث SDK الرسمي من Google a2a-python تحديثات متكررة، ويجب تحديث برنامجنا التعليمي أيضًا. في هذه المقالة، سنقوم بتنفيذ CurrencyAgent بسيط باستخدام الإصدار 0.2.3 من a2a-python SDK.

المحتويات

الكود المصدري

الكود المصدري للمشروع متاح على a2a-python-currency. نرحب بـ star.

المتطلبات

  • uv 0.7.2، لإدارة المشروع
  • Python 3.13+، هذا الإصدار مطلوب من قبل a2a-python
  • apiKey و baseURL من openai/openrouter. أنا أستخدم OpenRouter، الذي يوفر المزيد من خيارات النماذج.

الخطوات التفصيلية

إنشاء المشروع

uv init a2a-python-currency
cd a2a-python-currency

إنشاء البيئة الافتراضية

uv venv
source .venv/bin/activate

إضافة التبعيات

uv add a2a-sdk uvicorn dotenv click

تكوين متغيرات البيئة

echo OPENROUTER_API_KEY=your_api_key >> .env
echo OPENROUTER_BASE_URL=your_base_url >> .env

# مثال
OPENROUTER_API_KEY=مفتاح_API_الخاص_بك_في_OpenRouter
OPENROUTER_BASE_URL="https://openrouter.ai/api/v1"

إنشاء Agent

الكود الكامل كالتالي:

import logging
import json
from typing import Any, Dict, List, Optional
import httpx
from os import getenv
from dotenv import load_dotenv
from collections.abc import AsyncIterable

load_dotenv()

logger = logging.getLogger(__name__)

class CurrencyAgent:
    """Currency Conversion Agent using OpenAI API."""

    SYSTEM_PROMPT = """You are a specialized assistant for currency conversions.
Your sole purpose is to use the 'get_exchange_rate' tool to answer questions about currency exchange rates.
If the user asks about anything other than currency conversion or exchange rates,
politely state that you cannot help with that topic and can only assist with currency-related queries.
Do not attempt to answer unrelated questions or use tools for other purposes.

You have access to the following tool:
- get_exchange_rate: Get current exchange rate between two currencies

When using the tool, respond in the following JSON format:
{
    "status": "completed" | "input_required" | "error",
    "message": "your response message"
}

If you need to use the tool, respond with:
{
    "status": "tool_use",
    "tool": "get_exchange_rate",
    "parameters": {
        "currency_from": "USD",
        "currency_to": "EUR",
        "currency_date": "latest"
    }
}
Note: Return the response in the JSON format, only json is allowed.
"""

    def __init__(self):
        self.api_key = getenv("OPENROUTER_API_KEY")
        self.api_base = getenv("OPENROUTER_BASE_URL")
        self.model = "anthropic/claude-3.7-sonnet"
        self.conversation_history: List[Dict[str, str]] = []

    async def get_exchange_rate(
        self,
        currency_from: str = 'USD',
        currency_to: str = 'EUR',
        currency_date: str = 'latest',
    ) -> Dict[str, Any]:
        """Get current exchange rate between currencies."""
        try:
            response = httpx.get(
                f'https://api.frankfurter.app/{currency_date}',
                params={'from': currency_from, 'to': currency_to},
            )
            response.raise_for_status()
            data = response.json()
            if 'rates' not in data:
                logger.error(f'rates not found in response: {data}')
                return {'error': 'Invalid API response format.'}
            logger.info(f'API response: {data}')
            return data
        except httpx.HTTPError as e:
            logger.error(f'API request failed: {e}')
            return {'error': f'API request failed: {e}'}
        except ValueError:
            logger.error('Invalid JSON response from API')
            return {'error': 'Invalid JSON response from API.'}

    async def _call_openai(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """Call OpenAI API through OpenRouter."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.api_base}/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": self.model,
                    "messages": messages,
                    "temperature": 0.7,
                    "stream": False,
                },
            )
            response.raise_for_status()
            return response.json()

    async def stream(self, query: str, session_id: str) -> AsyncIterable[Dict[str, Any]]:
        """Stream the response for a given query."""
        # Add user message to conversation history
        self.conversation_history.append({"role": "user", "content": query})

        # Prepare messages for API call
        messages = [{"role": "system", "content": self.SYSTEM_PROMPT}] + self.conversation_history

        # Get response from OpenAI
        response = await self._call_openai(messages)
        assistant_message = response["choices"][0]["message"]["content"]
        print(assistant_message)
        try:
            # Try to parse the response as JSON
            parsed_response = json.loads(assistant_message)
            
            # If it's a tool use request
            if parsed_response.get("status") == "tool_use":
                tool_name = parsed_response["tool"]
                parameters = parsed_response["parameters"]
                
                # Yield tool usage status
                yield {
                    "is_task_complete": False,
                    "require_user_input": False,
                    "content": "Looking up the exchange rates..."
                }
                
                if tool_name == "get_exchange_rate":
                    # Yield processing status
                    yield {
                        "is_task_complete": False,
                        "require_user_input": False,
                        "content": "Processing the exchange rates..."
                    }
                    
                    tool_result = await self.get_exchange_rate(**parameters)
                    
                    # Add tool result to conversation history
                    self.conversation_history.append({
                        "role": "assistant",
                        "content": json.dumps({"tool_result": tool_result})
                    })
                    
                    # Get final response after tool use
                    final_response = await self._call_openai(messages)
                    final_message = final_response["choices"][0]["message"]["content"]
                    parsed_response = json.loads(final_message)

            # Add assistant response to conversation history
            self.conversation_history.append({"role": "assistant", "content": assistant_message})
            
            # Yield final response
            if parsed_response["status"] == "completed":
                yield {
                    "is_task_complete": True,
                    "require_user_input": False,
                    "content": parsed_response["message"]
                }
            elif parsed_response["status"] in ["input_required", "error"]:
                yield {
                    "is_task_complete": False,
                    "require_user_input": True,
                    "content": parsed_response["message"]
                }
            else:
                yield {
                    "is_task_complete": False,
                    "require_user_input": True,
                    "content": "We are unable to process your request at the moment. Please try again."
                }

        except json.JSONDecodeError:
            # If response is not valid JSON, return error
            yield {
                "is_task_complete": False,
                "require_user_input": True,
                "content": "Invalid response format from the model."
            } 

تحليل الوظائف الرئيسية والمنطق التنفيذي:

1. الوظائف الأساسية

  • متخصص في معالجة استعلامات تحويل العملات وأسعار الصرف
  • استخدام Frankfurter API للحصول على بيانات أسعار الصرف في الوقت الفعلي
  • معالجة المحادثات من خلال نموذج Claude 3.7 Sonnet عبر OpenRouter

2. هندسة النظام

يتكون Agent من عدة مكونات رئيسية:

2.1 نظام الـ Prompt
  • يحدد الغرض المحدد للـ Agent: معالجة استعلامات تحويل العملات فقط
  • يحدد تنسيق الاستجابة: يجب استخدام تنسيق JSON
  • يحدد استخدام الأدوات: استخدام أداة get_exchange_rate للحصول على معلومات أسعار الصرف
2.2 الطرق الرئيسية
  1. طريقة التهيئة __init__

    • تكوين مفتاح API والـ URL الأساسي
    • تهيئة سجل المحادثة
  2. طريقة استعلام سعر الصرف get_exchange_rate

    • المعلمات: العملة المصدر، العملة المستهدفة، التاريخ (أحدث افتراضيًا)
    • استدعاء Frankfurter API للحصول على بيانات أسعار الصرف
    • إرجاع معلومات سعر الصرف بتنسيق JSON
  3. طريقة البث stream

    • يوفر وظيفة استجابة البث
    • إرجاع حالة المعالجة والنتائج في الوقت الفعلي
    • دعم التغذية الراجعة للحالة المتوسطة لاستدعاءات الأدوات

3. سير العمل

  1. استلام استعلام المستخدم

    • إضافة رسالة المستخدم إلى سجل المحادثة
  2. معالجة النموذج

    • إرسال نظام الـ Prompt وسجل المحادثة إلى النموذج
    • تحليل النموذج ما إذا كان يحتاج إلى استخدام أداة
  3. استدعاء الأداة (إذا لزم الأمر)

    • إذا قرر النموذج استخدام أداة، فإنه يرجع طلب استدعاء الأداة
    • تنفيذ استعلام سعر الصرف
    • إضافة نتائج الاستعلام إلى سجل المحادثة
  4. إنشاء الاستجابة النهائية

    • إنشاء الإجابة النهائية بناءً على نتائج استدعاء الأداة
    • إرجاع استجابة JSON منسقة

4. تنسيق الاستجابة

تستخدم استجابات Agent دائمًا تنسيق JSON مع الحالات التالية:

  • completed: اكتمل المهمة
  • input_required: مطلوب إدخال المستخدم
  • error: حدث خطأ
  • tool_use: مطلوب استخدام أداة

5. معالجة الأخطاء

  • يتضمن آلية كاملة لمعالجة الأخطاء
  • معالجة فشل استدعاءات API
  • معالجة أخطاء تحليل JSON
  • معالجة تنسيقات الاستجابة غير الصالحة

اختبار Agent

كود الاختبار كالتالي:

import asyncio
import logging
from currency_agent import CurrencyAgent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def main():
    agent = CurrencyAgent()
    
    # حالات الاختبار
    test_queries = [
        "What is the current exchange rate from USD to EUR?",
        "Can you tell me the exchange rate between GBP and JPY?",
        "What's the weather like today?",  # يجب رفض هذا لأنه لا يتعلق بالعملة
    ]
    
    for query in test_queries:
        logger.info(f"\nTesting query: {query}")
        async for response in agent.stream(query, "test_session"):
            logger.info(f"Response: {response}")

if __name__ == "__main__":
    asyncio.run(main()) 

إذا كان كل شيء مكونًا بشكل صحيح، خاصة تكوين البيئة، يجب أن ترى إخراجًا مشابهًا:

uv run python test_currency_agent.py
INFO:__main__:
Testing query: What is the current exchange rate from USD to EUR?
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Response: {'is_task_complete': False, 'require_user_input': False, 'content': 'Looking up the exchange rates...'}
INFO:__main__:Response: {'is_task_complete': False, 'require_user_input': False, 'content': 'Processing the exchange rates...'}
INFO:httpx:HTTP Request: GET https://api.frankfurter.app/latest?from=USD&to=EUR "HTTP/1.1 200 OK"
INFO:currency_agent:API response: {'amount': 1.0, 'base': 'USD', 'date': '2025-05-20', 'rates': {'EUR': 0.8896}}
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:currency_agent:Final message: {'role': 'assistant', 'content': '{\n    "status": "completed",\n    "message": "The current exchange rate from USD to EUR is 0.8896. This means that 1 US Dollar equals 0.8896 Euros as of May 20, 2025."\n}', 'refusal': None, 'reasoning': None}
INFO:__main__:Response: {'is_task_complete': True, 'require_user_input': False, 'content': 'The current exchange rate from USD to EUR is 0.8896. This means that 1 US Dollar equals 0.8896 Euros as of May 20, 2025.'}

تنفيذ AgentExecutor

from currency_agent import CurrencyAgent  # type: ignore[import-untyped]

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.types import (
    TaskArtifactUpdateEvent,
    TaskState,
    TaskStatus,
    TaskStatusUpdateEvent,
)
from a2a.utils import new_agent_text_message, new_task, new_text_artifact


class CurrencyAgentExecutor(AgentExecutor):
    """Currency AgentExecutor Example."""

    def __init__(self):
        self.agent = CurrencyAgent()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        query = context.get_user_input()
        task = context.current_task

        if not context.message:
            raise Exception('No message provided')

        if not task:
            task = new_task(context.message)
            event_queue.enqueue_event(task)
        # invoke the underlying agent, using streaming results
        async for event in self.agent.stream(query, task.contextId):
            if event['is_task_complete']:
                event_queue.enqueue_event(
                    TaskArtifactUpdateEvent(
                        append=False,
                        contextId=task.contextId,
                        taskId=task.id,
                        lastChunk=True,
                        artifact=new_text_artifact(
                            name='current_result',
                            description='Result of request to agent.',
                            text=event['content'],
                        ),
                    )
                )
                event_queue.enqueue_event(
                    TaskStatusUpdateEvent(
                        status=TaskStatus(state=TaskState.completed),
                        final=True,
                        contextId=task.contextId,
                        taskId=task.id,
                    )
                )
            elif event['require_user_input']:
                event_queue.enqueue_event(
                    TaskStatusUpdateEvent(
                        status=TaskStatus(
                            state=TaskState.input_required,
                            message=new_agent_text_message(
                                event['content'],
                                task.contextId,
                                task.id,
                            ),
                        ),
                        final=True,
                        contextId=task.contextId,
                        taskId=task.id,
                    )
                )
            else:
                event_queue.enqueue_event(
                    TaskStatusUpdateEvent(
                        status=TaskStatus(
                            state=TaskState.working,
                            message=new_agent_text_message(
                                event['content'],
                                task.contextId,
                                task.id,
                            ),
                        ),
                        final=False,
                        contextId=task.contextId,
                        taskId=task.id,
                    )
                )

    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        raise Exception('cancel not supported')

تحليل منطق هذا الكود: هذا هو فئة AgentExecutor تسمى CurrencyAgentExecutor التي تتعامل بشكل أساسي مع عمليات الوكيل المتعلقة بالعملة. دعنا نحلل هيكلها ووظائفها بالتفصيل:

يتم تنفيذ المنطق الأساسي لمعالجة طلبات A2A وتوليد الاستجابات/الأحداث بواسطة AgentExecutor. يوفر A2A Python SDK فئة أساسية مجردة a2a.server.agent_execution.AgentExecutor التي تحتاج إلى تنفيذها.

تحدد فئة AgentExecutor طريقتين رئيسيتين:

  • async def execute(self, context: RequestContext, event_queue: EventQueue): يتعامل مع الطلبات الواردة التي تتطلب استجابات أو تدفقات أحداث. يقوم بمعالجة إدخال المستخدم (الذي تم الحصول عليه من خلال السياق) ويستخدم event_queue لإرسال كائنات Message أو Task أو TaskStatusUpdateEvent أو TaskArtifactUpdateEvent.
  • async def cancel(self, context: RequestContext, event_queue: EventQueue): يتعامل مع طلبات إلغاء المهام الجارية.

يوفر RequestContext معلومات حول الطلب الوارد، مثل رسالة المستخدم وأي تفاصيل مهمة موجودة. يتم استخدام EventQueue بواسطة الوكيل لإرسال الأحداث إلى العميل.

تنفيذ AgentServer

الكود:

import os
import sys

import click
import httpx

from currency_agent import CurrencyAgent  # type: ignore[import-untyped]
from agent_executor import CurrencyAgentExecutor  # type: ignore[import-untyped]
from dotenv import load_dotenv

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryPushNotifier, InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill


load_dotenv()


@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=10000)
def main(host: str, port: int):

    client = httpx.AsyncClient()
    request_handler = DefaultRequestHandler(
        agent_executor=CurrencyAgentExecutor(),
        task_store=InMemoryTaskStore(),
        push_notifier=InMemoryPushNotifier(client),
    )

    server = A2AStarletteApplication(
        agent_card=get_agent_card(host, port), http_handler=request_handler
    )
    import uvicorn

    uvicorn.run(server.build(), host=host, port=port)


def get_agent_card(host: str, port: int):
    """Returns the Agent Card for the Currency Agent."""
    capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
    skill = AgentSkill(
        id='convert_currency',
        name='Currency Exchange Rates Tool',
        description='Helps with exchange values between various currencies',
        tags=['currency conversion', 'currency exchange'],
        examples=['What is exchange rate between USD and GBP?'],
    )
    return AgentCard(
        name='Currency Agent',
        description='Helps with exchange rates for currencies',
        url=f'http://{host}:{port}/',
        version='1.0.0',
        defaultInputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        defaultOutputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        capabilities=capabilities,
        skills=[skill],
    )


if __name__ == '__main__':
    main()

AgentSkill

يصف AgentSkill المهارات أو الوظائف المحددة التي يمكن للوكيل تنفيذها. إنه لبنة بناء تخبر العميل بنوع المهام التي يكون الوكيل مناسبًا لتنفيذها. الخصائص الرئيسية لـ AgentSkill (المعرفة في a2a.types):

  • id: المعرف الفريد للمهارة
  • name: اسم مقروء للإنسان
  • description: شرح أكثر تفصيلاً لوظيفة المهارة
  • tags: الكلمات الرئيسية للتصنيف والاكتشاف
  • examples: أمثلة على الـ prompts أو حالات الاستخدام
  • inputModes / outputModes: أنواع MIME المدعومة للإدخال والإخراج (مثل "text/plain"، "application/json")

هذه المهارة بسيطة جدًا: التعامل مع تحويل العملات، الإدخال والإخراج هما text، معرف في AgentCard.

AgentCard

AgentCard هو مستند JSON يوفره خادم A2A، وعادة ما يكون موجودًا في نقطة النهاية .well-known/agent.json. إنه مثل البطاقة الرقمية للوكيل. الخصائص الرئيسية لـ AgentCard (المعرفة في a2a.types):

  • name، description، version: معلومات الهوية الأساسية
  • url: نقطة النهاية للوصول إلى خدمة A2A
  • capabilities: يحدد ميزات A2A المدعومة، مثل streaming أو pushNotifications
  • defaultInputModes / defaultOutputModes: أنواع MIME الافتراضية للوكيل
  • skills: قائمة كائنات AgentSkill التي يوفرها الوكيل

AgentServer

  • DefaultRequestHandler: يوفر SDK DefaultRequestHandler. يأخذ هذا المعالج تنفيذ AgentExecutor (هنا CurrencyAgentExecutor) و TaskStore (هنا InMemoryTaskStore). يقوم بتوجيه استدعاءات A2A RPC الواردة إلى الطرق المناسبة على الوكيل (مثل execute أو cancel). يستخدم TaskStore بواسطة DefaultRequestHandler لإدارة دورة حياة المهام، خاصة للتفاعلات ذات الحالة، والبث، وإعادة الاشتراك. حتى لو كان AgentExecutor بسيطًا، يحتاج المعالج إلى مخزن مهام.

  • A2AStarletteApplication: يتم تهيئة فئة A2AStarletteApplication باستخدام agent_card و request_handler (يسمى http_handler في المُنشئ). agent_card مهم جدًا لأن الخادم سيعرضه افتراضيًا في نقطة النهاية /.well-known/agent.json. request_handler مسؤول عن معالجة جميع استدعاءات طريقة A2A الواردة من خلال التفاعل مع AgentExecutor الخاص بك.

  • uvicorn.run(server_app_builder.build(), ...): لدى A2AStarletteApplication طريقة build() لبناء تطبيق Starlette الفعلي. ثم يتم تشغيل هذا التطبيق باستخدام uvicorn.run()، مما يجعل الوكيل الخاص بك يمكن الوصول إليه عبر HTTP. host='0.0.0.0' يجعل الخادم يمكن الوصول إليه على جميع واجهات الشبكة على جهازك. port=9999 يحدد المنفذ للاستماع. هذا يتطابق مع url في AgentCard.

التشغيل

تشغيل الخادم

uv run python main.py

الإخراج:

INFO:     Started server process [70842]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10000 (Press CTRL+C to quit)

تشغيل العميل

كود العميل كالتالي:

from a2a.client import A2AClient
from typing import Any
from uuid import uuid4
from a2a.types import (
    SendMessageResponse,
    GetTaskResponse,
    SendMessageSuccessResponse,
    Task,
    TaskState,
    SendMessageRequest,
    MessageSendParams,
    GetTaskRequest,
    TaskQueryParams,
    SendStreamingMessageRequest,
)
import httpx
import traceback

AGENT_URL = 'http://localhost:10000'


def create_send_message_payload(
    text: str, task_id: str | None = None, context_id: str | None = None
) -> dict[str, Any]:
    """Helper function to create the payload for sending a task."""
    payload: dict[str, Any] = {
        'message': {
            'role': 'user',
            'parts': [{'kind': 'text', 'text': text}],
            'messageId': uuid4().hex,
        },
    }

    if task_id:
        payload['message']['taskId'] = task_id

    if context_id:
        payload['message']['contextId'] = context_id
    return payload


def print_json_response(response: Any, description: str) -> None:
    """Helper function to print the JSON representation of a response."""
    print(f'--- {description} ---')
    if hasattr(response, 'root'):
        print(f'{response.root.model_dump_json(exclude_none=True)}\n')
    else:
        print(f'{response.model_dump(mode="json", exclude_none=True)}\n')


async def run_single_turn_test(client: A2AClient) -> None:
    """Runs a single-turn non-streaming test."""

    send_payload = create_send_message_payload(
        text='how much is 100 USD in CAD?'
    )
    request = SendMessageRequest(params=MessageSendParams(**send_payload))

    print('--- Single Turn Request ---')
    # Send Message
    send_response: SendMessageResponse = await client.send_message(request)
    print_json_response(send_response, 'Single Turn Request Response')
    if not isinstance(send_response.root, SendMessageSuccessResponse):
        print('received non-success response. Aborting get task ')
        return

    if not isinstance(send_response.root.result, Task):
        print('received non-task response. Aborting get task ')
        return

    task_id: str = send_response.root.result.id
    print('---Query Task---')
    # query the task
    get_request = GetTaskRequest(params=TaskQueryParams(id=task_id))
    get_response: GetTaskResponse = await client.get_task(get_request)
    print_json_response(get_response, 'Query Task Response')


async def run_streaming_test(client: A2AClient) -> None:
    """Runs a single-turn streaming test."""

    send_payload = create_send_message_payload(
        text='how much is 50 EUR in JPY?'
    )

    request = SendStreamingMessageRequest(
        params=MessageSendParams(**send_payload)
    )

    print('--- Single Turn Streaming Request ---')
    stream_response = client.send_message_streaming(request)
    async for chunk in stream_response:
        print_json_response(chunk, 'Streaming Chunk')


async def run_multi_turn_test(client: A2AClient) -> None:
    """Runs a multi-turn non-streaming test."""
    print('--- Multi-Turn Request ---')
    # --- First Turn ---

    first_turn_payload = create_send_message_payload(
        text='how much is 100 USD?'
    )
    request1 = SendMessageRequest(
        params=MessageSendParams(**first_turn_payload)
    )
    first_turn_response: SendMessageResponse = await client.send_message(
        request1
    )
    print_json_response(first_turn_response, 'Multi-Turn: First Turn Response')

    context_id: str | None = None
    if isinstance(
        first_turn_response.root, SendMessageSuccessResponse
    ) and isinstance(first_turn_response.root.result, Task):
        task: Task = first_turn_response.root.result
        context_id = task.contextId  # Capture context ID

        # --- Second Turn (if input required) ---
        if task.status.state == TaskState.input_required and context_id:
            print('--- Multi-Turn: Second Turn (Input Required) ---')
            second_turn_payload = create_send_message_payload(
                'in GBP', task.id, context_id
            )
            request2 = SendMessageRequest(
                params=MessageSendParams(**second_turn_payload)
            )
            second_turn_response = await client.send_message(request2)
            print_json_response(
                second_turn_response, 'Multi-Turn: Second Turn Response'
            )
        elif not context_id:
            print('Warning: Could not get context ID from first turn response.')
        else:
            print(
                'First turn completed, no further input required for this test case.'
            )


async def main() -> None:
    """Main function to run the tests."""
    print(f'Connecting to agent at {AGENT_URL}...')
    try:
        async with httpx.AsyncClient(timeout=100) as httpx_client:
            client = await A2AClient.get_client_from_agent_card_url(
                httpx_client, AGENT_URL
            )
            print('Connection successful.')
            await run_single_turn_test(client)
            await run_streaming_test(client)
            await run_multi_turn_test(client)

    except Exception as e:
        traceback.print_exc()
        print(f'An error occurred: {e}')
        print('Ensure the agent server is running.')


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())

تشغيله كالتالي:

uv run python test_client.py

انتهى البرنامج التعليمي.