tutorialintermediate
Async multi-agent orchestration Jun 2026 • Agent Patterns Two async multi-agent patterns — a fixed N-agent team with peer messaging through a shared hub, and dynamically spawned async subagents — reduced to their bare messaging and lifecycle mechanics.
cookbook
View original on cookbookThis cookbook demonstrates two async multi-agent orchestration patterns using Claude: a fixed N-agent team with peer messaging through a shared hub, and dynamically spawned async subagents. Built on the Anthropic Python SDK and asyncio, it provides bare messaging and lifecycle mechanics without domain-specific tasks, allowing developers to see exactly which tools fire and in what order before adding their own tools and logic. The patterns run on any API key in under thirty seconds and include a message hub, messaging tools (send_message and wait_for_message), and a base agent loop that handles tool dispatching and inbox management.
Key Points
- •Use a shared Hub class to manage agent inboxes, asyncio.Event objects for blocking waits, and agent status tracking — the core infrastructure for multi-agent coordination
- •Implement two core messaging tools: send_message (to reach other agents) and wait_for_message (to block until receiving messages) — plain text in agent turns goes nowhere
- •Messages arrive inline appended to tool results rather than through polling, ensuring agents receive updates automatically during their next tool call
- •Run agents through a standard tool-use loop (run_agent function) that dispatches send_message/wait_for_message against the hub and routes extra tools through extra_dispatch
- •For fixed N-agent teams: register all agents upfront, give each a persona and task, use send_message for peer introductions, and coordinate via the shared hub
- •For dynamic subagents: spawn agents on-demand using hub.new_name() to generate unique names and register them automatically
- •Drain inboxes after each tool call and append formatted messages to the last tool result so agents see all received messages in context
- •Print live tool calls and inbox deliveries with TRACE tracking to observe orchestration flow and debug agent interactions
- •Set max_turns limits and timeout handling (60s default) to prevent infinite loops and manage agent lifecycle gracefully
- •Keep domain logic separate from messaging mechanics — this pattern is a pure skeleton ready for custom tools and tasks to be dropped in
Found this useful? Add it to a playbook for a step-by-step implementation guide.
Workflow Diagram
Start Process
Step A
Step B
Step C
Complete
Concepts
Artifacts (5)
Hub Classpythontemplate
class Hub:
def __init__(self):
self.inbox: dict[str, list[dict]] = defaultdict(list)
self.event: dict[str, asyncio.Event] = defaultdict(asyncio.Event)
self.status: dict[str, str] = {}
self._ids = itertools.count(1)
def register(self, name: str):
self.status[name] = "active"
_ = self.inbox[name], self.event[name]
def new_name(self, prefix="helper") -> str:
n = f"{prefix}{next(self._ids)}"
self.register(n)
return n
def post(self, sender: str, recipients: list[str], content: str) -> list[str]:
delivered = []
for rid in recipients:
if rid in self.status:
self.inbox[rid].append({"from": sender, "content": content})
self.event[rid].set()
delivered.append(rid)
return delivered
def drain(self, name: str) -> list[dict]:
msgs, self.inbox[name] = self.inbox[name], []
self.event[name] = asyncio.Event()
return msgs
@staticmethod
def render(msgs: list[dict]) -> str:
if not msgs:
return ""
body = "\n".join(
f'<agent-message from="{m["from"]}">\n{m["content"]}\n</agent-message>'
for m in msgs
)
return f"\n\n[Messages received while you were working:]\n{body}"Messaging Tools Definitionpythonconfig
SEND_MESSAGE = {
"name": "send_message",
"description": (
"Send a message to one or more other agents. It will appear appended to their next tool "
"result. This is the ONLY way to reach other agents — plain text in your turn goes nowhere."
),
"input_schema": {
"type": "object",
"properties": {
"recipient_ids": {
"type": "array",
"items": {"type": "string"},
"minItems": 1
},
"content": {"type": "string"},
},
"required": ["recipient_ids", "content"],
},
}
WAIT_FOR_MESSAGE = {
"name": "wait_for_message",
"description": (
"Block until another agent messages you. Note: messages also arrive automatically appended "
"to the result of ANY other tool call, so only use this when you have nothing else to do."
),
"input_schema": {
"type": "object",
"properties": {},
},
}
BASE_TOOLS = [SEND_MESSAGE, WAIT_FOR_MESSAGE]Agent Loop Functionpythonscript
async def run_agent(
hub: Hub,
name: str,
system: str,
first_user_turn: str,
tools: list = None,
extra_dispatch=None,
max_turns: int = 20,
) -> str:
tools = tools or BASE_TOOLS
extra_dispatch = extra_dispatch or {}
messages = [{"role": "user", "content": first_user_turn}]
try:
for _ in range(max_turns):
resp = await client.messages.create(
model=MODEL,
max_tokens=2048,
system=system,
tools=tools,
messages=messages,
)
messages.append({"role": "assistant", "content": resp.content})
if resp.stop_reason == "end_turn":
hub.status[name] = "done"
return "".join(getattr(b, "text", "") for b in resp.content)
if resp.stop_reason != "tool_use":
raise RuntimeError(f"unexpected stop_reason: {resp.stop_reason}")
results = []
for block in resp.content:
if block.type != "tool_use":
continue
TRACE[name].append(block.name)
if block.name == "send_message":
rids = block.input["recipient_ids"]
delivered = hub.post(name, rids, block.input["content"])
unknown = [r for r in rids if r not in delivered]
out = f"delivered to {delivered}" + (f"; unknown: {unknown}" if unknown else "")
elif block.name == "wait_for_message":
hub.status[name] = "idling"
try:
await asyncio.wait_for(hub.event[name].wait(), timeout=60)
out = "woke: new messages"
except TimeoutError:
out = "woke: 60s timeout"
hub.status[name] = "active"
elif block.name in extra_dispatch:
out = await extra_dispatch[block.name](block)
else:
out = f"error: no dispatch for {block.name}"
print(f"[{name}] {block.name}({_snip(block.input)}) → {_snip(out)}")
results.append({"type": "tool_result", "tool_use_id": block.id, "content": out})
inbox = hub.drain(name)
for m in inbox:
print(f"[{name}] ← received from {m['from']}: {_snip(m['content'])}")
if results:
results[-1]["content"] += hub.render(inbox)
messages.append({"role": "user", "content": results})
hub.status[name] = "done"
return f"[{name} hit max_turns={max_turns}]"
except Exception:
hub.status[name] = "crashed"
raiseFixed N-Agent Team Examplepythontemplate
TEAM_SYSTEM = "You are {name}, one of 3 agents working together (peers: {peers})."
TASKS = {
"lead": "You are the lead. Introduce yourself to the others. Once everyone has introduced themselves, finish with a one-sentence summary of the team.",
"helper1": "You are a backend engineer named Ada. Introduce yourself to the others, then wait for their replies.",
"helper2": "You are a designer named Bo. Introduce yourself to the others, then wait for their replies.",
}
async def run_team() -> str:
hub = Hub()
names = list(TASKS)
for n in names:
hub.register(n)
helper_tasks = [
asyncio.create_task(
run_agent(
hub,
n,
system=TEAM_SYSTEM.format(name=n, peers=[p for p in names if p != n]),
first_user_turn=TASKS[n],
)
)
for n in names[1:]
]
try:
return await run_agent(
hub,
"lead",
system=TEAM_SYSTEM.format(name="lead", peers=names[1:]),
first_user_turn=TASKS["lead"],
)
finally:
for t in helper_tasks:
t.cancel()
await asyncio.gather(*helper_tasks, return_exceptions=True)Setup and Importspythonscript
import asyncio
import itertools
from collections import Counter, defaultdict
import anthropic
MODEL = "claude-opus-4-8" # swap for the newest Claude model available to you
client = anthropic.AsyncAnthropic() # reads ANTHROPIC_API_KEY from env
TRACE: dict[str, list[str]] = defaultdict(list)
def _snip(s, n=60):
s = str(s).replace("\n", " ")
return s if len(s) <= n else s[:n] + "…"
def print_trace():
for agent in sorted(TRACE):
counts = Counter(TRACE[agent])
print(f"{agent}: " + ", ".join(f"{n} × {t}" for t, n in counts.most_common()))
TRACE.clear()