A2A Protocol

A2A Protocol Extension: Secure Passport Complete Guide

MILO
Share
A2A Protocol Extension: Secure Passport Complete Guide

Table of Contents

  1. Complete Overview
  2. Detailed Technical Specification
  3. Four Practical Use Cases
  4. Complete Python Implementation Guide
  5. Best Practices

1. Complete Overview

Extension Purpose and Core Benefits

The Secure Passport Extension introduces a trusted context layer to the Agent2Agent (A2A) protocol, enabling calling agents to securely and voluntarily share a structured subset of their current context state with called agents. This extension aims to transform anonymous, transactional calls into collaborative partnerships.

Core Benefits

  • Instant Personalization: Specialized agents can immediately use context (such as loyalty tier or preferred currency)
  • Reduced Overhead: Eliminates multi-turn conversations needed to establish context
  • Enhanced Trust: Includes a signature field for cryptographic verification of data origin and integrity

Technical Specification and Identifiers

  • URI: https://github.com/a2aproject/a2a-samples/tree/main/samples/python/extensions/secure-passport
  • Type: Profile Extension / Pure Data Extension
  • Version: 1.0.0
  • Protocol: Agent2Agent (A2A)

Detailed Core Benefits

Instant Personalization

Specialized agents can immediately access the caller's context information without additional authentication or data collection steps. For example, a travel agent can instantly know the user's currency preference and loyalty tier.

Reduced Overhead

By pre-sharing context information, this avoids traditional "who are you? what do you need?" style multi-turn conversations, significantly improving the efficiency of inter-agent communication.

Enhanced Trust

Digital signature mechanisms ensure the integrity and origin verification of context data, providing security guarantees for high-privilege operations.


2. Detailed Technical Specification

Complete CallerContext Data Structure

The CallerContext object is the core payload of the secure passport, containing the following fields:

Field Type Required Description
clientId string Yes Verifiable unique identifier of the calling agent
signature string No Digital signature of the entire state object, signed by the calling agent's private key, for cryptographic trust verification
sessionId string No Session or conversation identifier for maintaining thread continuity
state object Yes Free-form JSON object containing context data (e.g., user preferences, loyalty tier)

Message Integration Example (Actual JSON)

Complete A2A Message Request Example

{
  "jsonrpc": "2.0",
  "id": "req-123",
  "method": "tasks/send",
  "params": {
    "message": {
      "messageId": "msg-456",
      "role": "user",
      "parts": [
        {"kind": "text", "content": "Book a flight for me."}
      ],
      "metadata": {
        "https://github.com/a2aproject/a2a-samples/tree/main/samples/python/extensions/secure-passport": {
          "clientId": "a2a://orchestrator-agent.com",
          "sessionId": "travel-session-xyz",
          "signature": "MOCK-SIG-123456...",
          "state": {
            "user_preferred_currency": "GBP",
            "loyalty_tier": "Gold"
          }
        }
      }
    }
  }
}

CallerContext Payload Example

{
  "clientId": "a2a://orchestrator-agent.com",
  "sessionId": "travel-session-xyz",
  "signature": "MOCK-SIG-123456...",
  "state": {
    "user_preferred_currency": "GBP",
    "loyalty_tier": "Gold"
  }
}

Agent Declaration and Negotiation Pattern

AgentCard Declaration Example

A2A agents capable of receiving and utilizing secure passport context must declare their support in the extensions section of the AgentCapabilities object in their AgentCard.

{
  "uri": "https://github.com/a2aproject/a2a-samples/tree/main/samples/python/extensions/secure-passport",
  "params": {
    "supportedStateKeys": ["user_preferred_currency", "loyalty_tier"]
  }
}

The called agent uses the supportedStateKeys array to explicitly declare the context data keys it understands and optimizes for.


3. Four Practical Use Cases

Scenario 1: Currency Conversion (High Trust)

Scenario Description: A travel orchestrator needs currency conversion services and must pass the user's currency preference.

Context Data:

{
  "clientId": "a2a://travel-orchestrator.com",
  "state": {
    "user_preferred_currency": "GBP",
    "user_id": "U001"
  },
  "signature": "sig-currency-1"
}

Benefits:

  • Currency conversion agent can immediately display prices in user's preferred currency
  • Digital signature ensures trustworthiness of currency preference
  • No additional authentication steps required

Scenario 2: Personalized Travel Booking (Session Data)

Scenario Description: A travel portal needs to provide personalized services for high-loyalty users.

Context Data:

{
  "clientId": "a2a://travel-portal.com",
  "sessionId": "travel-booking-session-999",
  "state": {
    "destination": "Bali, Indonesia",
    "loyalty_tier": "Platinum"
  },
  "signature": "sig-travel-2"
}

