Skip to content

Chains

Chains enable DAG-based multi-step orchestration — fan-out across tools, merge results, branch on conditions, loop until a threshold is met. Where a single tool call is a step, a chain is a recipe: define the graph of operations once, then execute it as a single chain/execute request with structured inputs and outputs.

Overview

Chains enable DAG (Directed Acyclic Graph) pipeline execution — multi-step compositions of tools, skills, and processes with branching, parallel fan-out, and error handling.

Protocol Messages (3 types)

Type StringModelDirection
chain/reqChainRequestAgent → Host
chain/respChainResponseHost → Agent
chain/eventChainEventHost → Agent (streaming)

ChainNode

FieldTypeDescription
node_idstrUnique node identifier
kindstrskill, tool, branch, or map
namestrSkill/tool/proc name to invoke
inputdictStatic input
input_mapdictJMESPath expressions to resolve from prior outputs
conditionstrBoolean expression for branch nodes
true_nodestrNext node if condition is true
false_nodestrNext node if condition is false
items_pathstrJMESPath to array for map fan-out
map_nodestrNode to run for each item
next_nodestrDefault next node
on_errorstrabort, skip, or a node_id to jump to
depslist[str]Dependency node IDs

ChainRequest

FieldTypeDescription
session_idstrSession identifier
chain_idstrChain identifier
nodeslist[ChainNode]All nodes in the DAG
entry_nodestrStarting node ID
initial_inputdictInput to the chain
correlation_idstrCorrelation ID
streamingboolEnable streaming events
timeoutfloatTotal chain timeout

ChainEvent (extends A2EEvent)

FieldTypeDescription
node_idstrWhich node
phasestrstart, done, skip, or error
outputdictNode output (on done)
errorstrError message (on error)

ChainResponse

FieldTypeDescription
chain_idstrChain identifier
successboolAll nodes succeeded
outputsdictnode_id → output mapping
final_outputdictOutput of terminal nodes
duration_msintTotal execution time
nodes_runintNumber of nodes executed
errorstrError if chain failed

ChainPlugin (Concrete DAG Executor)

Unlike most plugins, ChainPlugin is a concrete implementation with a full DAG scheduler:

python
class ChainPlugin(A2EPlugin):
    name = "chain"
    priority = 10

    def _run_chain(self, req):
        # 1. Build node lookup dict
        # 2. Track completed/running/failed sets
        # 3. Scheduler loop:
        #    a. Find runnable nodes (dependencies met)
        #    b. Spawn threads for each
        #    c. Join all
        #    d. Repeat until no more runnable or all done

    def can_run(self, node):
        # Check if all dependency nodes are completed

    def resolve_input(self, node):
        # Merge static input with dependency outputs via JMESPath

    def run_node(self, node):
        # Dispatch by node.kind:
        #   "tool"  -> _run_tool(name, inp)  via host.tool_registry
        #   "skill" -> _run_skill(name, inp)  via SkillPlugin
        #   "proc"  -> _run_proc(name, inp)   via ProcPlugin

Terminal nodes are identified as nodes that no other node depends on. Their output becomes final_output.

ChainsAPI (Client)

python
from a2e.caps.chains.client import ChainsAPI

chains = ChainsAPI(client)

# Define a chain
nodes = [
    {"node_id": "read", "kind": "tool", "name": "read_file",
     "input": {"path": "/data/input.txt"}, "next_node": "analyze"},
    {"node_id": "analyze", "kind": "skill", "name": "analysis",
     "input_map": {"data": "read.data"}, "next_node": "write"},
    {"node_id": "write", "kind": "tool", "name": "write_file",
     "input_map": {"content": "analyze.data", "path": "'/data/output.txt'"}},
]

result = chains.run(
    nodes=nodes,
    entry_node="read",
    initial_input={},
    streaming=True,
    on_event=lambda e: print(f"Node {e.node_id}: {e.phase}"),
    timeout=60.0
)

print(f"Success: {result.success}, Output: {result.final_output}")
print(f"Nodes run: {result.nodes_run} in {result.duration_ms}ms")

ChainResult (Client-Side)

The client wraps the response in a ChainResult model that also collects streaming events for full observability.

A2E Protocol v1.0 — Released under the MIT License.