Реализация CurrencyAgent с использованием A2A Python SDK

Официальный SDK Google a2a-python получает частые обновления, и наш учебник также нуждается в обновлении. В этой статье мы реализуем простой CurrencyAgent, используя версию 0.2.3
SDK a2a-python.
Содержание
- Исходный код
- Подготовка
- Подробный процесс
Исходный код
Исходный код проекта доступен на a2a-python-currency. Не стесняйтесь поставить звезду.
Подготовка
- uv 0.7.2, для управления проектом
- Python 3.13+, эта версия необходима для a2a-python
- apiKey и baseURL от openai/openrouter. Я использую OpenRouter, который предлагает больше вариантов моделей.
Подробный процесс
Создание проекта
uv init a2a-python-currency
cd a2a-python-currency
Создание виртуального окружения
uv venv
source .venv/bin/activate
Добавление зависимостей
uv add a2a-sdk uvicorn dotenv click
Настройка переменных окружения
echo OPENROUTER_API_KEY=your_api_key >> .env
echo OPENROUTER_BASE_URL=your_base_url >> .env
# пример
OPENROUTER_API_KEY=ваш_ключ_api_OpenRouter
OPENROUTER_BASE_URL="https://openrouter.ai/api/v1"
Создание агента
Полный код выглядит следующим образом:
import logging
import json
from typing import Any, Dict, List, Optional
import httpx
from os import getenv
from dotenv import load_dotenv
from collections.abc import AsyncIterable
load_dotenv()
logger = logging.getLogger(__name__)
class CurrencyAgent:
"""Агент конвертации валют, использующий API OpenAI."""
SYSTEM_PROMPT = """Вы - специализированный ассистент для конвертации валют.
Ваша единственная цель - использовать инструмент 'get_exchange_rate' для ответов на вопросы о курсах валют.
Если пользователь задает вопросы не о конвертации валют или курсах валют,
вежливо сообщите, что вы не можете помочь по этой теме и можете помогать только с вопросами, связанными с валютами.
Не пытайтесь отвечать на несвязанные вопросы или использовать инструменты для других целей.
У вас есть доступ к следующему инструменту:
- get_exchange_rate: Получить текущий курс обмена между двумя валютами
При использовании инструмента отвечайте в следующем формате JSON:
{
"status": "completed" | "input_required" | "error",
"message": "ваше сообщение ответа"
}
Если вам нужно использовать инструмент, ответьте:
{
"status": "tool_use",
"tool": "get_exchange_rate",
"parameters": {
"currency_from": "USD",
"currency_to": "EUR",
"currency_date": "latest"
}
}
Примечание: Возвращайте ответ в формате JSON, только json разрешен.
"""
def __init__(self):
self.api_key = getenv("OPENROUTER_API_KEY")
self.api_base = getenv("OPENROUTER_BASE_URL")
self.model = "anthropic/claude-3.7-sonnet"
self.conversation_history: List[Dict[str, str]] = []
async def get_exchange_rate(
self,
currency_from: str = 'USD',
currency_to: str = 'EUR',
currency_date: str = 'latest',
) -> Dict[str, Any]:
"""Получить текущий курс обмена между валютами."""
try:
response = httpx.get(
f'https://api.frankfurter.app/{currency_date}',
params={'from': currency_from, 'to': currency_to},
)
response.raise_for_status()
data = response.json()
if 'rates' not in data:
logger.error(f'rates not found in response: {data}')
return {'error': 'Неверный формат ответа API.'}
logger.info(f'API response: {data}')
return data
except httpx.HTTPError as e:
logger.error(f'API request failed: {e}')
return {'error': f'Ошибка запроса API: {e}'}
except ValueError:
logger.error('Invalid JSON response from API')
return {'error': 'Неверный JSON-ответ от API.'}
async def _call_openai(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""Вызвать API OpenAI через OpenRouter."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.api_base}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json={
"model": self.model,
"messages": messages,
"temperature": 0.7,
"stream": False,
},
)
response.raise_for_status()
return response.json()
async def stream(self, query: str, session_id: str) -> AsyncIterable[Dict[str, Any]]:
"""Потоковая передача ответа для заданного запроса."""
# Добавить сообщение пользователя в историю разговора
self.conversation_history.append({"role": "user", "content": query})
# Подготовить сообщения для вызова API
messages = [{"role": "system", "content": self.SYSTEM_PROMPT}] + self.conversation_history
# Получить ответ от OpenAI
response = await self._call_openai(messages)
assistant_message = response["choices"][0]["message"]["content"]
print(assistant_message)
try:
# Попытаться разобрать ответ как JSON
parsed_response = json.loads(assistant_message)
# Если это запрос на использование инструмента
if parsed_response.get("status") == "tool_use":
tool_name = parsed_response["tool"]
parameters = parsed_response["parameters"]
# Выдать статус использования инструмента
yield {
"is_task_complete": False,
"require_user_input": False,
"content": "Поиск курсов валют..."
}
if tool_name == "get_exchange_rate":
# Выдать статус обработки
yield {
"is_task_complete": False,
"require_user_input": False,
"content": "Обработка курсов валют..."
}
tool_result = await self.get_exchange_rate(**parameters)
# Добавить результат инструмента в историю разговора
self.conversation_history.append({
"role": "assistant",
"content": json.dumps({"tool_result": tool_result})
})
# Получить окончательный ответ после использования инструмента
final_response = await self._call_openai(messages)
final_message = final_response["choices"][0]["message"]["content"]
parsed_response = json.loads(final_message)
# Добавить ответ ассистента в историю разговора
self.conversation_history.append({"role": "assistant", "content": assistant_message})
# Выдать окончательный ответ
if parsed_response["status"] == "completed":
yield {
"is_task_complete": True,
"require_user_input": False,
"content": parsed_response["message"]
}
elif parsed_response["status"] in ["input_required", "error"]:
yield {
"is_task_complete": False,
"require_user_input": True,
"content": parsed_response["message"]
}
else:
yield {
"is_task_complete": False,
"require_user_input": True,
"content": "В настоящее время мы не можем обработать ваш запрос. Пожалуйста, попробуйте снова."
}
except json.JSONDecodeError:
# Если ответ не является допустимым JSON, вернуть ошибку
yield {
"is_task_complete": False,
"require_user_input": True,
"content": "Неверный формат ответа от модели."
}
Анализ основных функций и логики реализации:
1. Основные функции
- Специализируется на обработке запросов конвертации валют и курсов валют
- Использует API Frankfurter для получения данных о курсах валют в реальном времени
- Обрабатывает разговоры через модель Claude 3.7 Sonnet через OpenRouter
2. Архитектура системы
Агент состоит из нескольких основных компонентов:
2.1 System Prompt
- Определяет конкретную цель агента: обрабатывать только запросы конвертации валют
- Определяет формат ответа: должен использовать формат JSON
- Определяет использование инструментов: использовать инструмент
get_exchange_rate
для получения информации о курсах валют
2.2 Основные методы
-
Метод инициализации
__init__
- Настраивает API-ключ и базовый URL
- Инициализирует историю разговоров
-
Метод запроса курса валют
get_exchange_rate
- Параметры: исходная валюта, целевая валюта, дата (последняя по умолчанию)
- Вызывает API Frankfurter для получения данных о курсах валют
- Возвращает информацию о курсе валют в формате JSON
-
Метод потоковой передачи
stream
- Обеспечивает функциональность потокового ответа
- Возвращает статус обработки и результаты в реальном времени
- Поддерживает промежуточный статус для вызовов инструментов
3. Рабочий процесс
-
Получение запроса пользователя
- Добавляет сообщение пользователя в историю разговоров
-
Обработка моделью
- Отправляет System Prompt и историю разговоров модели
- Модель анализирует, нужно ли использовать инструмент
-
Вызов инструмента (если необходимо)
- Если модель решает использовать инструмент, возвращает запрос на вызов инструмента
- Выполняет запрос курса валют
- Добавляет результаты запроса в историю разговоров
-
Генерация окончательного ответа
- Генерирует окончательный ответ на основе результатов вызова инструмента
- Возвращает ответ в формате JSON
4. Формат ответа
Ответы агента всегда используют формат JSON со следующими состояниями:
completed
: задача завершенаinput_required
: требуется ввод пользователяerror
: произошла ошибкаtool_use
: необходимо использование инструмента
5. Обработка ошибок
- Включает полный механизм обработки ошибок
- Обрабатывает сбои вызовов API
- Обрабатывает ошибки разбора JSON
- Обрабатывает неверные форматы ответов
Тестирование агента
Код теста выглядит следующим образом:
import asyncio
import logging
from currency_agent import CurrencyAgent
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
agent = CurrencyAgent()
# Тестовые случаи
test_queries = [
"What is the current exchange rate from USD to EUR?",
"Can you tell me the exchange rate between GBP and JPY?",
"What's the weather like today?", # Должен быть отклонен как не связанный с валютами
]
for query in test_queries:
logger.info(f"\nTesting query: {query}")
async for response in agent.stream(query, "test_session"):
logger.info(f"Response: {response}")
if __name__ == "__main__":
asyncio.run(main())
Если все настроено правильно, особенно конфигурация окружения, вы должны увидеть вывод, похожий на:
uv run python test_currency_agent.py
INFO:__main__:
Testing query: What is the current exchange rate from USD to EUR?
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:Response: {'is_task_complete': False, 'require_user_input': False, 'content': 'Looking up the exchange rates...'}
INFO:__main__:Response: {'is_task_complete': False, 'require_user_input': False, 'content': 'Processing the exchange rates...'}
INFO:httpx:HTTP Request: GET https://api.frankfurter.app/latest?from=USD&to=EUR "HTTP/1.1 200 OK"
INFO:currency_agent:API response: {'amount': 1.0, 'base': 'USD', 'date': '2025-05-20', 'rates': {'EUR': 0.8896}}
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:currency_agent:Final message: {'role': 'assistant', 'content': '{\n "status": "completed",\n "message": "The current exchange rate from USD to EUR is 0.8896. This means that 1 US Dollar equals 0.8896 Euros as of May 20, 2025."\n}', 'refusal': None, 'reasoning': None}
INFO:__main__:Response: {'is_task_complete': True, 'require_user_input': False, 'content': 'The current exchange rate from USD to EUR is 0.8896. This means that 1 US Dollar equals 0.8896 Euros as of May 20, 2025.'}
Реализация AgentExecutor
from currency_agent import CurrencyAgent # type: ignore[import-untyped]
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.types import (
TaskArtifactUpdateEvent,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
from a2a.utils import new_agent_text_message, new_task, new_text_artifact
class CurrencyAgentExecutor(AgentExecutor):
"""Пример AgentExecutor для валют."""
def __init__(self):
self.agent = CurrencyAgent()
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task
if not context.message:
raise Exception('No message provided')
if not task:
task = new_task(context.message)
event_queue.enqueue_event(task)
# вызвать базовый агент, используя результаты потоковой передачи
async for event in self.agent.stream(query, task.contextId):
if event['is_task_complete']:
event_queue.enqueue_event(
TaskArtifactUpdateEvent(
append=False,
contextId=task.contextId,
taskId=task.id,
lastChunk=True,
artifact=new_text_artifact(
name='current_result',
description='Результат запроса к агенту.',
text=event['content'],
),
)
)
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(state=TaskState.completed),
final=True,
contextId=task.contextId,
taskId=task.id,
)
)
elif event['require_user_input']:
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(
state=TaskState.input_required,
message=new_agent_text_message(
event['content'],
task.contextId,
task.id,
),
),
final=True,
contextId=task.contextId,
taskId=task.id,
)
)
else:
event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(
state=TaskState.working,
message=new_agent_text_message(
event['content'],
task.contextId,
task.id,
),
),
final=False,
contextId=task.contextId,
taskId=task.id,
)
)
async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
raise Exception('cancel not supported')
Анализ логики этого кода:
Это класс AgentExecutor под названием CurrencyAgentExecutor
, который в основном обрабатывает операции агента, связанные с валютами. Давайте проанализируем его структуру и функциональность подробно:
Центральная логика для обработки запросов A2A и генерации ответов/событий реализована через AgentExecutor. SDK A2A Python предоставляет абстрактный базовый класс a2a.server.agent_execution.AgentExecutor, который вы должны реализовать.
Класс AgentExecutor определяет два основных метода:
async def execute(self, context: RequestContext, event_queue: EventQueue)
: обрабатывает полученные запросы, требующие ответов или потоков событий. Он обрабатывает пользовательский ввод (полученный через контекст) и используетevent_queue
для отправки объектов Message, Task, TaskStatusUpdateEvent или TaskArtifactUpdateEvent.async def cancel(self, context: RequestContext, event_queue: EventQueue)
: обрабатывает запросы на отмену текущих задач.
RequestContext предоставляет информацию о полученном запросе, такую как сообщение пользователя и любые существующие детали задачи. EventQueue используется агентом для отправки событий клиенту.
Реализация AgentServer
Код:
import os
import sys
import click
import httpx
from currency_agent import CurrencyAgent # type: ignore[import-untyped]
from agent_executor import CurrencyAgentExecutor # type: ignore[import-untyped]
from dotenv import load_dotenv
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryPushNotifier, InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
load_dotenv()
@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=10000)
def main(host: str, port: int):
client = httpx.AsyncClient()
request_handler = DefaultRequestHandler(
agent_executor=CurrencyAgentExecutor(),
task_store=InMemoryTaskStore(),
push_notifier=InMemoryPushNotifier(client),
)
server = A2AStarletteApplication(
agent_card=get_agent_card(host, port), http_handler=request_handler
)
import uvicorn
uvicorn.run(server.build(), host=host, port=port)
def get_agent_card(host: str, port: int):
"""Возвращает карточку агента для агента валют."""
capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
skill = AgentSkill(
id='convert_currency',
name='Инструмент курсов валют',
description='Помогает с обменными курсами между различными валютами',
tags=['конвертация валют', 'обмен валют'],
examples=['What is exchange rate between USD and GBP?'],
)
return AgentCard(
name='Агент валют',
description='Помогает с курсами валют',
url=f'http://{host}:{port}/',
version='1.0.0',
defaultInputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
if __name__ == '__main__':
main()
AgentSkill
AgentSkill описывает навыки или конкретные функции, которые может выполнять агент. Это строительный блок, который информирует клиента о типах задач, для которых подходит агент. Основные атрибуты AgentSkill (определены в a2a.types):
- id: уникальный идентификатор навыка
- name: человекочитаемое имя
- description: более подробное объяснение функциональности навыка
- tags: ключевые слова для классификации и обнаружения
- examples: примеры промптов или случаев использования
- inputModes / outputModes: поддерживаемые типы MIME для ввода и вывода (например, "text/plain", "application/json")
Этот навык очень прост: обработка конвертации валют, ввод и вывод - text
, определено в AgentCard.
AgentCard
AgentCard - это JSON-документ, предоставляемый сервером A2A, обычно расположенный в эндпоинте .well-known/agent.json
. Это как цифровая визитная карточка агента.
Основные атрибуты AgentCard (определены в a2a.types):
- name, description, version: основная информация об идентичности
- url: эндпоинт для доступа к сервису A2A
- capabilities: указывает поддерживаемые функции A2A, такие как streaming или pushNotifications
- defaultInputModes / defaultOutputModes: типы MIME по умолчанию для агента
- skills: список объектов AgentSkill, предоставляемых агентом
AgentServer
-
DefaultRequestHandler: SDK предоставляет DefaultRequestHandler. Этот обработчик получает реализацию AgentExecutor (здесь CurrencyAgentExecutor) и TaskStore (здесь InMemoryTaskStore). Он маршрутизирует полученные вызовы RPC A2A к соответствующим методам агента (таким как execute или cancel). TaskStore используется DefaultRequestHandler для управления жизненным циклом задач, особенно для взаимодействий с состоянием, потоковой передачи и повторной подписки. Даже если AgentExecutor прост, обработчику нужен хранилище задач.
-
A2AStarletteApplication: Класс A2AStarletteApplication создается с использованием agent_card и request_handler (называется http_handler в конструкторе). agent_card очень важен, так как сервер будет предоставлять его по умолчанию в эндпоинте
/.well-known/agent.json
. request_handler отвечает за обработку всех полученных вызовов метода A2A через взаимодействие с его AgentExecutor. -
uvicorn.run(server_app_builder.build(), ...): A2AStarletteApplication имеет метод build() для построения реального приложения Starlette. Это приложение затем запускается с использованием
uvicorn.run()
, делая ваш агент доступным через HTTP. host='0.0.0.0' делает сервер доступным на всех сетевых интерфейсах вашей машины. port=9999 указывает порт для прослушивания. Это соответствует url в AgentCard.
Запуск
Запуск сервера
uv run python main.py
Вывод:
INFO: Started server process [70842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:10000 (Press CTRL+C to quit)
Запуск клиента
Код клиента выглядит следующим образом:
from a2a.client import A2AClient
from typing import Any
from uuid import uuid4
from a2a.types import (
SendMessageResponse,
GetTaskResponse,
SendMessageSuccessResponse,
Task,
TaskState,
SendMessageRequest,
MessageSendParams,
GetTaskRequest,
TaskQueryParams,
SendStreamingMessageRequest,
)
import httpx
import traceback
AGENT_URL = 'http://localhost:10000'
def create_send_message_payload(
text: str, task_id: str | None = None, context_id: str | None = None
) -> dict[str, Any]:
"""Вспомогательная функция для создания полезной нагрузки для отправки задачи."""
payload: dict[str, Any] = {
'message': {
'role': 'user',
'parts': [{'kind': 'text', 'text': text}],
'messageId': uuid4().hex,
},
}
if task_id:
payload['message']['taskId'] = task_id
if context_id:
payload['message']['contextId'] = context_id
return payload
def print_json_response(response: Any, description: str) -> None:
"""Вспомогательная функция для вывода JSON-представления ответа."""
print(f'--- {description} ---')
if hasattr(response, 'root'):
print(f'{response.root.model_dump_json(exclude_none=True)}\n')
else:
print(f'{response.model_dump(mode="json", exclude_none=True)}\n')
async def run_single_turn_test(client: A2AClient) -> None:
"""Выполняет одношаговый тест без потоковой передачи."""
send_payload = create_send_message_payload(
text='how much is 100 USD in CAD?'
)
request = SendMessageRequest(params=MessageSendParams(**send_payload))
print('--- Single Turn Request ---')
# Отправить сообщение
send_response: SendMessageResponse = await client.send_message(request)
print_json_response(send_response, 'Single Turn Request Response')
if not isinstance(send_response.root, SendMessageSuccessResponse):
print('received non-success response. Aborting get task ')
return
if not isinstance(send_response.root.result, Task):
print('received non-task response. Aborting get task ')
return
task_id: str = send_response.root.result.id
print('---Query Task---')
# запросить задачу
get_request = GetTaskRequest(params=TaskQueryParams(id=task_id))
get_response: GetTaskResponse = await client.get_task(get_request)
print_json_response(get_response, 'Query Task Response')
async def run_streaming_test(client: A2AClient) -> None:
"""Выполняет одношаговый тест с потоковой передачей."""
send_payload = create_send_message_payload(
text='how much is 50 EUR in JPY?'
)
request = SendStreamingMessageRequest(
params=MessageSendParams(**send_payload)
)
print('--- Single Turn Streaming Request ---')
stream_response = client.send_message_streaming(request)
async for chunk in stream_response:
print_json_response(chunk, 'Streaming Chunk')
async def run_multi_turn_test(client: A2AClient) -> None:
"""Выполняет многошаговый тест без потоковой передачи."""
print('--- Multi-Turn Request ---')
# --- Первый шаг ---
first_turn_payload = create_send_message_payload(
text='how much is 100 USD?'
)
request1 = SendMessageRequest(
params=MessageSendParams(**first_turn_payload)
)
first_turn_response: SendMessageResponse = await client.send_message(
request1
)
print_json_response(first_turn_response, 'Multi-Turn: First Turn Response')
context_id: str | None = None
if isinstance(
first_turn_response.root, SendMessageSuccessResponse
) and isinstance(first_turn_response.root.result, Task):
task: Task = first_turn_response.root.result
context_id = task.contextId # Захватить ID контекста
# --- Второй шаг (если требуется ввод) ---
if task.status.state == TaskState.input_required and context_id:
print('--- Multi-Turn: Second Turn (Input Required) ---')
second_turn_payload = create_send_message_payload(
'in GBP', task.id, context_id
)
request2 = SendMessageRequest(
params=MessageSendParams(**second_turn_payload)
)
second_turn_response = await client.send_message(request2)
print_json_response(
second_turn_response, 'Multi-Turn: Second Turn Response'
)
elif not context_id:
print('Warning: Could not get context ID from first turn response.')
else:
print(
'First turn completed, no further input required for this test case.'
)
async def main() -> None:
"""Основная функция для выполнения тестов."""
print(f'Connecting to agent at {AGENT_URL}...')
try:
async with httpx.AsyncClient(timeout=100) as httpx_client:
client = await A2AClient.get_client_from_agent_card_url(
httpx_client, AGENT_URL
)
print('Connection successful.')
await run_single_turn_test(client)
await run_streaming_test(client)
await run_multi_turn_test(client)
except Exception as e:
traceback.print_exc()
print(f'An error occurred: {e}')
print('Ensure the agent server is running.')
if __name__ == '__main__':
import asyncio
asyncio.run(main())
Запуск:
uv run python test_client.py
Конец учебника.