Skip to content

Plugin System

A2E's host is intentionally thin — it loads, routes, and manages lifecycle, but knows nothing about tools, memory, or environments. All capability logic lives in dynamically loaded plugins, so you can add a custom memory backend, swap a tool executor, or deploy a proprietary feedback pipeline without forking the runtime or changing a single agent.

Overview

A2E is a plugin-centric runtime. The host (server) is a thin execution kernel that loads, routes, and manages lifecycle. All capability-specific logic lives in plugins that are dynamically loaded from configuration.

A2EPlugin Interface

Class-Level Attributes

AttributeTypeDefaultDescription
namestrUnique plugin instance name
typestrCapability type (matches A2ECapability enum)
priorityint0Higher = runs first in dispatch
exclusiveboolFalseIf True, only this plugin handles its message types

Lifecycle Methods

MethodPurpose
setup(host_instance, config)Called at load time. Receives host reference and config dict. Extracts audit_log and session_id from config.
supported_messages()Abstract. Returns Dict[str, Type[BaseModel]] mapping type strings to Pydantic model classes.
handle(message)Abstract. Processes a decoded message, returns response A2EMessage or None.
caps_metadata()Returns {name, type, priority, exclusive} for capability negotiation.
emit_event(event)Sends an async event to the client through the host executor. Standard path for all server-initiated events.
save_state(store, key, session_id)Serializes plugin state to SnapshotStore under key "plugin_name:key".
restore_state(store, key, session_id)Restores from SnapshotStore.
teardown()Lifecycle cleanup on shutdown.

Audit Integration

Every plugin handler should call audit_handle(msg, response, req_id, t0) after processing. This constructs an AuditEntry with:

  • Timing: duration_ms from t0
  • Byte sizes: input_bytes, output_bytes
  • Success/error: success bool, error_code if failed

Audit failures are caught and printed — they never crash the plugin.

Event Emission (Plugin → Client)

Plugins can send asynchronous events to the client at any time via emit_event():

python
# a2e/core/plugins/interface.py
class A2EPlugin(ABC):
    def emit_event(self, event: A2EMessage):
        """Send an async event to the client through the host executor."""
        host = getattr(self, 'host_instance', None)
        if host and hasattr(host, '_send'):
            host._send(event)

This is the standard path for all server-initiated events (streaming output, progress updates, partial results). It routes through the executor's _send() method, which encodes and delivers the event via the transport.

How it works

Client-side routing

The A2EClient._on_message() dispatches each incoming message through three paths in order:

  1. Pending RPC match — if req_id matches an in-flight RPC, the message is delivered to its result queue.
  2. Event tied to an active RPC — if req_id has registered event callbacks (via rpc(..., event_callback=fn)), each callback is invoked.
  3. Unsolicited push — if the message type has registered push handlers (via register_push_handler()), those handlers are called.

The third path enables capability APIs to subscribe to server-initiated messages that arrive outside any RPC context — for example EnvStatePush, ProcReadEvent, or MCPServerPush.

See Client API for the push handler API.

Plugin Configuration

yaml
plugins:
  - name: mytools          # Unique instance name
    type: tools             # Capability type
    cls: a2e.caps.tools.plugin.ToolPlugin  # Import path
    metadata:
      enabled: true
      priority: 0
      exclusive: false
      # ... plugin-specific config ...

PluginConfig fields:

FieldTypeDescription
namestrUnique instance name
typestrCapability type string
clsstrDot-path import string
metadataPluginMetaenabled, priority, exclusive + extra fields

Dynamic Loading

The executor loads plugins at startup via importlib:

python
# A2EServerRuntimeExecutor._load_plugins()
for plugin_config in config.plugins:
    if not plugin_config.metadata.enabled:
        continue
    module_path, class_name = plugin_config.cls.rsplit(".", 1)
    mod = importlib.import_module(module_path)
    cls = getattr(mod, class_name)
    plugin = cls()
    plugin.setup(self, plugin_config.metadata.model_dump())
    self._plugin_registry.register(plugin)

# Wire push callback for plugins that support async events
for name, plugin in self._plugin_registry.all():
    if hasattr(plugin, 'set_push_callback'):
        plugin.set_push_callback(self._send)

Type Registry Building

After loading, each plugin's supported_messages() populates two structures:

python
# type_registry: msg_type -> Pydantic model class
# type_to_plugins: msg_type -> sorted list of plugins (by priority)

This enables the executor to decode incoming NDJSON by looking up the model class, then route to the correct plugin(s).

Message Dispatch

  • Exclusive mode: Only the highest-priority plugin handles the message
  • Broadcast mode: All registered plugins handle the message, sorted by priority
  • Non-core messages are dispatched to a ThreadPoolExecutor for concurrent handling

Plugin Registries

RegistryPurposeKey
PluginRegistryName -> plugin instancePlugin name
CapabilityRegistryCapability string -> list of pluginsA2ECapability value

The CapabilityRegistry is used during handshake to match agent-requested capabilities against loaded plugins.

A2E Protocol v1.0 — Released under the MIT License.