Benefits:

  • Travel agent can immediately identify high-value customers
  • Session ID supports context continuity across multiple requests
  • Can provide special offers based on loyalty tier

Scenario 3: Retail Assistance (No Signature, Low Risk)

Scenario Description: E-commerce frontend needs product recommendation services with low risk, no signature verification required.

Context Data:

{
  "clientId": "a2a://ecommerce-front.com",
  "state": {
    "product_sku": "Nikon-Z-50mm-f1.8",
    "user_intent": "seeking_reviews"
  }
}

Benefits:

  • Recommendation agent can immediately understand user intent
  • No signature requirement, suitable for low-risk scenarios
  • Quick response without verification overhead

Scenario 4: Secure Database Access (Strict Permissions)

Scenario Description: Marketing agent needs to access financial database, requiring strict permission control.

Context Data:

{
  "clientId": "a2a://marketing-agent.com",
  "state": {
    "query_type": "quarterly_revenue",
    "access_scope": ["read:finance_db", "user:Gulli"]
  },
  "signature": "sig-finance-4"
}

Benefits:

  • Database agent can verify access permissions
  • Digital signature ensures legitimacy of request
  • Supports fine-grained permission control

4. Complete Python Implementation Guide

Installation and Setup Instructions

1. Environment Requirements

  • Python 3.9 or higher
  • Poetry (recommended for dependency management)

2. Project Setup

# Navigate to sample project directory
cd extensions/secure-passport/v1/samples/python

# Install dependencies
poetry install

# Activate virtual environment
poetry shell

Core Implementation (Code Examples)

1. Basic Data Model

from typing import Optional, Dict, Any
from pydantic import BaseModel, Field, ConfigDict

class CallerContext(BaseModel):
    """
    Secure passport payload containing context state shared by calling agent.
    """
    client_id: str = Field(..., alias='clientId', description="Verifiable unique identifier of calling client")
    signature: Optional[str] = Field(None, alias='signature', description="Cryptographic signature of 'state' payload")
    session_id: Optional[str] = Field(None, alias='sessionId', description="Session or conversation identifier")
    state: Dict[str, Any] = Field(..., description="Free-form JSON object containing context data")
    
    model_config = ConfigDict(
        populate_by_name=True, 
        extra='forbid'
    )
    
    @property
    def is_verified(self) -> bool:
        """Conceptually check if passport contains valid signature."""
        return self.signature is not None

2. Message Operation Functions

SECURE_PASSPORT_URI = "https://github.com/a2aproject/a2a-samples/tree/main/samples/python/extensions/secure-passport"

def add_secure_passport(message: A2AMessage, context: CallerContext) -> None:
    """Add secure passport (CallerContext) to message metadata."""
    message.metadata[SECURE_PASSPORT_URI] = context.model_dump(by_alias=True, exclude_none=True)

def get_secure_passport(message: A2AMessage) -> Optional[CallerContext]:
    """Retrieve and validate secure passport from message metadata."""
    passport_data = message.metadata.get(SECURE_PASSPORT_URI)
    if not passport_data:
        return None

    try:
        return CallerContext.model_validate(deepcopy(passport_data))
    except ValidationError as e:
        import logging
        logging.warning(f"Error: Received malformed secure passport data. Ignoring payload: {e}")
        return None

Middleware Integration Pattern

1. Client Middleware

@staticmethod
def client_middleware(next_handler: Callable[[A2AMessage], Any], message: A2AMessage, context: CallerContext):
    """
    [Conceptual middleware layer: Client/Calling agent]
    """
    print(f"[Middleware: Client] Attaching secure passport for {context.client_id}")
    add_secure_passport(message, context)
    return next_handler(message)

2. Server Middleware

@staticmethod
def server_middleware(next_handler: Callable[[A2AMessage, Optional[CallerContext]], Any], message: A2AMessage):
    """
    [Conceptual middleware layer: Server/Receiving agent]
    """
    passport = get_secure_passport(message)
    
    if passport:
        print(f"[Middleware: Server] Extracted secure passport. Verified: {passport.is_verified}")
    else:
        print("[Middleware: Server] No secure passport found or validation failed.")
        
    return next_handler(message, passport)

AgentCard Declaration Generation

