A2A Protocol

Python A2A教程与源代码

MILO
Share
Python A2A教程与源代码

Python A2A教程源代码

目录

简介

在本教程中,您将使用Python构建一个简单的回显A2A服务器。这个基础实现将展示A2A提供的所有功能。通过本教程,您将能够使用Ollama或Google的Agent Development Kit添加代理功能。

您将学习:

  • A2A的基本概念
  • 如何用Python创建A2A服务器
  • 与A2A服务器交互
  • 添加训练模型作为代理

设置环境

您需要的工具

  • 代码编辑器,如Cursor/VsCode
  • 命令提示符,如Terminal(Linux)、iTerm/Warp(Mac)或Cursor中的Terminal

Python环境

我们将使用uv作为包管理器并设置我们的项目。

我们将使用的A2A库需要python >= 3.12,如果您还没有匹配的版本,uv可以安装。我们将使用python 3.12。

检查

运行以下命令以确保您已准备好进行下一步:

echo 'import sys; print(sys.version)' | uv run -

如果您看到类似以下内容,则表示您已准备就绪!

3.12.3 (main, Feb 4 2025, 14:48:35) [GCC 13.3.0]

我的环境

  • Python 3.13
  • uv: uv 0.7.2 (Homebrew 2025-04-30)
  • Warp
  • Ollama 0.6.7 (支持Qwen3)
  • macOs Sequoia 15.4.1

创建项目

首先使用uv创建一个项目。我们将添加--package标志,以便您以后可以添加测试或发布项目:

uv init --package my-project
cd my-project

使用虚拟环境

我们将为这个项目创建一个虚拟环境。这只需要做一次:

uv venv .venv

对于此及将来打开的任何终端窗口,您需要激活此虚拟环境:

source .venv/bin/activate

如果您使用的是VS Code等代码编辑器,您需要设置Python解释器以获取代码补全。在VS Code中,按Ctrl-Shift-P并选择Python: Select Interpreter。然后选择您的项目my-project,然后选择正确的python解释器Python 3.12.3 ('.venv':venv) ./.venv/bin/python

源代码现在应该类似于这样:

# my-project
tree
.
|____pyproject.toml
|____README.md
|____.venv
| |____bin
| | |____activate.bat
| | |____activate.ps1
| | |____python3
| | |____python
| | |____activate.fish
| | |____pydoc.bat
| | |____activate_this.py
| | |____activate
| | |____activate.nu
| | |____deactivate.bat
| | |____python3.13
| | |____activate.csh
| |____pyvenv.cfg
| |____CACHEDIR.TAG
| |____.gitignore
| |____lib
| | |____python3.13
| | | |____site-packages
| | | | |_____virtualenv.py
| | | | |_____virtualenv.pth
|____.python-version
|____src
| |____my_project
| | |______init__.py

添加Google-A2A Python库

接下来,我们将添加来自Google的示例A2A python库:

uv add git+https://github.com/google/A2A#subdirectory=samples/python

pyproject.toml:

[project]
name = "my-project"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
    { name = "zhangcheng", email = "zh.milo@gmail.com" }
]
requires-python = ">=3.13"
dependencies = [
    "a2a-samples",
]

[project.scripts]
my-project = "my_project:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv.sources]
a2a-samples = { git = "https://github.com/google/A2A", subdirectory = "samples/python" }

设置项目结构

现在创建一些我们稍后将使用的文件:

touch src/my_project/agent.py
touch src/my_project/task_manager.py

测试运行

如果一切设置正确,您现在应该能够运行您的应用程序:

uv run my-project

输出应该类似于这样:

Hello from my-project!

代理技能

代理技能是代理可以执行的一组功能。以下是我们的回显代理的示例:

{
  id: "my-project-echo-skill"
  name: "Echo Tool",
  description: "Echos the input given",
  tags: ["echo", "repeater"],
  examples: ["I will see this echoed back to me"],
  inputModes: ["text"],
  outputModes: ["text"]
}

