Skip to content

Environment

The environment capability brings RL-native interaction patterns to A2E — reset, step, observe, reward — enabling agents to interact with simulators, games, browser automation, or any stateful system through a standard env/step loop. Rewards from env interactions feed directly into the learn capability for on-policy adaptation.

Overview

The env capability provides a full RL environment interface — reset, step, observe, render, plan, and batch step. It follows the OpenAI Gym / PettingZoo paradigm, making A2E environments directly usable for reinforcement learning.

Protocol Messages (14 types)

Type StringModelDirection
env/reset/reqEnvResetRequestAgent → Host
env/reset/respEnvResetResponseHost → Agent
env/step/reqEnvStepRequestAgent → Host
env/step/respEnvStepResponseHost → Agent
env/observe/reqEnvObserveRequestAgent → Host
env/observe/respEnvObserveResponseHost → Agent
env/close/reqEnvCloseRequestAgent → Host
env/close/respEnvCloseResponseHost → Agent
env/spaces/reqEnvSpacesRequestAgent → Host
env/spaces/respEnvSpacesResponseHost → Agent
env/render/reqEnvRenderRequestAgent → Host
env/render/respEnvRenderResponseHost → Agent
env/plan/reqEnvPlanRequestAgent → Host
env/plan/respEnvPlanResponseHost → Agent
env/batch_step/reqEnvBatchStepRequestAgent → Host
env/batch_step/respEnvBatchStepResponseHost → Agent
env/state_pushEnvStatePushHost → Agent (server-initiated)

Core Models

EnvAction — What the agent does:

FieldTypeDescription
action_typestrAction identifier
payloaddictAction parameters
metadatadictExtra context

EnvObservation — What the agent sees:

FieldTypeDescription
episode_idstrCurrent episode
step_numintStep within episode
stateEnvStateCurrent environment state (flexible, extra="allow")
doneboolEpisode terminated
truncatedboolEpisode truncated (time limit)
rewardfloatStep reward
metadatadictExtra info

EnvStatePush — Server-initiated state delta:

FieldTypeDescription
state_deltadictIncremental state change
rewardfloatAssociated reward
terminalboolWhether environment terminated
event_typestrPush event type
reasonstrWhy this push occurred

EnvPlugin ABC

python
class EnvPlugin(A2EPlugin):
    name = "base_env"

    @abstractmethod
    def on_reset(self, seed=None, options=None) -> EnvState:
        """Reset environment, return initial state"""

    @abstractmethod
    def on_step(self, episode_id: str, action: EnvAction) -> tuple:
        """Returns (next_state, reward, done, info)"""

    @abstractmethod
    def on_close(self): ...

    # Implemented methods
    def reset(self, msg) -> EnvResetResponse: ...
    def step(self, msg) -> EnvStepResponse: ...
    def observe(self, msg) -> EnvObserveResponse: ...
    def close(self, msg) -> EnvCloseResponse: ...
    def spaces(self, msg) -> EnvSpacesResponse: ...    # Action/state schemas
    def render(self, msg) -> EnvRenderResponse: ...    # Screenshot, RGB, text
    def plan(self, msg) -> EnvPlanResponse: ...         # Affordances

    # Push support
    def set_push_callback(self, cb): ...
    def push(self, episode_id, step_id, action_id, event_type, delta): ...

The push() helper emits an EnvStatePush event via the executor's _send() path. It now guards against missing episode state — push() returns early if no episode is active.

Push event model:

python
EnvStatePush(
    episode_id=str,
    step_id=int,
    action_id=str,
    event_type=str,   # e.g. "observation", "reward", "termination"
    delta=dict,       # incremental state change
)

Episode management: The plugin tracks the current episode internally with _episode and _store (EpisodeStore). Episodes use a default_factory for UUID generation. Steps are persisted automatically.

EnvAPI (Client)

python
from a2e.caps.env.client import EnvAPI

env = EnvAPI(client)

# Reset environment
resp = env.reset(env_name="counter_env", seed=42, options={})
episode_id = resp.episode_id

# Step
resp = env.step(episode_id, EnvAction(action_type="inc", payload={}))
print(f"State: {resp.observation.state}, Reward: {resp.observation.reward}")

# Observe (without stepping)
obs = env.observe(episode_id)

# Check done
if env.is_done(resp.observation.done, resp.observation.truncated):
    print("Episode finished")

# Close
env.close(episode_id)

# Server-push support
env.on_push(lambda push: print(f"Push: {push.state_delta}"))

Under the hood, EnvAPI.__init__() calls client.register_push_handler("env/state_push", self._on_push) which routes incoming EnvStatePush messages to the registered callbacks. This decouples push handling from the RPC flow — pushes arrive as unsolicited server-initiated messages and are dispatched independently of any pending step/reset call.

EpisodeStore

Persistence for episode state:

ImplementationBackendSchema
SQLiteEpisodeStoreSQLiteepisodes table: episode_id PK, env_name, state JSON, done, step_count, created_at, updated_at

RL Loop Pattern

python
env = EnvAPI(client)
resp = env.reset(env_name="my_env")
episode_id = resp.episode_id

while True:
    action = select_action(resp.observation)  # Your policy
    resp = env.step(episode_id, action)
    if env.is_done(resp.observation.done, resp.observation.truncated):
        break
env.close(episode_id)

A2E Protocol v1.0 — Released under the MIT License.