Skip to content

Chain Pipeline Example

A complete example of building and running a multi-step chain pipeline.

Data Processing Chain

python
import logging
from a2e.schema import A2EHostConfig
from a2e.core.server.server import A2EServer
from a2e.core.client.client import A2EClient
from a2e.caps.chains.client import ChainsAPI

# Named logger handed to the client below. Nothing in this example configures
# logging handlers, so add logging.basicConfig(...) if you want to see output
# below WARNING level.
logger = logging.getLogger("chain-agent")

# Setup (direct mode for simplicity)
# Order matters: the server is built from the YAML config and started first,
# and the value returned by start() is what the client is constructed with.
config = A2EHostConfig.from_yaml("config.yaml")
server = A2EServer(config)
transport = server.start()

# agent_caps lists the capabilities this client is created with; this example
# uses "tool" and "skill" nodes inside chains, hence all three entries
# (presumably each is required for the matching node kind -- confirm).
client = A2EClient(transport, logger, agent_caps=["chains", "tools", "skills"])
client.connect()

# Chains API wrapper bound to the connected client; used for chains.run(...).
chains = ChainsAPI(client)

# Define a data processing pipeline
# Each dict is one node. "node_id" is the name other nodes use to refer to it
# (in "next_node", "true_node"/"false_node", and inside input_map expressions).
# "input" carries literal values; "input_map" values are expressions that
# reference outputs of earlier nodes (see the JMESPath section of this page).
nodes = [
    # Step 1: Read input file
    {
        "node_id": "read",
        "kind": "tool",
        "name": "read_file",
        "input": {"path": "/data/raw.json"},
        "next_node": "parse"
    },
    # Step 2: Parse and validate
    {
        "node_id": "parse",
        "kind": "skill",
        "name": "json-parser",
        "input_map": {"raw_data": "read.data"},
        # presumably stops the whole chain if parsing fails -- confirm against
        # the chain schema; the other nodes here do not set on_error.
        "on_error": "abort",
        "next_node": "transform"
    },
    # Step 3: Transform (branch based on data type)
    # A branch node has no "name"/"input": it only evaluates "condition" and
    # routes to "true_node" or "false_node".
    {
        "node_id": "transform",
        "kind": "branch",
        "condition": "parse.data_type == 'numeric'",
        "true_node": "numeric_transform",
        "false_node": "text_transform"
    },
    # Branch A: Numeric processing
    {
        "node_id": "numeric_transform",
        "kind": "tool",
        "name": "compute_stats",
        "input_map": {"values": "parse.values"},
        "next_node": "write_output"
    },
    # Branch B: Text processing
    {
        "node_id": "text_transform",
        "kind": "skill",
        "name": "text-summarizer",
        "input_map": {"text": "parse.text"},
        "next_node": "write_output"
    },
    # Step 4: Write results
    # Both branches converge here. Only one of them runs, so "content" picks
    # whichever branch output exists. No "next_node" is set on this node
    # (presumably that marks the end of the chain -- confirm).
    {
        "node_id": "write_output",
        "kind": "tool",
        "name": "write_file",
        "input_map": {
            # NOTE(review): standard JMESPath spells the or-expression as
            # `||`; confirm the A2E evaluator accepts the `OR` keyword.
            "content": "numeric_transform.stats OR text_transform.summary",
            # Inner single quotes make this a literal string, not a lookup.
            "path": "'/data/output.json'"
        },
    }
]

# Run with streaming
def on_event(event):
    """Print one progress line for a chain event.

    The line is "[<node_id>] <phase>: <payload>", where the payload is the
    event's output when that is truthy, otherwise its error, otherwise an
    empty string.
    """
    prefix = f"[{event.node_id}] {event.phase}: "
    if event.output:
        payload = event.output
    else:
        payload = event.error or ''
    print(f"{prefix}{payload}")

# Execute the chain starting at the "read" node. With streaming=True the
# on_event callback is passed in to receive progress events while the chain
# runs; timeout is given in seconds (presumably for the whole run -- confirm).
try:
    result = chains.run(
        nodes=nodes,
        entry_node="read",
        initial_input={},
        streaming=True,
        on_event=on_event,
        timeout=120.0
    )

    print(f"Chain completed: success={result.success}")
    print(f"Nodes run: {result.nodes_run} in {result.duration_ms}ms")
    print(f"Final output: {result.final_output}")
finally:
    # Always release the client connection, even when the run raises or
    # times out; otherwise a failed run would leave the client connected.
    # NOTE(review): the server started during setup is never shut down in
    # this example -- confirm whether A2EServer needs an explicit stop.
    client.disconnect()

Map (Fan-Out) Pattern

Process multiple items in parallel:

python
# Fan-out pipeline: list files, run one analysis per file, then merge.
# This snippet only defines the nodes; run it the same way as the previous
# example (the first node here is "list_files").
nodes = [
    # Read list of files
    {
        "node_id": "list_files",
        "kind": "tool",
        "name": "glob",
        "input": {"pattern": "/data/*.json"},
        "next_node": "process_each"
    },
    # Map: process each file
    # "items_path" selects the array to iterate over and "map_node" names the
    # node that is run for each element; "next_node" is where the chain
    # continues after the map.
    {
        "node_id": "process_each",
        "kind": "map",
        "items_path": "list_files.files",  # JMESPath to array
        "map_node": "analyze_file",
        "next_node": "aggregate"
    },
    # Per-item processing
    # "_item" is the current element of the map iteration. No "next_node":
    # this node is only reached through "map_node" above.
    # assumes each element of list_files.files has a "path" field -- confirm.
    {
        "node_id": "analyze_file",
        "kind": "skill",
        "name": "file-analyzer",
        "input_map": {"file_path": "_item.path"}
    },
    # Aggregate results
    # References the map node's output as a whole (presumably the list of
    # per-item results -- confirm against the chain schema).
    {
        "node_id": "aggregate",
        "kind": "tool",
        "name": "merge_results",
        "input_map": {"results": "process_each"}
    }
]

JMESPath Expressions

input_map values use JMESPath to reference outputs from prior nodes:

| Expression | Meaning |
| --- | --- |
| `"read.data"` | Output of node "read", field "data" |
| `"parse.values"` | Nested field access |
| `"'/fixed/path'"` | Literal string (quoted) |
| `"numeric_transform.stats OR text_transform.summary"` | First non-null |
| `"_item.path"` | Current item in map iteration |

A2E Protocol v1.0 — Released under the MIT License.