Skip to content

Session Management

Every agent connection in A2E gets its own isolated session with a dedicated executor, transport channel, and plugin state — no shared mutable state, no cross-agent interference, no session leaks. This isolation is the foundation for running multiple agents (with different capabilities, models, and backends) on the same host without conflict.

Overview

Each agent connection gets its own Session with an isolated executor and transport. Sessions are created by SessionManager and persisted through the agent's lifecycle.

Session Lifecycle

Session Class

Each Session wraps:

  • A UUID session ID (auto-generated)
  • An A2EServerRuntimeExecutor (isolated message processing)
  • A DirectTransport (internal communication channel)
  • An asyncio.Queue for outbound messages (fed to SSE stream)

Key Methods

MethodPurpose
bind_transport(transport)Wires executor output to the outbound queue
stream()Async generator yielding from the outbound queue — used by the SSE endpoint
close()Tears down executor, stops transport

Transport Binding

When a session binds a transport, the executor's output is wired to an asyncio.Queue:

python
def bind_transport(self, transport):
    # Executor sends responses -> queue -> SSE stream
    transport.set_out_handler(lambda msg: self._outbound.put_nowait(msg))

This enables the HTTP SSE endpoint to yield responses as they arrive:

python
async def stream_endpoint(session_id):
    session = session_manager.get(session_id)
    async for msg in session.stream():
        yield f"data: {msg}\n\n"

SessionManager

Dict-based session storage with CRUD operations:

MethodReturnsDescription
create()SessionCreates new session with fresh DirectTransport and executor
get(session_id)Session or NoneRetrieves by UUID
delete(session_id)Removes and closes session

Session Creation (HTTP Mode)

Even in HTTP mode, the internal communication between FastAPI routes and the executor uses a DirectTransport:

Agent -> POST /send -> FastAPI route -> Session.transport.deliver(msg)
                                         -> Executor reads from transport
                                         -> Plugin.handle()
                                         -> Response -> Session.outbound queue
                                         -> SSE /stream -> Agent

Session Creation (Direct Mode)

Direct mode creates two paired DirectTransports at server startup:

python
# A2EServer._start_direct()
t_server = DirectTransport()
t_client = DirectTransport()
t_client.connect(t_server)

runtime = A2EServerRuntimeExecutor(config, transport=t_server)
runtime.start()

# Return t_client for the agent-side to use
return t_client

Concurrency

  • Each session's executor has its own ThreadPoolExecutor for non-core message dispatch
  • The asyncio.Queue for outbound messages is thread-safe
  • Multiple concurrent RPCs are supported within a session via request correlation (req_id)

A2E Protocol v1.0 — Released under the MIT License.