这符合代理卡片的技能部分:

{
  id: string; // 代理技能的唯一标识符
  name: string; // 技能的人类可读名称
  // 技能描述 - 将被客户端或人类用作
  // 了解技能作用的提示。
  description: string;
  // 描述此特定技能能力类别的标签词集合
  // (例如 "cooking", "customer support", "billing")
  tags: string[];
  // 技能可以执行的示例场景集合。
  // 将被客户端用作了解如何使用技能的提示。
  // (例如 "I need a recipe for bread")
  examples?: string[]; // 任务提示示例
  // 技能支持的交互模式集合
  // (如果与默认值不同)
  inputModes?: string[]; // 支持的输入mime类型
  outputModes?: string[]; // 支持的输出mime类型
}

实现

让我们在代码中创建这个代理技能。打开src/my-project/__init__.py并用以下代码替换内容:

import google_a2a
from google_a2a.common.types import AgentSkill

def main():
  skill = AgentSkill(
    id="my-project-echo-skill",
    name="Echo Tool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    examples=["I will see this echoed back to me"],
    inputModes=["text"],
    outputModes=["text"],
  )
  print(skill)

if __name__ == "__main__":
  main()

如果遇到模块错误,请尝试以下方式:

from common.types import AgentSkill

# 相同的代码

测试运行

让我们运行一下:

uv run my-project

输出应该类似于这样:

id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']

代理卡片

现在我们已经定义了技能,可以创建代理卡片了。

远程代理需要以JSON格式发布代理卡片,描述代理的功能和技能以及身份验证机制。换句话说,这让世界了解您的代理以及如何与之交互。

实现

首先添加一些用于解析命令行参数的辅助工具。这对于以后启动服务器会很有帮助:

uv add click

并更新我们的代码:

import logging

import click
import google_a2a
from google_a2a.common.types import AgentSkill, AgentCapabilities, AgentCard

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
  skill = AgentSkill(
    id="my-project-echo-skill",
    name="Echo Tool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    examples=["I will see this echoed back to me"],
    inputModes=["text"],
    outputModes=["text"],
  )
  logging.info(skill)

if __name__ == "__main__":
  main()

接下来我们添加代理卡片:

# ...
def main(host, port):
  # ...
  capabilities = AgentCapabilities()
  agent_card = AgentCard(
    name="Echo Agent",
    description="This agent echos the input given",
    url=f"http://{host}:{port}/",
    version="0.1.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=capabilities,
    skills=[skill]
  )
  logging.info(agent_card)

if __name__ == "__main__":
  main()

测试运行

让我们运行一下:

uv run my-project

输出应该类似于这样:

INFO:root:id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]

A2A服务器

我们几乎准备好启动服务器了!我们将使用Google-A2A中的A2AServer类,它在底层启动了一个uvicorn服务器。

任务管理器

在创建服务器之前,我们需要一个任务管理器来处理传入请求。

我们将实现InMemoryTaskManager接口,这需要我们实现两个方法:

async def on_send_task(
  self,
  request: SendTaskRequest
) -> SendTaskResponse:
  """
  此方法查询或为代理创建任务。
  调用者将收到恰好一个响应。
  """
  pass

async def on_send_task_subscribe(
  self,
  request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
  """
  此方法让调用者订阅关于任务的未来更新。
  调用者将收到一个响应,并通过客户端和服务器之间建立的会话
  额外接收订阅更新
  """
  pass

打开src/my_project/task_manager.py并添加以下代码。我们将简单地返回直接回显响应,并立即将任务标记为完成,不使用任何会话或订阅:

from typing import AsyncIterable

import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
  Artifact,
  JSONRPCResponse,
  Message,
  SendTaskRequest,
  SendTaskResponse,
  SendTaskStreamingRequest,
  SendTaskStreamingResponse,
  Task,
  TaskState,
  TaskStatus,
  TaskStatusUpdateEvent,
)

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(self):
    super().__init__()

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # 更新InMemoryTaskManager存储的任务
    await self.upsert_task(request.params)

    task_id = request.params.id
    # 我们的自定义逻辑,简单地将任务标记为完成
    # 并返回回显文本
    received_text = request.params.message.parts[0].text
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {received_text}"
    )

    # 发送响应
    return SendTaskResponse(id=request.id, result=task)

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    pass

  async def _update_task(
    self,
    task_id: str,
    task_state: TaskState,
    response_text: str,
  ) -> Task:
    task = self.tasks[task_id]
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