class SecurePassportExtension:
    @staticmethod
    def get_agent_card_declaration(supported_state_keys: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Generate JSON structure for declaring support of this extension in A2A AgentCard.
        """
        declaration = {
            "uri": SECURE_PASSPORT_URI,
            "params": {}
        }
        if supported_state_keys:
            declaration["params"]["supportedStateKeys"] = supported_state_keys
        
        return declaration

# Usage examples
# Scenario 1: Agent supports basic secure passport
simple_declaration = SecurePassportExtension.get_agent_card_declaration()

# Scenario 2: Agent supports specific keys (e.g., travel agent)
travel_keys = ["destination", "loyalty_tier", "dates"]
complex_declaration = SecurePassportExtension.get_agent_card_declaration(travel_keys)

Complete Use Case Demonstrations

1. Currency Conversion Example

def demonstrate_currency_conversion():
    """Demonstrate currency conversion use case"""
    passport = CallerContext(
        client_id="a2a://travel-orchestrator.com",
        state={"user_preferred_currency": "GBP", "user_id": "U001"},
        signature="sig-currency-1"
    )
    
    message = A2AMessage()
    add_secure_passport(message, passport)
    retrieved = get_secure_passport(message)
    
    if retrieved and retrieved.is_verified:
        currency = retrieved.state.get("user_preferred_currency")
        print(f"Display prices in {currency}")

2. Personalized Travel Booking Example

def demonstrate_travel_booking():
    """Demonstrate personalized travel booking use case"""
    passport = CallerContext(
        client_id="a2a://travel-portal.com",
        session_id="travel-booking-session-999",
        state={
            "destination": "Bali, Indonesia",
            "loyalty_tier": "Platinum"
        },
        signature="sig-travel-2"
    )
    
    message = A2AMessage()
    add_secure_passport(message, passport)
    retrieved = get_secure_passport(message)
    
    if retrieved and retrieved.is_verified:
        tier = retrieved.state.get("loyalty_tier")
        if tier == "Platinum":
            print("Provide special offers for Platinum members")

Running and Testing

1. Run Unit Tests

pytest tests/

2. Run Middleware Demo

python run.py

5. Best Practices

Security Considerations and Signature Verification

1. Signature Verification Strategy

def verify_signature(passport: CallerContext, public_key: str) -> bool:
    """
    Verify digital signature of secure passport
    """
    if not passport.signature:
        return False
    
    try:
        # Verify signature using public key
        # Actual cryptographic verification logic should be implemented here
        return verify_cryptographic_signature(
            data=passport.state,
            signature=passport.signature,
            public_key=public_key
        )
    except Exception as e:
        logging.error(f"Signature verification failed: {e}")
        return False

2. Sensitive Data Handling

  • Do not include sensitive or mutable data in the state object unless robust end-to-end encryption verification is implemented
  • For high-privilege operations, should verify the provided signature
  • Use appropriate cryptographic algorithms and key management

Implementation Patterns and SDK Helpers

1. Middleware Integration Pattern

class SecurePassportMiddleware:
    """Complete implementation of secure passport middleware"""
    
    def __init__(self, private_key: str = None, public_keys: Dict[str, str] = None):
        self.private_key = private_key
        self.public_keys = public_keys or {}
    
    def client_side(self, message: A2AMessage, context: CallerContext):
        """Client-side middleware handling"""
        if self.private_key:
            context.signature = self._sign_context(context.state)
        add_secure_passport(message, context)
    
    def server_side(self, message: A2AMessage) -> Optional[CallerContext]:
        """Server-side middleware handling"""
        passport = get_secure_passport(message)
        if passport and passport.signature:
            client_key = self.public_keys.get(passport.client_id)
            if client_key and not self._verify_signature(passport, client_key):
                logging.warning(f"Signature verification failed from {passport.client_id}")
                return None
        return passport

2. SDK Helper Methods

class A2ASDK:
    """Convenience methods for A2A SDK"""
    
    @staticmethod
    def create_agent_card_with_passport(supported_keys: List[str] = None) -> Dict:
        """Automatically generate AgentCard with secure passport declaration"""
        return {
            "capabilities": {
                "extensions": [
                    SecurePassportExtension.get_agent_card_declaration(supported_keys)
                ]
            }
        }
    
    @staticmethod
    def send_message_with_passport(message: A2AMessage, context: CallerContext):
        """Send message with secure passport"""
        add_secure_passport(message, context)
        return send_message(message)

Error Handling Strategies

1. Validation Error Handling

def safe_get_passport(message: A2AMessage) -> Optional[CallerContext]:
    """Safely get passport with complete error handling"""
    try:
        passport = get_secure_passport(message)
        if passport:
            # Log passport information (excluding sensitive data)
            logging.info(f"Received secure passport from {passport.client_id}")
        return passport
    except ValidationError as e:
        logging.error(f"Passport validation failed: {e}")
        return None
    except Exception as e:
        logging.error(f"Unexpected error getting passport: {e}")
        return None

2. Fallback Strategy

def handle_passport_failure(message: A2AMessage, fallback_handler):
    """Fallback strategy when passport handling fails"""
    passport = get_secure_passport(message)
    if not passport:
        logging.warning("No valid passport found, using fallback handling")
        return fallback_handler(message)
    
    if not passport.is_verified:
        logging.warning("Passport not verified, handling cautiously")
        return cautious_handler(message, passport)
    
    return normal_handler(message, passport)

Performance Optimization Tips

1. Caching Strategy

class PassportCache:
    """Passport cache to avoid repeated verification"""
    
    def __init__(self, ttl: int = 300):  # 5-minute TTL
        self.cache = {}
        self.ttl = ttl
    
    def get_verified_passport(self, client_id: str, signature: str) -> Optional[CallerContext]:
        """Get verified passport from cache"""
        cache_key = f"{client_id}:{signature}"
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.ttl:
                return cached_data
            else:
                del self.cache[cache_key]
        return None
    
    def cache_verified_passport(self, passport: CallerContext):
        """Cache verified passport"""
        cache_key = f"{passport.client_id}:{passport.signature}"
        self.cache[cache_key] = (passport, time.time())

2. Asynchronous Processing

import asyncio
from typing import AsyncGenerator

async def process_messages_with_passport(messages: AsyncGenerator[A2AMessage, None]):
    """Asynchronously process messages with passport"""
    async for message in messages:
        passport = get_secure_passport(message)
        if passport:
            # Asynchronously verify signature
            is_verified = await verify_signature_async(passport)
            if is_verified:
                await process_verified_message(message, passport)
            else:
                await process_unverified_message(message, passport)
        else:
            await process_no_passport_message(message)

Debugging and Monitoring Methods

1. Detailed Logging

import logging
import json

def setup_passport_logging():
    """Setup detailed logging for secure passport"""
    logger = logging.getLogger('secure_passport')
    logger.setLevel(logging.DEBUG)
    
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

def log_passport_operation(operation: str, passport: CallerContext, success: bool):
    """Log passport operation"""
    logger = logging.getLogger('secure_passport')
    log_data = {
        'operation': operation,
        'client_id': passport.client_id,
        'has_signature': passport.signature is not None,
        'session_id': passport.session_id,
        'state_keys': list(passport.state.keys()),
        'success': success
    }
    logger.info(f"Passport operation: {json.dumps(log_data)}")

2. Monitoring Metrics

from collections import defaultdict
import time

class PassportMetrics:
    """Monitoring metrics for secure passport"""
    
    def __init__(self):
        self.operation_counts = defaultdict(int)
        self.verification_times = []
        self.error_counts = defaultdict(int)
    
    def record_operation(self, operation: str, duration: float, success: bool):
        """Record operation metrics"""
        self.operation_counts[operation] += 1
        if operation == 'verify_signature':
            self.verification_times.append(duration)
        
        if not success:
            self.error_counts[operation] += 1
    
    def get_stats(self) -> Dict:
        """Get statistics"""
        avg_verification_time = (
            sum(self.verification_times) / len(self.verification_times)
            if self.verification_times else 0
        )
        
        return {
            'total_operations': sum(self.operation_counts.values()),
            'operation_breakdown': dict(self.operation_counts),
            'average_verification_time': avg_verification_time,
            'error_breakdown': dict(self.error_counts)
        }

3. Health Check

def health_check_passport_system() -> Dict[str, Any]:
    """Health check for secure passport system"""
    health_status = {
        'status': 'healthy',
        'checks': {}
    }
    
    # Check passport verification functionality
    try:
        test_passport = CallerContext(
            client_id="test://health-check",
            state={"test": "data"},
            signature="test-signature"
        )
        health_status['checks']['passport_creation'] = 'ok'
    except Exception as e:
        health_status['checks']['passport_creation'] = f'error: {e}'
        health_status['status'] = 'unhealthy'
    
    # Check message operation functionality
    try:
        message = A2AMessage()
        add_secure_passport(message, test_passport)
        retrieved = get_secure_passport(message)
        health_status['checks']['message_operations'] = 'ok' if retrieved else 'error'
    except Exception as e:
        health_status['checks']['message_operations'] = f'error: {e}'
        health_status['status'] = 'unhealthy'
    
    return health_status

Summary

The Secure Passport Extension provides a powerful and flexible context-sharing mechanism for the A2A protocol. By following the best practices in this guide, you can:

  1. Implement instant personalization: Enable agents to immediately understand caller context
  2. Ensure secure communication: Verify data integrity and origin through digital signatures
  3. Optimize performance: Reduce unnecessary multi-turn conversations and verification overhead
  4. Support multiple scenarios: From low-risk retail assistance to high-security database access

This extension lays a solid foundation for building more intelligent, efficient, and secure agent ecosystems.

A2A Extension List