This is a Python implementation that adheres to the A2A (Agent2Agent) protocol. A travel assistant demo implemented based on Google's official a2a-python SDK and OpenAI Python SDK. It is a travel assistant in line with the specifications of the OpenAI model, capable of providing you with travel planning services.
Source Code
Project Architecture
This project demonstrates how to build an interoperable travel planning agent using the A2A protocol, including the following core components:
- Travel Planner Agent: Core travel assistant logic based on OpenAI-compatible interface
- Agent Executor: A2A protocol adapter that bridges agent logic to A2A server
- A2A Server: A2A protocol-compliant server providing standardized inter-agent communication interface
- Loop Client: Test client for interacting with the A2A server
Workflow Sequence Diagram
sequenceDiagram
participant Client
participant A2AServer
participant RequestHandler
participant Executor as TravelPlannerAgentExecutor
participant Agent as TravelPlannerAgent
participant LLM as OpenAI-Compatible LLM
Client->>A2AServer: Request Agent Card
A2AServer->>Client: Return Agent Card (skills, capabilities)
Note over Client,A2AServer: User queries travel planning
Client->>A2AServer: message/sendStream (streaming request)
A2AServer->>RequestHandler: Route streaming request
RequestHandler->>Executor: execute(context, event_queue)
Executor->>Agent: stream(query)
Agent->>LLM: chat.completions.create(stream=True)
loop Streaming response processing
LLM-->>Agent: Return streaming content chunk
Agent-->>Executor: yield {'content': chunk, 'done': False}
Executor-->>RequestHandler: TaskArtifactUpdateEvent
RequestHandler-->>A2AServer: Push SSE event
A2AServer-->>Client: Stream content update
end
LLM-->>Agent: Final response completed
Agent-->>Executor: yield {'content': '', 'done': True}
Executor-->>RequestHandler: Final TaskArtifactUpdateEvent
RequestHandler-->>A2AServer: Final SSE event
A2AServer-->>Client: Streaming response completed
Main Workflow
- Agent Card Retrieval: Client first retrieves agent card from A2A server to understand agent capabilities and skills
- Streaming Request Processing: Client sends streaming message request with user query
- Agent Execution: Agent executor processes the request and calls travel planner agent core logic
- LLM Interaction: Agent conducts streaming conversation with OpenAI-compatible LLM
- Real-time Response: Stream responses to client in real-time via Server-Sent Events (SSE)
Getting started
- Configure environment variables:
Copy the example file and configure your API credentials.
cp env.example .env
Edit .env
file with your actual values:
# Required: Your API key for the AI model service
API_KEY=your_actual_api_key_here
# Optional: Model name (default: google/gemini-2.0-flash-001)
MODEL_NAME=google/gemini-2.0-flash-001
# Optional: Base URL for the API service
BASE_URL=https://openrouter.ai/api/v1
-
Install dependencies and start the server:
uv venv source .venv/bin/activate uv sync uv run .
-
Run the loop client in a new terminal:
source .venv/bin/activate uv run loop_client.py
Configuration
The application uses environment variables for configuration:
API_KEY
(required): Your API key for the AI model serviceMODEL_NAME
(optional): The model name to use (default: "google/gemini-2.0-flash-001")BASE_URL
(optional): The base URL for the API service (default: "https://openrouter.ai/api/v1")
Technical Features
Current Implementation
- ✅ A2A Protocol Compliance: Fully compliant with Agent2Agent protocol specification
- ✅ Streaming Response: Supports real-time streaming content generation
- ✅ OpenAI Compatible: Supports any OpenAI-compatible API interface
- ✅ Modular Design: Clear separation between agent logic and protocol adaptation
- ✅ Environment Configuration: Flexible environment variable configuration
Future Enhancement Plans
Task State Management Enhancement
Based on Google A2A LangGraph sample, planning to add the following features:
- 🔄 Task Lifecycle Management: Implement complete task state tracking (submitted → working → completed/failed)
- 🔄 Multi-turn Conversation Support: Add
input_required
state to support complex travel planning scenarios requiring user clarification - 🔄 Task Persistence: Implement task state persistence for long-running planning tasks
- 🔄 Enhanced Error Handling: More detailed error states and recovery mechanisms
- 🔄 Task Cancellation: Support cancellation of ongoing tasks
State Management Example
# Future state management implementation example
class TravelPlannerTaskManager:
async def handle_complex_query(self, query: str, context: RequestContext):
# Detect if more information is needed
if self.needs_clarification(query):
return TaskStatus(
state=TaskState.input_required,
message="More information needed: Please provide specific destination, dates, and budget range"
)
# Execute complex multi-step planning
task_id = await self.create_long_running_task(query)
return TaskStatus(
state=TaskState.working,
taskId=task_id,
message="Creating detailed travel plan..."
)
Planned Feature Additions
- 📋 Structured Data Support: Add DataPart support for form-based travel preference collection
- 🖼️ Multimedia Support: Support FilePart for generating and processing travel images, maps, etc.
- 🔍 Tool Integration: Integrate external APIs (weather, flights, hotels, etc.) as tool calls
- 🌐 Multi-language Support: Extend multi-language travel planning capabilities
- 📊 Analytics Metrics: Add collection of task execution time, success rate metrics
Go A2A