A2A服务器

有了完整的任务管理器,我们现在可以创建服务器了。

打开src/my_project/__init__.py并添加以下代码:

# ...
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
# ...
def main(host, port):
  # ...

  task_manager = MyAgentTaskManager()
  server = A2AServer(
    agent_card=agent_card,
    task_manager=task_manager,
    host=host,
    port=port,
  )
  server.start()

测试运行

让我们运行一下:

uv run my-project

输出应该类似于这样:

INFO:root:id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]
INFO:     Started server process [582]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)

恭喜!您的A2A服务器现在已经运行!

与A2A服务器交互

首先,我们将使用Google-A2A的命令行工具向我们的A2A服务器发送请求。尝试之后,我们将编写自己的基本客户端,看看这在幕后是如何工作的。

使用Google-A2A的命令行工具

在上一步中,让您的A2A服务器已经运行:

# 这应该已经在您的终端中运行
$ uv run my-project
INFO:     Started server process [20538]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)

在同一目录中打开一个新终端:

source .venv/bin/activate
uv run google-a2a-cli --agent http://localhost:10002

# 如果出错,请尝试这个(确保.venv/lib/python3.13/site-packages中有一个hosts目录):
uv run python -m hosts.cli --agent http://localhost:10002

注意:只有当您从这个pull request安装了google-a2a,这才能工作,因为cli之前没有暴露。

否则,您必须直接检出Google/A2A仓库,导航到samples/python仓库并直接运行cli。

然后您可以通过输入并按Enter键向服务器发送消息:

=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Hello!

如果一切正常,您将在响应中看到:

$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): hello
Select a file path to attach? (press enter to skip):

{"jsonrpc":"2.0","id":"5b3b74b7ea80495daff4047ee48a6c48","result":{"id":"740f1e21465b4ee2af4af7b8c6cacad5","sessionId":"7fbd065264cb4d6c91ed96909589fc35","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"on_send_task received: hello"}]},"timestamp":"2025-05-03T22:18:41.649600"},"artifacts":[{"parts":[{"type":"text","text":"on_send_task received: hello"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hello"}]}]}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

要退出,请输入:q并按Enter键。

添加代理功能

现在我们已经有了一个基本的A2A服务器运行,让我们添加更多功能。我们将探索A2A如何异步工作并流式传输响应。

流式传输

这允许客户端订阅服务器并接收多个更新,而不是单个响应。这对于长时间运行的代理任务或可能向客户端流式传输多个Artifacts的情况非常有用。

首先,我们将声明我们的代理已准备好进行流式传输。打开src/my_project/__init__.py并更新AgentCapabilities:

# ...
def main(host, port):
  # ...
  capabilities = AgentCapabilities(
    streaming=True
  )
  # ...

现在在src/my_project/task_manager.py中,我们必须实现on_send_task_subscribe

import asyncio
# ...
class MyAgentTaskManager(InMemoryTaskManager):
  # ...
  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    task_id = request.params.id
    received_text = request.params.message.parts[0].text

    text_messages = ["one", "two", "three"]
    for text in text_messages:
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",
        }
      ]
      message = Message(role="agent", parts=parts)
      is_last = text == text_messages[-1]
      task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
      task_status = TaskStatus(
        state=task_state,
        message=message
      )
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=task_status,
        final=is_last,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event
      )

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    # 更新InMemoryTaskManager存储的任务
    await self.upsert_task(request.params)

    task_id = request.params.id
    # 为此任务创建一个工作队列
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)

    # 为此任务启动异步工作
    asyncio.create_task(self._stream_3_messages(request))

    # 告诉客户端期望未来的流式响应
    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )

