Implementierung von CurrencyAgent mit A2A Python SDK

Das offizielle Google SDK a2a-python wird häufig aktualisiert, und unser Tutorial muss ebenfalls aktualisiert werden. In diesem Artikel implementieren wir einen einfachen CurrencyAgent mit Version 0.2.3
des a2a-python SDK.
Inhaltsverzeichnis
- Quellcode
- Vorbereitung
- Detaillierte Schritte
Quellcode
Der Quellcode des Projekts ist unter a2a-python-currency verfügbar. Ein Stern ist willkommen.
Vorbereitung
- uv 0.7.2, für die Projektverwaltung
- Python 3.13+, erforderliche Version für a2a-python
- openai/openrouter apiKey und baseURL. Ich verwende OpenRouter, das mehr Modelloptionen bietet.
Detaillierte Schritte
Projekterstellung
uv init a2a-python-currency
cd a2a-python-currency
Erstellung der virtuellen Umgebung
uv venv
source .venv/bin/activate
Hinzufügen von Abhängigkeiten
uv add a2a-sdk uvicorn dotenv click
Umgebungsvariablen einrichten
echo OPENROUTER_API_KEY=your_api_key >> .env
echo OPENROUTER_BASE_URL=your_base_url >> .env
# Beispiel
OPENROUTER_API_KEY=Ihr OpenRouter API-Schlüssel
OPENROUTER_BASE_URL="https://openrouter.ai/api/v1"
Agent erstellen
Der vollständige Code lautet wie folgt:
import logging
import json
from typing import Any, Dict, List, Optional
import httpx
from os import getenv
from dotenv import load_dotenv
from collections.abc import AsyncIterable
load_dotenv()
logger = logging.getLogger(__name__)
class CurrencyAgent:
"""Ein Währungsagent mit OpenAI API."""
SYSTEM_PROMPT = """Sie sind ein Assistent, der sich auf Währungsumrechnungen spezialisiert hat.
Ihr einziger Zweck ist es, das 'tool_use'-Tool zu verwenden, um Fragen zu Wechselkursen zu beantworten.
Wenn der Benutzer Fragen stellt, die nichts mit Währungsumrechnungen oder Wechselkursen zu tun haben,
teilen Sie höflich mit, dass Sie bei diesem Thema nicht helfen können und nur bei währungsbezogenen Fragen helfen können.
Beantworten Sie keine nicht relevanten Fragen und versuchen Sie nicht, das Tool für andere Zwecke zu verwenden.
Sie haben Zugriff auf folgende Tools:
- get_exchange_rate: Aktuelle Wechselkurse zwischen zwei Währungen abrufen
Wenn Sie ein Tool verwenden, antworten Sie im folgenden JSON-Format:
{
"status": "completed" | "input_required" | "error",
"message": "Ihre Antwortnachricht"
}
Wenn Sie ein Tool verwenden müssen, antworten Sie wie folgt:
{
"status": "tool_use",
"tool": "get_exchange_rate",
"parameters": {
"currency_from": "USD",
"currency_to": "EUR",
"currency_date": "latest"
}
}
Hinweis: Antworten Sie nur im JSON-Format. Nur JSON ist erlaubt.
"""
def __init__(self):
self.api_key = getenv("OPENROUTER_API_KEY")
self.api_base = getenv("OPENROUTER_BASE_URL")
self.model = "anthropic/claude-3.7-sonnet"
self.conversation_history: List[Dict[str, str]] = []
async def get_exchange_rate(
self,
currency_from: str = 'USD',
currency_to: str = 'EUR',
currency_date: str = 'latest',
) -> Dict[str, Any]:
"""Aktuelle Wechselkurse zwischen Währungen abrufen."""
try:
response = httpx.get(
f'https://api.frankfurter.app/{currency_date}',
params={'from': currency_from, 'to': currency_to},
)
response.raise_for_status()
data = response.json()
if 'rates' not in data:
logger.error(f'rates not found in response: {data}')
return {'error': 'Ungültiges API-Antwortformat.'}
logger.info(f'API response: {data}')
return data
except httpx.HTTPError as e:
logger.error(f'API request failed: {e}')
return {'error': f'API-Anfrage fehlgeschlagen: {e}'}
except ValueError:
logger.error('Invalid JSON response from API')
return {'error': 'Ungültige JSON-Antwort von der API.'}
async def _call_openai(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""OpenAI API über OpenRouter aufrufen."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.api_base}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json={
"model": self.model,
"messages": messages,
"temperature": 0.7,
"stream": False,
},
)
response.raise_for_status()
return response.json()
async def stream(self, query: str, session_id: str) -> AsyncIterable[Dict[str, Any]]:
"""Antworten auf die angegebene Abfrage streamen."""
# Benutzernachricht zum Gesprächsverlauf hinzufügen
self.conversation_history.append({"role": "user", "content": query})
# Nachrichten für API-Aufruf vorbereiten
messages = [{"role": "system", "content": self.SYSTEM_PROMPT}] + self.conversation_history
# Antwort von OpenAI abrufen
response = await self._call_openai(messages)
assistant_message = response["choices"][0]["message"]["content"]
print(assistant_message)
try:
# Antwort als JSON parsen
parsed_response = json.loads(assistant_message)
# Bei Tool-Verwendungsanfrage
if parsed_response.get("status") == "tool_use":
tool_name = parsed_response["tool"]
parameters = parsed_response["parameters"]
# Tool-Verwendungsstatus ausgeben
yield {
"is_task_complete": False,
"require_user_input": False,
"content": "Suche Wechselkurse..."
}
if tool_name == "get_exchange_rate":
# Verarbeitungsstatus ausgeben
yield {
"is_task_complete": False,
"require_user_input": False,
"content": "Verarbeite Wechselkurse..."
}
tool_result = await self.get_exchange_rate(**parameters)
# Tool-Ergebnis zum Gesprächsverlauf hinzufügen
self.conversation_history.append({
"role": "assistant",
"content": json.dumps({"tool_result": tool_result})
})
# Endgültige Antwort nach Tool-Verwendung abrufen
final_response = await self._call_openai(messages)
final_message = final_response["choices"][0]["message"]["content"]
parsed_response = json.loads(final_message)
# Assistentenantwort zum Gesprächsverlauf hinzufügen
self.conversation_history.append({"role": "assistant", "content": assistant_message})
# Endgültige Antwort ausgeben
if parsed_response["status"] == "completed":
yield {
"is_task_complete": True,
"require_user_input": False,
"content": parsed_response["message"]
}
elif parsed_response["status"] in ["input_required", "error"]:
yield {
"is_task_complete": False,
"require_user_input": True,
"content": parsed_response["message"]
}
else:
yield {
"is_task_complete": False,
"require_user_input": True,
"content": "Anfrage kann derzeit nicht verarbeitet werden. Bitte versuchen Sie es erneut."
}
except json.JSONDecodeError:
# Bei ungültigem JSON in der Antwort Fehler zurückgeben
yield {
"is_task_complete": False,
"require_user_input": True,
"content": "Ungültiges Antwortformat vom Modell."
}
Analyse der Hauptfunktionen und Implementierungslogik:
1. Hauptfunktionen
- Spezialisiert auf Währungsumrechnungen und Wechselkursanfragen
- Verwendet Frankfurter API für Echtzeit-Wechselkursdaten
- Verarbeitet Konversationen über OpenRouter mit dem Claude 3.7 Sonnet-Modell
2. Systemarchitektur
Der Agent besteht aus folgenden Hauptkomponenten:
2.1 System Prompt
- Definiert den spezifischen Zweck des Agents: Verarbeitung nur von Währungsumrechnungsanfragen
- Definiert das Antwortformat: Muss im JSON-Format sein
- Definiert die Tool-Verwendung: Verwendet
get_exchange_rate
-Tool für Wechselkursinformationen
2.2 Hauptmethoden
-
Initialisierungsmethode
__init__
- Setzt API-Schlüssel und Basis-URL
- Initialisiert Gesprächsverlauf
-
Wechselkursabrufmethode
get_exchange_rate
- Parameter: Quellwährung, Zielwährung, Datum (Standard: neueste)
- Ruft Frankfurter API für Wechselkursdaten auf
- Gibt Wechselkursinformationen im JSON-Format zurück
-
Streamingmethode
stream
- Bietet Streaming-Antwortfunktionalität
- Gibt Verarbeitungsstatus und Ergebnisse in Echtzeit zurück
- Unterstützt Zwischenstatus bei Tool-Aufrufen
3. Workflow
-
Benutzeranfrage empfangen
- Fügt Benutzernachricht zum Gesprächsverlauf hinzu
-
Modellverarbeitung
- Sendet System Prompt und Gesprächsverlauf an das Modell
- Modell analysiert, ob Tool-Verwendung erforderlich ist
-
Tool-Aufruf (falls erforderlich)
- Wenn Modell Tool-Verwendung entscheidet, gibt Tool-Aufrufanfrage zurück
- Führt Wechselkursanfrage aus
- Fügt Anfrageergebnis zum Gesprächsverlauf hinzu
-
Endgültige Antwort generieren
- Generiert endgültige Antwort basierend auf Tool-Aufrufergebnis
- Gibt Antwort im JSON-Format zurück
4. Antwortformat
Agent-Antworten verwenden immer JSON-Format mit folgenden Status:
completed
: Aufgabe abgeschlosseninput_required
: Benutzereingabe erforderlicherror
: Fehler aufgetretentool_use
: Tool-Verwendung erforderlich
5. Fehlerbehandlung
- Enthält vollständigen Fehlerbehandlungsmechanismus
- Behandelt API-Aufruf-Fehler
- Behandelt JSON-Parsing-Fehler
- Behandelt ungültige Antwortformate
Agent testen
Der Testcode lautet wie folgt:
import asyncio
import logging
from currency_agent import CurrencyAgent
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
agent = CurrencyAgent()
# Testfälle
test_queries = [
"What is the current exchange rate from USD to EUR?",
"Can you tell me the exchange rate between GBP and JPY?",
"What's the weather like today?", # Sollte abgelehnt werden, da nicht währungsbezogen
]
for query in test_queries:
logger.info(f"\nTesting query: {query}")
async for response in agent.stream(query, "test_session"):
logger.info(f"Response: {response}")
if __name__ == "__main__":
asyncio.run(main())
Bei korrekter Umgebungskonfiguration, insbesondere wenn die Umgebungskonfiguration korrekt ist, sollte folgende Ausgabe erscheinen:
uv run python test_currency_agent.py
INFO:__main__:
Testing query: What is the current exchange rate from USD to EUR?
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Response: {'is_task_complete': False, 'require_user_input': False, 'content': 'Looking up the exchange rates...'}
INFO:__main__:Response: {'is_task_complete': False, 'require_user_input': False, 'content': 'Processing the exchange rates...'}
INFO:httpx:HTTP Request: GET https://api.frankfurter.app/latest?from=USD&to=EUR "HTTP/1.1 200 OK"
INFO:currency_agent:API response: {'amount': 1.0, 'base': 'USD', 'date': '2025-05-20', 'rates': {'EUR': 0.8896}}
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:currency_agent:Final message: {'role': 'assistant', 'content': '{\n "status": "completed",\n "message": "The current exchange rate from USD to EUR is 0.8896. This means that 1 US Dollar equals 0.8896 Euros as of May 20, 2025."\n}', 'refusal': None, 'reasoning': None}
INFO:__main__:Response: {'is_task_complete': True, 'require_user_input': False, 'content': 'The current exchange rate from USD to EUR is 0.8896. This means that 1 US Dollar equals 0.8896 Euros as of May 20, 2025.'}
AgentExecutor-Implementierung
from currency_agent import CurrencyAgent # type: ignore[import-untyped]
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.types import (
TaskArtifactUpdateEvent,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
from a2a.utils import new_agent_text_message, new_task, new_text_artifact
class CurrencyAgentExecutor(AgentExecutor):
"""Beispiel für einen AgentExecutor für Währungen."""
def __init__(self):
self.agent = CurrencyAgent()
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task
if not context.message:
raise Exception('No message provided')
if not task:
task = new_task(context.message)
event_queue.enqueue_event(task)
# Basisagent mit Streaming-Ergebnissen aufrufen
async for event in self.agent.stream(query, task.contextId):
if event['is_task_complete']:
event_queue.enqueue_event(
TaskArtifactUpdateEvent(
append=False,
contextId=task.contextId,
taskId=task.id,
lastChunk=True,
artifact=new_text_artifact(
name='current_result',
description='Ergebnis der Anfrage an den Agenten.',
text=event['content'],
),
)
)
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(state=TaskState.completed),
final=True,
contextId=task.contextId,
taskId=task.id,
)
)
elif event['require_user_input']:
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(
state=TaskState.input_required,
message=new_agent_text_message(
event['content'],
task.contextId,
task.id,
),
),
final=True,
contextId=task.contextId,
taskId=task.id,
)
)
else:
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(
state=TaskState.working,
message=new_agent_text_message(
event['content'],
task.contextId,
task.id,
),
),
final=False,
contextId=task.contextId,
taskId=task.id,
)
)
async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
raise Exception('cancel not supported')
Analyse der Logik dieses Codes:
Dies ist eine AgentExecutor-Klasse namens CurrencyAgentExecutor
, die hauptsächlich währungsbezogene Agentenoperationen verarbeitet. Lassen Sie uns ihre Struktur und Funktionalität im Detail analysieren:
Die zentrale Logik für die Verarbeitung von A2A-Anfragen und die Generierung von Antworten/Ereignissen wird über den AgentExecutor implementiert. Das A2A Python SDK stellt eine abstrakte Basisklasse a2a.server.agent_execution.AgentExecutor bereit, die implementiert werden muss.
Die AgentExecutor-Klasse definiert zwei Hauptmethoden:
async def execute(self, context: RequestContext, event_queue: EventQueue)
: Verarbeitet eingehende Anfragen, die Antworten oder Ereignisströme erfordern. Verarbeitet Benutzereingaben aus dem Kontext und sendet Message-, Task-, TaskStatusUpdateEvent- oder TaskArtifactUpdateEvent-Objekte über dieevent_queue
.async def cancel(self, context: RequestContext, event_queue: EventQueue)
: Verarbeitet Anfragen zum Abbrechen des aktuellen Tasks.
RequestContext stellt Informationen über die eingehende Anfrage bereit, wie Benutzernachrichten oder bestehende Task-Details. EventQueue wird vom Agenten verwendet, um Ereignisse an den Client zu senden.
AgentServer-Implementierung
Code:
import os
import sys
import click
import httpx
from currency_agent import CurrencyAgent # type: ignore[import-untyped]
from agent_executor import CurrencyAgentExecutor # type: ignore[import-untyped]
from dotenv import load_dotenv
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryPushNotifier, InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
load_dotenv()
@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=10000)
def main(host: str, port: int):
client = httpx.AsyncClient()
request_handler = DefaultRequestHandler(
agent_executor=CurrencyAgentExecutor(),
task_store=InMemoryTaskStore(),
push_notifier=InMemoryPushNotifier(client),
)
server = A2AStarletteApplication(
agent_card=get_agent_card(host, port), http_handler=request_handler
)
import uvicorn
uvicorn.run(server.build(), host=host, port=port)
def get_agent_card(host: str, port: int):
"""Gibt die Agentenkarte für den Währungsagenten zurück."""
capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
skill = AgentSkill(
id='convert_currency',
name='Wechselkurs-Tool',
description='Hilft bei Wechselkursen zwischen verschiedenen Währungen',
tags=['Währungsumrechnung', 'Wechselkurs'],
examples=['What is exchange rate between USD and GBP?'],
)
return AgentCard(
name='Währungsagent',
description='Hilft bei Wechselkursen',
url=f'http://{host}:{port}/',
version='1.0.0',
defaultInputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
if __name__ == '__main__':
main()
AgentSkill
AgentSkill beschreibt die Fähigkeiten oder spezifischen Funktionen, die ein Agent ausführen kann. Es ist ein Baustein, der Clients mitteilt, für welche Arten von Aufgaben der Agent geeignet ist. Hauptattribute von AgentSkill (definiert in a2a.types):
- id: Eindeutiger Bezeichner für die Fähigkeit
- name: Menschenlesbarer Name
- description: Detailliertere Beschreibung der Funktionalität
- tags: Schlüsselwörter für Klassifizierung und Entdeckung
- examples: Beispiele für Prompts oder Anwendungsfälle
- inputModes / outputModes: Unterstützte MIME-Typen für Ein- und Ausgabe (z.B. "text/plain", "application/json")
Diese Fähigkeit ist sehr einfach: Verarbeitung von Währungsumrechnungen, Ein- und Ausgabe ist text
, definiert in AgentCard.
AgentCard
AgentCard ist ein JSON-Dokument, das vom A2A-Server bereitgestellt wird, typischerweise am Endpunkt .well-known/agent.json
. Es ist wie eine digitale Visitenkarte für den Agenten.
Hauptattribute von AgentCard (definiert in a2a.types):
- name, description, version: Grundlegende Identifikationsinformationen
- url: Endpunkt für den Zugriff auf den A2A-Dienst
- capabilities: Spezifiziert unterstützte A2A-Funktionen wie streaming und pushNotifications
- defaultInputModes / defaultOutputModes: Standard-MIME-Typen des Agenten
- skills: Liste von AgentSkill-Objekten, die der Agent bereitstellt
AgentServer
-
DefaultRequestHandler: Das SDK stellt DefaultRequestHandler bereit. Dieser Handler nimmt eine AgentExecutor-Implementierung (hier CurrencyAgentExecutor) und einen TaskStore (hier InMemoryTaskStore) entgegen. Leitet eingehende A2A RPC-Aufrufe an die entsprechenden Methoden des Agenten wie execute oder cancel weiter. TaskStore wird vom DefaultRequestHandler verwendet, um den Task-Lebenszyklus zu verwalten, insbesondere für zustandsbehaftete Interaktionen, Streaming und Wiederverbindung. Auch wenn der AgentExecutor einfach ist, benötigt der Handler einen Task- Store.
-
A2AStarletteApplication: Die A2AStarletteApplication-Klasse wird mit agent_card und request_handler (im Konstruktor als http_handler bezeichnet) erstellt. agent_card ist sehr wichtig. Der Server stellt dies standardmäßig am Endpunkt
/.well-known/agent.json
bereit. request_handler ist für die Verarbeitung aller eingehenden A2A-Methodenaufrufe über seinen AgentExecutor verantwortlich. -
uvicorn.run(server_app_builder.build(), ...): A2AStarletteApplication hat eine build()-Methode, die die eigentliche Starlette-Anwendung erstellt. Diese Anwendung wird mit
uvicorn.run()
ausgeführt, wodurch der Agent über HTTP zugänglich wird. host='0.0.0.0' macht den Server auf allen Netzwerkschnittstellen der Maschine zugänglich. port=9999 gibt den zu überwachenden Port an. Dies entspricht der url in AgentCard.
Ausführung
Server ausführen
uv run python main.py
Ausgabe:
INFO: Started server process [70842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:10000 (Press CTRL+C to quit)
Client ausführen
Der Client-Code lautet wie folgt:
from a2a.client import A2AClient
from typing import Any
from uuid import uuid4
from a2a.types import (
SendMessageResponse,
GetTaskResponse,
SendMessageSuccessResponse,
Task,
TaskState,
SendMessageRequest,
MessageSendParams,
GetTaskRequest,
TaskQueryParams,
SendStreamingMessageRequest,
)
import httpx
import traceback
AGENT_URL = 'http://localhost:10000'
def create_send_message_payload(
text: str, task_id: str | None = None, context_id: str | None = None
) -> dict[str, Any]:
"""Hilfsfunktion zum Erstellen der Payload für Task-Sendungen."""
payload: dict[str, Any] = {
'message': {
'role': 'user',
'parts': [{'kind': 'text', 'text': text}],
'messageId': uuid4().hex,
},
}
if task_id:
payload['message']['taskId'] = task_id
if context_id:
payload['message']['contextId'] = context_id
return payload
def print_json_response(response: Any, description: str) -> None:
"""Hilfsfunktion zum Ausgeben der JSON-Darstellung der Antwort."""
print(f'--- {description} ---')
if hasattr(response, 'root'):
print(f'{response.root.model_dump_json(exclude_none=True)}\n')
else:
print(f'{response.model_dump(mode="json", exclude_none=True)}\n')
async def run_single_turn_test(client: A2AClient) -> None:
"""Führt einen Single-Turn-Test ohne Streaming durch."""
send_payload = create_send_message_payload(
text='how much is 100 USD in CAD?'
)
request = SendMessageRequest(params=MessageSendParams(**send_payload))
print('--- Single Turn Request ---')
# Nachricht senden
send_response: SendMessageResponse = await client.send_message(request)
print_json_response(send_response, 'Single Turn Request Response')
if not isinstance(send_response.root, SendMessageSuccessResponse):
print('received non-success response. Aborting get task ')
return
if not isinstance(send_response.root.result, Task):
print('received non-task response. Aborting get task ')
return
task_id: str = send_response.root.result.id
print('---Query Task---')
# Task abfragen
get_request = GetTaskRequest(params=TaskQueryParams(id=task_id))
get_response: GetTaskResponse = await client.get_task(get_request)
print_json_response(get_response, 'Query Task Response')
async def run_streaming_test(client: A2AClient) -> None:
"""Führt einen Single-Turn-Test mit Streaming durch."""
send_payload = create_send_message_payload(
text='how much is 50 EUR in JPY?'
)
request = SendStreamingMessageRequest(
params=MessageSendParams(**send_payload)
)
print('--- Single Turn Streaming Request ---')
stream_response = client.send_message_streaming(request)
async for chunk in stream_response:
print_json_response(chunk, 'Streaming Chunk')
async def run_multi_turn_test(client: A2AClient) -> None:
"""Führt einen Multi-Turn-Test ohne Streaming durch."""
print('--- Multi-Turn Request ---')
# --- Erster Turn ---
first_turn_payload = create_send_message_payload(
text='how much is 100 USD?'
)
request1 = SendMessageRequest(
params=MessageSendParams(**first_turn_payload)
)
first_turn_response: SendMessageResponse = await client.send_message(
request1
)
print_json_response(first_turn_response, 'Multi-Turn: First Turn Response')
context_id: str | None = None
if isinstance(
first_turn_response.root, SendMessageSuccessResponse
) and isinstance(first_turn_response.root.result, Task):
task: Task = first_turn_response.root.result
context_id = task.contextId # Kontext-ID erfassen
# --- Zweiter Turn (bei erforderlicher Eingabe) ---
if task.status.state == TaskState.input_required and context_id:
print('--- Multi-Turn: Second Turn (Input Required) ---')
second_turn_payload = create_send_message_payload(
'in GBP', task.id, context_id
)
request2 = SendMessageRequest(
params=MessageSendParams(**second_turn_payload)
)
second_turn_response = await client.send_message(request2)
print_json_response(
second_turn_response, 'Multi-Turn: Second Turn Response'
)
elif not context_id:
print('Warning: Could not get context ID from first turn response.')
else:
print(
'First turn completed, no further input required for this test case.'
)
async def main() -> None:
"""Hauptfunktion zum Ausführen der Tests."""
print(f'Connecting to agent at {AGENT_URL}...')
try:
async with httpx.AsyncClient(timeout=100) as httpx_client:
client = await A2AClient.get_client_from_agent_card_url(
httpx_client, AGENT_URL
)
print('Connection successful.')
await run_single_turn_test(client)
await run_streaming_test(client)
await run_multi_turn_test(client)
except Exception as e:
traceback.print_exc()
print(f'An error occurred: {e}')
print('Ensure the agent server is running.')
if __name__ == '__main__':
import asyncio
asyncio.run(main())
Ausführung:
uv run python test_client.py
Tutorial beendet.