Building an A2A Currency Agent with LangGraph

MILO

This guide provides a detailed explanation of how to build an A2A-compliant agent using LangGraph and the Google Gemini model. We'll walk through the Currency Agent example from the A2A Python SDK, explaining each component, the flow of data, and how the A2A protocol facilitates agent interactions.

Table of Contents

  1. Overview
  2. Architecture
  3. Setup and Installation
  4. Components Explained
  5. Flow of Execution
  6. Client Interaction
  7. Advanced Features
  8. Next Steps

Overview

The Currency Agent is a specialized agent that helps with currency conversions. It leverages:

  • A2A Protocol: For standardized communication
  • LangGraph: For orchestrating the agent's reasoning
  • Google's Gemini Model: As the reasoning engine
  • External API: To fetch real-time exchange rates

This example demonstrates several important A2A capabilities:

  • Streaming responses for real-time updates
  • Multi-turn conversations for clarification
  • Task state management
  • Integration with an LLM

Architecture

Here's the high-level architecture of the Currency Agent:

graph TD
    Client[Client] <-->|A2A Protocol| Server[A2A Server]
    Server --> RequestHandler[DefaultA2ARequestHandler]
    RequestHandler --> Executor[CurrencyAgentExecutor]
    Executor --> Agent[CurrencyAgent]
    Agent --> |Uses| Gemini[Gemini LLM]
    Agent --> |Calls| ExchangeAPI[Exchange Rate API]

    subgraph "Task Management"
        Executor --> TaskStore[InMemoryTaskStore]
    end

Setup and Installation

To run this example, you'll need:

  1. Python 3.10 or higher
  2. A Gemini API key

First, clone the A2A repository and install the dependencies:

git clone https://github.com/google/A2A.git -b main --depth 1
cd A2A/a2a-python-sdk
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
pip install -e .[dev]

Then, create a .env file in the examples/langgraph/ directory:

echo "GOOGLE_API_KEY=YOUR_API_KEY_HERE" > examples/langgraph/.env

Components Explained

Let's explore each component of the Currency Agent in detail:

Agent Card and Skills

The agent card defines the agent's identity, capabilities, and skills. It's created in __main__.py:

def get_agent_card(host: str, port: int):
    """Returns the Agent Card for the Currency Agent."""
    capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
    skill = AgentSkill(
        id='convert_currency',
        name='Currency Exchange Rates Tool',
        description='Helps with exchange values between various currencies',
        tags=['currency conversion', 'currency exchange'],
        examples=['What is exchange rate between USD and GBP?'],
    )
    return AgentCard(
        name='Currency Agent',
        description='Helps with exchange rates for currencies',
        url=f'http://{host}:{port}/',
        version='1.0.0',
        defaultInputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        defaultOutputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        capabilities=capabilities,
        skills=[skill],
        authentication=AgentAuthentication(schemes=['public']),
    )

Key points:

  • The agent has a single skill: convert_currency
  • It supports streaming (capabilities.streaming=True)
  • It accepts and returns text content types
  • It uses public authentication (no auth required)
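To make the card concrete, here is a hand-built dict mirroring the fields above and how it travels as JSON. This is illustrative only: it is a plain dict, not the SDK's `AgentCard` model, and the `localhost:10000` URL is an assumption. A2A clients typically discover the card by fetching it from the server (conventionally at `/.well-known/agent.json`):

```python
import json

# Illustrative stand-in for the AgentCard above (not the SDK model).
card = {
    'name': 'Currency Agent',
    'description': 'Helps with exchange rates for currencies',
    'url': 'http://localhost:10000/',  # assumed host/port
    'version': '1.0.0',
    'capabilities': {'streaming': True, 'pushNotifications': True},
    'skills': [{'id': 'convert_currency',
                'name': 'Currency Exchange Rates Tool'}],
}

# Serialize as the server would, then read a field back as a client would.
wire = json.dumps(card)
print(json.loads(wire)['skills'][0]['id'])  # convert_currency
```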

Currency Agent

The CurrencyAgent class in agent.py contains the core logic of the agent:

classDiagram
    class CurrencyAgent {
        +SYSTEM_INSTRUCTION: str
        +RESPONSE_FORMAT_INSTRUCTION: str
        +SUPPORTED_CONTENT_TYPES: list
        -model: ChatGoogleGenerativeAI
        -tools: list
        -graph: AgentGraph
        +invoke(query: str, sessionId: str): dict
        +stream(query: str, sessionId: str): AsyncGenerator
        -get_agent_response(config: dict): dict
    }

    class get_exchange_rate {
        <<function>>
    }

    class ResponseFormat {
        <<interface>>
    }

    CurrencyAgent ..> get_exchange_rate : uses
    CurrencyAgent ..> ResponseFormat : returns

The core functionality includes:

  1. Tool Definition: The get_exchange_rate tool fetches real-time exchange rates from an external API:
@tool
def get_exchange_rate(
    currency_from: str = 'USD',
    currency_to: str = 'EUR',
    currency_date: str = 'latest',
):
    """Use this to get current exchange rate."""
    try:
        response = httpx.get(
            f'https://api.frankfurter.app/{currency_date}',
            params={'from': currency_from, 'to': currency_to},
        )
        response.raise_for_status()

        data = response.json()
        # ... error handling code
        return data
    except httpx.HTTPError as e:
        return {'error': f'API request failed: {e}'}
  2. Agent Definition: The agent is created using LangGraph's create_react_agent:
def __init__(self):
    self.model = ChatGoogleGenerativeAI(model='gemini-2.0-flash')
    self.tools = [get_exchange_rate]

    self.graph = create_react_agent(
        self.model,
        tools=self.tools,
        checkpointer=memory,
        prompt=self.SYSTEM_INSTRUCTION,
        response_format=(self.RESPONSE_FORMAT_INSTRUCTION, ResponseFormat),
    )
  3. Response Format: A structured format for agent responses:
class ResponseFormat(BaseModel):
    """Respond to the user in this format."""
    status: Literal['input_required', 'completed', 'error'] = 'input_required'
    message: str
  4. Invocation Methods: Methods for direct invocation and streaming:
def invoke(self, query: str, sessionId: str) -> dict[str, Any]:
    config: RunnableConfig = {'configurable': {'thread_id': sessionId}}
    self.graph.invoke({'messages': [('user', query)]}, config)
    return self.get_agent_response(config)

async def stream(
    self, query: str, sessionId: str
) -> AsyncIterable[dict[str, Any]]:
    inputs: dict[str, Any] = {'messages': [('user', query)]}
    config: RunnableConfig = {'configurable': {'thread_id': sessionId}}

    for item in self.graph.stream(inputs, config, stream_mode='values'):
        message = item['messages'][-1]
        if isinstance(message, AIMessage) and message.tool_calls:
            yield {
                'is_task_complete': False,
                'require_user_input': False,
                'content': 'Looking up the exchange rates...',
            }
        elif isinstance(message, ToolMessage):
            yield {
                'is_task_complete': False,
                'require_user_input': False,
                'content': 'Processing the exchange rates...',
            }

    yield self.get_agent_response(config)
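The glue between the structured `ResponseFormat` and the dicts that `invoke`/`stream` return can be sketched with the standard library alone. The real `ResponseFormat` is a Pydantic model and `to_agent_response` is an illustrative helper (not SDK code), but the status-to-flags mapping shown is the one the yielded dicts above follow:

```python
from dataclasses import dataclass

VALID_STATUSES = {'input_required', 'completed', 'error'}

@dataclass
class ResponseFormat:
    """Stand-in for the Pydantic ResponseFormat above."""
    message: str
    status: str = 'input_required'

    def __post_init__(self) -> None:
        if self.status not in VALID_STATUSES:
            raise ValueError(f'invalid status: {self.status}')

def to_agent_response(fmt: ResponseFormat) -> dict:
    """Map a structured status onto the is_task_complete/require_user_input flags."""
    return {
        'is_task_complete': fmt.status == 'completed',
        'require_user_input': fmt.status == 'input_required',
        'content': fmt.message,
    }

print(to_agent_response(ResponseFormat('Which target currency?')))
print(to_agent_response(ResponseFormat('100 USD is 91.00 EUR', 'completed')))
```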

Agent Executor

The CurrencyAgentExecutor in agent_executor.py adapts the LangGraph agent to the A2A protocol:

class CurrencyAgentExecutor(BaseAgentExecutor):
    """Currency AgentExecutor Example."""

    def __init__(self):
        self.agent = CurrencyAgent()

    @override
    async def on_message_send(
        self,
        request: SendMessageRequest,
        event_queue: EventQueue,
        task: Task | None,
    ) -> None:
        """Handler for 'message/send' requests."""
        params: MessageSendParams = request.params
        query = self._get_user_query(params)

        if not task:
            task = create_task_obj(params)

        # invoke the underlying agent
        agent_response: dict[str, Any] = self.agent.invoke(
            query, task.contextId
        )
        update_task_with_agent_response(task, agent_response)
        event_queue.enqueue_event(task)

    @override
    async def on_message_stream(
        self,
        request: SendStreamingMessageRequest,
        event_queue: EventQueue,
        task: Task | None,
    ) -> None:
        """Handler for 'message/stream' requests."""
        params: MessageSendParams = request.params
        query = self._get_user_query(params)

        if not task:
            task = create_task_obj(params)
            # emit the initial task so it is persisted to TaskStore
            event_queue.enqueue_event(task)

        # kickoff the streaming agent and process responses
        async for item in self.agent.stream(query, task.contextId):
            task_artifact_update_event, task_status_event = (
                process_streaming_agent_response(task, item)
            )

            if task_artifact_update_event:
                event_queue.enqueue_event(task_artifact_update_event)

            event_queue.enqueue_event(task_status_event)

    def _get_user_query(self, task_send_params: MessageSendParams) -> str:
        """Helper to get user query from task send params."""
        part = task_send_params.message.parts[0].root
        if not isinstance(part, TextPart):
            raise ValueError('Only text parts are supported')
        return part.text

This executor implements two primary methods:

  1. on_message_send: Handles synchronous requests and returns a complete response
  2. on_message_stream: Handles streaming requests and emits events as they occur
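The enqueue/drain pattern behind `event_queue` can be sketched with `asyncio.Queue` standing in for the SDK's `EventQueue` (whose actual API differs). The handler enqueues status events as the agent streams, and a consumer drains them until it sees a final event, much like the SSE stream a client receives:

```python
import asyncio

async def handler(queue: 'asyncio.Queue[dict]') -> None:
    # The executor enqueues one status event per streamed agent chunk.
    for state in ('working', 'working', 'completed'):
        await queue.put({'state': state, 'final': state == 'completed'})

async def consume() -> list:
    queue: 'asyncio.Queue[dict]' = asyncio.Queue()
    await handler(queue)
    states = []
    while True:
        event = await queue.get()
        states.append(event['state'])
        if event['final']:  # a final event ends the stream
            return states

print(asyncio.run(consume()))  # ['working', 'working', 'completed']
```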

Helpers

The helpers.py file contains utility functions for managing tasks and transforming agent responses into A2A protocol events:

def update_task_with_agent_response(
    task: Task, agent_response: dict[str, Any]
) -> None:
    """Updates the provided task with the agent response."""
    task.status.timestamp = datetime.now().isoformat()
    parts: list[Part] = [Part(TextPart(text=agent_response['content']))]
    if agent_response['require_user_input']:
        task.status.state = TaskState.input_required
        message = Message(
            messageId=str(uuid4()),
            role=Role.agent,
            parts=parts,
        )
        task.status.message = message
        if not task.history:
            task.history = []

        task.history.append(message)
    else:
        task.status.state = TaskState.completed
        task.status.message = None
        if not task.artifacts:
            task.artifacts = []

        artifact: Artifact = Artifact(parts=parts, artifactId=str(uuid4()))
        task.artifacts.append(artifact)

def process_streaming_agent_response(
    task: Task,
    agent_response: dict[str, Any],
) -> tuple[TaskArtifactUpdateEvent | None, TaskStatusUpdateEvent]:
    """Processes the streaming agent responses and returns TaskArtifactUpdateEvent and TaskStatusUpdateEvent."""
    is_task_complete = agent_response['is_task_complete']
    require_user_input = agent_response['require_user_input']
    parts: list[Part] = [Part(TextPart(text=agent_response['content']))]

    end_stream = False
    artifact = None
    message = None

    # responses from this agent can be working/completed/input-required
    if not is_task_complete and not require_user_input:
        task_state = TaskState.working
        message = Message(role=Role.agent, parts=parts, messageId=str(uuid4()))
    elif require_user_input:
        task_state = TaskState.input_required
        message = Message(role=Role.agent, parts=parts, messageId=str(uuid4()))
        end_stream = True
    else:
        task_state = TaskState.completed
        artifact = Artifact(parts=parts, artifactId=str(uuid4()))
        end_stream = True

    task_artifact_update_event = None

    if artifact:
        task_artifact_update_event = TaskArtifactUpdateEvent(
            taskId=task.id,
            contextId=task.contextId,
            artifact=artifact,
            append=False,
            lastChunk=True,
        )

    task_status_event = TaskStatusUpdateEvent(
        taskId=task.id,
        contextId=task.contextId,
        status=TaskStatus(
            state=task_state,
            message=message,
            timestamp=datetime.now().isoformat(),
        ),
        final=end_stream,
    )

    return task_artifact_update_event, task_status_event

The key functions transform the agent's internal response format into the A2A protocol's event model.
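The branch logic in process_streaming_agent_response reduces to a small state table, restated here with strings standing in for the TaskState enum (a simplification, not SDK code):

```python
def map_state(is_task_complete: bool, require_user_input: bool) -> tuple:
    """Return (task_state, end_stream) for one streamed agent chunk."""
    if not is_task_complete and not require_user_input:
        return 'working', False       # intermediate update, stream continues
    if require_user_input:
        return 'input_required', True  # agent needs clarification, stream ends
    return 'completed', True           # final answer, stream ends

print(map_state(False, False))  # ('working', False)
print(map_state(False, True))   # ('input_required', True)
print(map_state(True, False))   # ('completed', True)
```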

Flow of Execution

Let's visualize the entire flow of execution when a client interacts with the Currency Agent:

sequenceDiagram
    participant Client
    participant A2AServer
    participant RequestHandler
    participant Executor as CurrencyAgentExecutor
    participant Agent as CurrencyAgent
    participant LLM as Gemini LLM
    participant API as Exchange Rate API

    Client->>A2AServer: Request Agent Card
    A2AServer->>Client: Return Agent Card

    %% Single turn conversation
    Client->>A2AServer: message/send
    A2AServer->>RequestHandler: Route request
    RequestHandler->>Executor: on_message_send
    Executor->>Agent: invoke(query, contextId)
    Agent->>LLM: Process with Gemini
    LLM-->>Agent: Need exchange rate info
    Agent->>API: get_exchange_rate
    API-->>Agent: Return exchange data
    Agent->>LLM: Process with data
    LLM-->>Agent: Generate response
    Agent-->>Executor: Return response dict
    Executor-->>RequestHandler: Update Task & enqueue event
    RequestHandler-->>A2AServer: Return Task with results
    A2AServer-->>Client: Send response

    %% Streaming example
    Client->>A2AServer: message/sendStream
    A2AServer->>RequestHandler: Route streaming request
    RequestHandler->>Executor: on_message_stream
    Executor->>Agent: stream(query, contextId)
    Agent->>LLM: Start processing

    LLM-->>Agent: Need exchange rate
    Agent-->>Executor: Yield status update
    Executor-->>RequestHandler: Enqueue TaskStatusUpdateEvent
    RequestHandler-->>A2AServer: SSE event
    A2AServer-->>Client: Stream status update

    Agent->>API: get_exchange_rate
    API-->>Agent: Return exchange data
    Agent-->>Executor: Yield process update
    Executor-->>RequestHandler: Enqueue TaskStatusUpdateEvent
    RequestHandler-->>A2AServer: SSE event
    A2AServer-->>Client: Stream process update

    Agent->>LLM: Process with data
    LLM-->>Agent: Generate final response
    Agent-->>Executor: Yield final response
    Executor-->>RequestHandler: Enqueue TaskArtifactUpdateEvent & TaskStatusUpdateEvent
    RequestHandler-->>A2AServer: Final SSE events
    A2AServer-->>Client: Stream final result

Client Interaction

The test_client.py script demonstrates how to interact with the agent through the A2A protocol:

async def run_single_turn_test(client: A2AClient) -> None:
    """Runs a single-turn non-streaming test."""
    send_payload = create_send_message_payload(
        text='how much is 100 USD in CAD?'
    )
    # Send Message
    send_response: SendMessageResponse = await client.send_message(
        payload=send_payload
    )
    print_json_response(send_response, 'Single Turn Request Response')

    # Query the task if needed
    if isinstance(send_response.root, SendMessageSuccessResponse) and \
       isinstance(send_response.root.result, Task):
        task_id: str = send_response.root.result.id
        task_id_payload = {'id': task_id}
        get_response: GetTaskResponse = await client.get_task(
            payload=task_id_payload
        )
        print_json_response(get_response, 'Query Task Response')

async def run_streaming_test(client: A2AClient) -> None:
    """Runs a single-turn streaming test."""
    send_payload = create_send_message_payload(
        text='how much is 50 EUR in JPY?'
    )
    print('--- Single Turn Streaming Request ---')
    stream_response = client.send_message_streaming(payload=send_payload)
    async for chunk in stream_response:
        print_json_response(chunk, 'Streaming Chunk')

async def run_multi_turn_test(client: A2AClient) -> None:
    """Runs a multi-turn non-streaming test."""
    print('--- Multi-Turn Request ---')
    # --- First Turn ---
    first_turn_payload = create_send_message_payload(
        text='how much is 100 USD?'
    )
    first_turn_response: SendMessageResponse = await client.send_message(
        payload=first_turn_payload
    )
    print_json_response(first_turn_response, 'Multi-Turn: First Turn Response')

    context_id: str | None = None
    if isinstance(first_turn_response.root, SendMessageSuccessResponse) and \
       isinstance(first_turn_response.root.result, Task):
        task: Task = first_turn_response.root.result
        context_id = task.contextId  # Capture context ID

        # --- Second Turn (if input required) ---
        if task.status.state == TaskState.input_required and context_id:
            print('--- Multi-Turn: Second Turn (Input Required) ---')
            second_turn_payload = create_send_message_payload(
                'in GBP', task.id, context_id
            )
            second_turn_response = await client.send_message(
                payload=second_turn_payload
            )
            print_json_response(
                second_turn_response, 'Multi-Turn: Second Turn Response'
            )

Advanced Features

Streaming

The Currency Agent supports streaming responses, allowing it to provide real-time updates as it processes the request:

  1. When the agent starts thinking, it sends an "in progress" update
  2. When it calls the exchange rate API, it streams an update
  3. When processing the data, it streams another update
  4. Finally, it sends the completed result

This provides a better user experience for operations that might take time to complete.

Multi-turn Conversations

The agent can engage in multi-turn conversations when it needs more information:

  1. If the user asks "How much is 100 USD?" without specifying the target currency
  2. The agent sets the task state to TaskState.input_required
  3. The client can then send a follow-up message with the same contextId
  4. The agent maintains context between turns

Example conversation:

User: How much is 100 USD?
Agent: To which currency would you like to convert 100 USD?
User: in GBP
Agent: 100 USD is approximately 78.45 GBP according to the current exchange rate.
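The mechanics of the second turn can be sketched as follows. `make_payload` is a hypothetical helper, and the exact part/message field names are illustrative rather than the SDK's wire format; the point is that the follow-up message echoes the task's `id` and `contextId` so the server can resume the conversation:

```python
from typing import Optional
from uuid import uuid4

def make_payload(text: str, task_id: Optional[str] = None,
                 context_id: Optional[str] = None) -> dict:
    """Build an illustrative message/send payload for one turn."""
    message = {
        'role': 'user',
        'parts': [{'type': 'text', 'text': text}],
        'messageId': uuid4().hex,
    }
    if task_id is not None:
        message['taskId'] = task_id
    if context_id is not None:
        message['contextId'] = context_id
    return {'message': message}

first = make_payload('how much is 100 USD?')
# The server replies with state input_required plus the task's id/contextId;
# the client echoes both identifiers on the follow-up turn.
second = make_payload('in GBP', task_id='task-123', context_id='ctx-456')
print(second['message']['contextId'])  # ctx-456
```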

Next Steps

Now that you understand how the Currency Agent works, you can:

  1. Try different queries: Test with various currency pairs
  2. Modify the agent: Add support for more features
    • Historical exchange rates
    • Currency trends analysis
    • Support for more currencies
  3. Extend the architecture:
    • Implement persistent TaskStore for long-lived sessions
    • Add authentication mechanisms
    • Deploy the agent to a production environment
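As a starting point for the persistent TaskStore idea, here is a minimal file-backed store under the same save/get contract that InMemoryTaskStore suggests. The method names and the use of plain dicts are assumptions; check the SDK's actual TaskStore interface (which may be async) before building on this. A JSON file stands in for a real database:

```python
import json
import os
import tempfile

class FileTaskStore:
    """Toy persistent task store: one JSON file keyed by task id."""

    def __init__(self, path: str):
        self.path = path

    def _load(self) -> dict:
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def save(self, task: dict) -> None:
        tasks = self._load()
        tasks[task['id']] = task
        with open(self.path, 'w') as f:
            json.dump(tasks, f)

    def get(self, task_id: str):
        return self._load().get(task_id)

store = FileTaskStore(os.path.join(tempfile.mkdtemp(), 'tasks.json'))
store.save({'id': 't1', 'state': 'completed'})
print(store.get('t1'))  # {'id': 't1', 'state': 'completed'}
```

Tasks now survive a server restart, which is what long-lived multi-turn sessions need.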

For more information on A2A protocol features, refer to the A2A protocol specification and the A2A Python SDK documentation.