重启您的A2A服务器以获取新更改,然后重新运行cli:

$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-03T22:22:31.354656"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-03T22:22:31.354684"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-03T22:22:31.354698"},"final":true}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

有时代理可能需要额外输入。例如,也许代理会询问客户端是否想继续重复3条消息。在这种情况下,代理将响应TaskState.INPUT_REQUIRED,然后客户端将重新发送send_task_streaming,使用相同的task_idsession_id,但更新了提供代理所需输入的消息。在服务器端,我们将更新on_send_task_subscribe来处理这种情况:

import asyncio
from typing import AsyncIterable

from common.server.task_manager import InMemoryTaskManager
from common.types import (
  Artifact,
  JSONRPCResponse,
  Message,
  SendTaskRequest,
  SendTaskResponse,
  SendTaskStreamingRequest,
  SendTaskStreamingResponse,
  Task,
  TaskState,
  TaskStatus,
  TaskStatusUpdateEvent,
)

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(self):
    super().__init__()

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # 更新InMemoryTaskManager存储的任务
    await self.upsert_task(request.params)

    task_id = request.params.id
    # 我们的自定义逻辑,简单地将任务标记为完成
    # 并返回回显文本
    received_text = request.params.message.parts[0].text
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {received_text}"
    )

    # 发送响应
    return SendTaskResponse(id=request.id, result=task)

  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    task_id = request.params.id
    received_text = request.params.message.parts[0].text

    text_messages = ["one", "two", "three"]
    for text in text_messages:
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",
        }
      ]
      message = Message(role="agent", parts=parts)
      # is_last = text == text_messages[-1]
      task_state = TaskState.WORKING
      # task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
      task_status = TaskStatus(
        state=task_state,
        message=message
      )
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=task_status,
        final=False,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event
      )
    ask_message = Message(
      role="agent",
      parts=[
        {
          "type": "text",
          "text": "Would you like more messages? (Y/N)"
        }
      ]
    )
    task_update_event = TaskStatusUpdateEvent(
      id=request.params.id,
      status=TaskStatus(
        state=TaskState.INPUT_REQUIRED,
        message=ask_message
      ),
      final=True,
    )
    await self.enqueue_events_for_sse(
      request.params.id,
      task_update_event
    )

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    task_id = request.params.id
    is_new_task = task_id in self.tasks
    # 更新InMemoryTaskManager存储的任务
    await self.upsert_task(request.params)

    received_text = request.params.message.parts[0].text
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
    if not is_new_task and received_text == "N":
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=TaskStatus(
          state=TaskState.COMPLETED,
          message=Message(
            role="agent",
            parts=[
              {
                "type": "text",
                "text": "All done!"
              }
            ]
          )
        ),
        final=True,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event,
      )
    else:
      asyncio.create_task(self._stream_3_messages(request))

    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )

  async def _update_task(
    self,
    task_id: str,
    task_state: TaskState,
    response_text: str,
  ) -> Task:
    task = self.tasks[task_id]
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

现在重启服务器并运行cli,我们可以看到任务将继续运行,直到我们告诉代理N

uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-04T09:18:18.235994"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-04T09:18:18.236021"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-04T09:18:18.236033"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"input-required","message":{"role":"agent","parts":[{"type":"text","text":"Would you like more messages? (Y/N)"}]},"timestamp":"2025-05-04T09:18:18.236044"},"final":true}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): N
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"86ce510ba68b4797a5b68061c8c4780b","result":{"id":"64e51665dc354d2da7c31bcc45abc8f9","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"All done!"}]},"timestamp":"2025-05-04T09:22:24.598749"},"final":true}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

恭喜!您现在拥有一个能够异步执行工作并在需要时向用户请求输入的代理。

使用本地Ollama模型

现在我们来到了激动人心的部分。我们将为A2A服务器添加AI功能。

在本教程中,我们将设置一个本地Ollama模型并将其与A2A服务器集成。

要求

我们将安装ollamalangchain,并下载支持MCP工具的ollama模型(用于将来的教程)。

  1. 下载ollama
  2. 运行ollama服务器:
# 注意:如果ollama已经在运行,您可能会收到错误,例如
# Error: listen tcp 127.0.0.1:11434: bind: address already in use
# 在Linux上,您可以运行systemctl stop ollama来停止ollama
ollama serve
  1. 此列表下载模型。我们将使用qwq,因为它支持tools(如其标签所示)并在24GB显卡上运行:
ollama pull qwq

# 或ollama pull qwen3:4b
# 仅2.4G
  1. 安装langchain
uv add langchain langchain-ollama langgraph

现在ollama设置好了,我们可以开始将其集成到我们的A2A服务器中。

将Ollama集成到我们的A2A服务器中

首先打开src/my_project/__init__.py

# ...

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
@click.option("--ollama-host", default="http://127.0.0.1:11434")
@click.option("--ollama-model", default=None)
def main(host, port, ollama_host, ollama_model):
  # ...
  capabilities = AgentCapabilities(
    streaming=False # 我们将流式功能作为读者的练习
  )
  # ...
  task_manager = MyAgentTaskManager(
    ollama_host=ollama_host,
    ollama_model=ollama_model,
  )
  # ..

现在让我们在src/my_project/agent.py中添加AI功能:

from langchain_ollama import ChatOllama
from langgraph.prebuilt import create_react_agent
from langgraph.graph.graph import CompiledGraph

def create_ollama_agent(ollama_base_url: str, ollama_model: str):
  ollama_chat_llm = ChatOllama(
    base_url=ollama_base_url,
    model=ollama_model,
    temperature=0.2
  )
  agent = create_react_agent(ollama_chat_llm, tools=[])
  return agent

async def run_ollama(ollama_agent: CompiledGraph, prompt: str):
  agent_response = await ollama_agent.ainvoke(
    {"messages": prompt }
  )
  message = agent_response["messages"][-1].content
  return str(message)

最后,让我们从src/my_project/task_manager.py调用我们的ollama代理:

# ...
from my_project.agent import create_ollama_agent, run_ollama

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(
    self,
    ollama_host: str,
    ollama_model: typing.Union[None, str]
  ):
    super().__init__()
    if ollama_model is not None:
      self.ollama_agent = create_ollama_agent(
        ollama_base_url=ollama_host,
        ollama_model=ollama_model
      )
    else:
      self.ollama_agent = None

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # ...
    received_text = request.params.message.parts[0].text
    response_text = f"on_send_task received: {received_text}"
    if self.ollama_agent is not None:
      response_text = await run_ollama(ollama_agent=self.ollama_agent, prompt=received_text)

    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=response_text
    )

    # 发送响应
    return SendTaskResponse(id=request.id, result=task)

  # ...

让我们测试一下!

首先重新运行我们的A2A服务器,将qwq替换为您下载的ollama模型:

uv run my-project --ollama-host http://127.0.0.1:11434 --ollama-model qwen3:4b

然后重新运行cli:

uv run python -m hosts.cli --agent http://localhost:10002

注意,如果您使用的是大型模型,可能需要一段时间才能加载。cli可能会超时。在这种情况下,一旦ollama服务器完成加载模型,就重新运行cli。

您应该会看到类似以下内容:

======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): hey
Select a file path to attach? (press enter to skip):

{"jsonrpc":"2.0","id":"eca7ecf4d6da4a65a4ff99ab0954b957","result":{"id":"62636e021ac0483bb31d40c1473796fa","sessionId":"438927e3540f459389f3d3cb216dd945","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today? 😊"}]},"timestamp":"2025-05-04T10:01:55.068049"},"artifacts":[{"parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today? 😊"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hey"}]}]}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

恭喜!您现在拥有一个使用AI模型生成响应的A2A服务器!