Toolkit Builder Plugin & Client Example
Overview
Toolkits are bundled collections of tools with shared configuration and schemas. While tools are individual stateless operations, toolkits group related tools together and provide a configuration interface (e.g., credentials, API keys, root directories). This cookbook covers:
- Plugin side: Writing a custom `ToolkitPlugin` that registers a toolkit with its tool manifest and configuration schema
- Client side: Listing toolkits, configuring them, and using their tools
Plugin Side: Database Toolkit Plugin
Below is a complete toolkit plugin that provides a PostgreSQL database toolkit with connection configuration and bundled query tools:
python
import json
import psycopg2
import psycopg2.extras
from a2e.core.plugins.interface import A2EPlugin
from a2e.caps.toolkits.protocol import (
ToolkitDefinition,
ToolkitListRequest, ToolkitListResponse,
ToolkitConfigureRequest, ToolkitConfigureResponse,
)
from a2e.caps.tools.protocol import (
ToolDefinition, ToolParameter,
ToolListRequest, ToolListResponse,
ToolCallRequest, ToolCallResponse,
ToolResult,
)
class DatabaseToolkitPlugin(A2EPlugin):
    """PostgreSQL database toolkit with configurable connection.

    Exposes a single toolkit named ``postgres`` plus four bundled tools
    (``db_query``, ``db_execute``, ``db_list_tables``, ``db_describe_table``).
    Live connections are stored per session id, while the "configured" flag
    is stored per toolkit name.
    """

    name = "db_toolkit"
    type = "toolkits"
    priority = 5

    def setup(self, host, config):
        """Initialise plugin state and declare the toolkit and its tools."""
        super().setup(host, config)
        self._connections = {}  # session_id -> psycopg2 connection
        self._configured = {}  # toolkit_name -> bool

        # Define the toolkit
        self._toolkits = {
            "postgres": ToolkitDefinition(
                name="postgres",
                alias="pg",
                description="PostgreSQL database toolkit — query, insert, update, and schema inspection",
                category="database",
                tags=["database", "sql", "postgres", "persistence"],
                icon_svg=None,
                schema={
                    "type": "object",
                    "properties": {
                        "host": {
                            "type": "string",
                            "description": "PostgreSQL server hostname",
                            "default": "localhost",
                        },
                        "port": {
                            "type": "integer",
                            "description": "PostgreSQL server port",
                            "default": 5432,
                        },
                        "database": {
                            "type": "string",
                            "description": "Database name",
                        },
                        "username": {
                            "type": "string",
                            "description": "Database username",
                        },
                        "password": {
                            "type": "string",
                            "description": "Database password",
                        },
                        "ssl_mode": {
                            "type": "string",
                            "description": "SSL mode: disable, require, verify-ca, verify-full",
                            "enum": ["disable", "require", "verify-ca", "verify-full"],
                            "default": "require",
                        },
                        "max_rows": {
                            "type": "integer",
                            "description": "Maximum rows returned per query",
                            "default": 1000,
                        },
                    },
                    "required": ["host", "database", "username", "password"],
                },
                tools=[
                    "db_query",
                    "db_execute",
                    "db_list_tables",
                    "db_describe_table",
                ],
                configured=False,
                version="1.0.0",
            ),
        }

        # Define the tools inside this toolkit
        self._tool_defs = [
            ToolDefinition(
                name="db_query",
                description="Execute a SELECT query and return results as JSON",
                input_parameters=[
                    ToolParameter(name="sql", type="string", description="SQL SELECT query", required=True),
                    ToolParameter(name="params", type="array", description="Query parameters for parameterized queries", required=False),
                ],
                output_parameters=[
                    ToolParameter(name="rows", type="array", description="Query result rows"),
                    ToolParameter(name="row_count", type="integer", description="Number of rows returned"),
                ],
                streaming=False,
                idempotent=True,
                tags=["database", "sql", "read"],
                version="1.0.0",
                toolkit="postgres",
            ),
            ToolDefinition(
                name="db_execute",
                description="Execute an INSERT/UPDATE/DELETE and return affected row count",
                input_parameters=[
                    ToolParameter(name="sql", type="string", description="SQL statement (INSERT/UPDATE/DELETE)", required=True),
                    ToolParameter(name="params", type="array", description="Query parameters", required=False),
                ],
                output_parameters=[
                    ToolParameter(name="affected", type="integer", description="Number of affected rows"),
                ],
                streaming=False,
                idempotent=False,
                tags=["database", "sql", "write"],
                version="1.0.0",
                toolkit="postgres",
            ),
            ToolDefinition(
                name="db_list_tables",
                description="List all tables in the connected database",
                input_parameters=[],
                output_parameters=[
                    ToolParameter(name="tables", type="array", description="List of table names"),
                ],
                streaming=False,
                idempotent=True,
                tags=["database", "schema", "read"],
                version="1.0.0",
                toolkit="postgres",
            ),
            ToolDefinition(
                name="db_describe_table",
                description="Get column definitions for a specific table",
                input_parameters=[
                    ToolParameter(name="table_name", type="string", description="Table to describe", required=True),
                ],
                output_parameters=[
                    ToolParameter(name="columns", type="array", description="Column definitions"),
                ],
                streaming=False,
                idempotent=True,
                tags=["database", "schema", "read"],
                version="1.0.0",
                toolkit="postgres",
            ),
        ]

    # --- Message routing ---

    def supported_messages(self) -> dict[str, type]:
        """Map each handled message id to its request class."""
        return {
            "toolkit/list/req": ToolkitListRequest,
            "toolkit/configure/req": ToolkitConfigureRequest,
            "tool/list/req": ToolListRequest,
            "tool/call/req": ToolCallRequest,
        }

    def handle(self, msg):
        """Dispatch *msg* by request type; return ``None`` if unsupported."""
        if isinstance(msg, ToolkitListRequest):
            return self._list_toolkits(msg)
        if isinstance(msg, ToolkitConfigureRequest):
            return self._configure_toolkit(msg)
        if isinstance(msg, ToolListRequest):
            # Only return tools if toolkit is configured
            if self._configured.get("postgres"):
                return ToolListResponse(tools=self._tool_defs)
            return ToolListResponse(tools=[])
        if isinstance(msg, ToolCallRequest):
            return self._execute_tool(msg)
        return None

    # --- ToolkitPlugin ABC ---

    def _list_toolkits(self, msg) -> ToolkitListResponse:
        """Return toolkit definitions with current configured status."""
        # Each definition is copied so the stored one keeps configured=False.
        toolkits = [
            tk.model_copy(update={"configured": self._configured.get(name, False)})
            for name, tk in self._toolkits.items()
        ]
        return ToolkitListResponse(toolkits=toolkits)

    def _configure_toolkit(self, msg) -> ToolkitConfigureResponse:
        """Validate config against schema and establish DB connection.

        Checks, in order: the toolkit name is known, every field listed in
        the schema's ``required`` is present and non-empty, and every field
        whose schema property declares an ``enum`` holds an allowed value.
        Only then is a connection attempted. Every failure is reported as a
        response with ``status="error"``; nothing is raised to the caller
        for ``psycopg2.Error``.
        """
        toolkit_name = msg.toolkit_name
        config = msg.config

        def failure(message):
            return ToolkitConfigureResponse(
                toolkit_name=toolkit_name,
                status="error",
                message=message,
            )

        if toolkit_name not in self._toolkits:
            return failure(f"Unknown toolkit: {toolkit_name}")

        # Validate required fields from schema
        schema = self._toolkits[toolkit_name].schema
        required = schema.get("required", [])
        missing = [f for f in required if f not in config or not config[f]]
        if missing:
            return failure(f"Missing required fields: {', '.join(missing)}")

        # Validate enum fields
        properties = schema.get("properties", {})
        for field, value in config.items():
            prop = properties.get(field, {})
            if "enum" in prop and value not in prop["enum"]:
                return failure(
                    f"Invalid value for '{field}': {value}. Allowed: {prop['enum']}"
                )

        # Try to connect
        try:
            conn = psycopg2.connect(
                host=config["host"],
                port=config.get("port", 5432),
                dbname=config["database"],
                user=config["username"],
                password=config["password"],
                sslmode=config.get("ssl_mode", "require"),
            )
            conn.autocommit = True
        except psycopg2.Error as exc:
            return failure(f"Connection failed: {exc}")

        # Store connection; a session that reconfigures must not leak the
        # connection it opened earlier, so close the one being replaced.
        previous = self._connections.get(msg.session_id)
        self._connections[msg.session_id] = conn
        if previous is not None:
            self._close_quietly(previous)
        self._configured[toolkit_name] = True

        # Save config for tool execution (minus password)
        self._db_config = {k: v for k, v in config.items() if k != "password"}
        self._max_rows = config.get("max_rows", 1000)

        return ToolkitConfigureResponse(
            toolkit_name=toolkit_name,
            status="ok",
            message="Database toolkit configured and connected successfully",
        )

    # --- Tool execution ---

    def _execute_tool(self, msg: ToolCallRequest) -> ToolCallResponse:
        """Run the tool named in *msg* on the calling session's connection.

        Always returns a ``ToolCallResponse``; any exception raised while
        running the tool (including an unknown tool name) is converted into
        a failed ``ToolResult`` with ``error_code="TOOL_ERROR"``.
        """
        conn = self._connections.get(msg.session_id)
        if conn is None:
            result = ToolResult(
                success=False,
                tool_name=msg.tool_name,
                error="Database not configured — call toolkit/configure first",
                error_code="TOOL_ERROR",
                duration_ms=0,
            )
            return ToolCallResponse(data=result)

        import time

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump if the system clock is adjusted mid-call.
        t0 = time.perf_counter()
        try:
            if msg.tool_name == "db_query":
                data = self._db_query(conn, msg.arguments)
            elif msg.tool_name == "db_execute":
                data = self._db_execute(conn, msg.arguments)
            elif msg.tool_name == "db_list_tables":
                data = self._db_list_tables(conn)
            elif msg.tool_name == "db_describe_table":
                data = self._db_describe_table(conn, msg.arguments)
            else:
                raise ValueError(f"Unknown tool: {msg.tool_name}")
        except Exception as exc:
            result = ToolResult(
                success=False,
                tool_name=msg.tool_name,
                error=str(exc),
                error_code="TOOL_ERROR",
                duration_ms=int((time.perf_counter() - t0) * 1000),
            )
            return ToolCallResponse(data=result)

        result = ToolResult(
            success=True,
            tool_name=msg.tool_name,
            data=data,
            duration_ms=int((time.perf_counter() - t0) * 1000),
        )
        return ToolCallResponse(data=result)

    def _db_query(self, conn, args):
        """Execute ``args["sql"]`` and return at most ``self._max_rows`` rows.

        NOTE(review): the statement is executed as given; nothing here
        restricts it to SELECT even though the tool is described as
        read-only. Confirm whether callers are trusted.
        """
        sql = args["sql"]
        params = args.get("params")
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, params)
            rows = cur.fetchmany(self._max_rows)
            return {
                "rows": [dict(r) for r in rows],
                "row_count": len(rows),
                # True when the cursor reports more rows than were fetched
                "truncated": cur.rowcount > self._max_rows,
            }

    def _db_execute(self, conn, args):
        """Execute ``args["sql"]`` and return the cursor's row count."""
        sql = args["sql"]
        params = args.get("params")
        with conn.cursor() as cur:
            cur.execute(sql, params)
            return {"affected": cur.rowcount}

    def _db_list_tables(self, conn):
        """Return the names of tables in the ``public`` schema, sorted."""
        with conn.cursor() as cur:
            cur.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
                ORDER BY table_name
            """)
            return {"tables": [r[0] for r in cur.fetchall()]}

    def _db_describe_table(self, conn, args):
        """Return column metadata for ``args["table_name"]`` in ``public``."""
        table = args["table_name"]
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # The table name is passed as a bound parameter, not interpolated.
            cur.execute("""
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = %s AND table_schema = 'public'
                ORDER BY ordinal_position
            """, (table,))
            return {"columns": [dict(r) for r in cur.fetchall()]}

    # --- Lifecycle ---

    @staticmethod
    def _close_quietly(conn):
        """Close *conn*, deliberately ignoring any error raised by close()."""
        try:
            conn.close()
        except Exception:
            # Best-effort cleanup: a failed close must not abort the caller.
            pass

    def teardown(self):
        """Close every stored connection and forget them all."""
        for conn in self._connections.values():
            self._close_quietly(conn)
        self._connections.clear()

    # --- State persistence ---

    def save_state(self, store, key, session_id):
        """Persist the password-free config and the configured flags."""
        # Save configuration (not password), not live connections
        if hasattr(self, "_db_config"):
            store.save(f"{self.name}:{key}", {
                "config": self._db_config,
                "configured": self._configured,
            })

    def restore_state(self, store, key, session_id):
        """Restore the configured flags saved by ``save_state``."""
        # NOTE: only the flags come back. No connection is reopened here, so
        # tool calls keep failing with "Database not configured" until the
        # session sends toolkit/configure again.
        state = store.load(f"{self.name}:{key}")
        if state:
            self._configured = state.get("configured", {})

    def clear_state(self, store, key, session_id):
        self.teardown()
        self._configured = {}
        store.clear(f"{self.name}:{key}")

Register in Config
yaml
plugins:
  - name: db_toolkit
    type: toolkits
    cls: my_package.db_toolkit.DatabaseToolkitPlugin
    metadata:
      enabled: true
      priority: 5

Client Side: Toolkit Discovery and Usage
python
import json  # needed by the json.dumps() call that prints each toolkit schema
import logging

from a2e.schema import A2EHostConfig
from a2e.core.server.server import A2EServer
from a2e.core.client.client import A2EClient
from a2e.caps.toolkits.client import ToolkitAPI
from a2e.caps.tools.client import ToolAPI

logger = logging.getLogger("toolkit-agent")

# Start the host from its YAML config and connect a client that advertises
# both the "toolkits" and "tools" capabilities.
config = A2EHostConfig.from_yaml("config.yaml")
server = A2EServer(config)
transport = server.start()
client = A2EClient(transport, logger, agent_caps=["toolkits", "tools"])
client.connect()

# Capability-specific API wrappers sharing the one client connection
toolkits = ToolkitAPI(client)
tools = ToolAPI(client)
# ============================================================
# 1. List available toolkits
# ============================================================
kit_list = toolkits.list()
for kit in kit_list:
    status = "configured" if kit.configured else "not configured"
    print(f" {kit.name} (v{kit.version}) [{status}]: {kit.description}")
    print(f" Category: {kit.category}, Tags: {kit.tags}")
    print(f" Tools: {kit.tools}")
    print(f" Schema: {json.dumps(kit.schema, indent=2)}")

# Find a specific toolkit by name
pg = toolkits.get("postgres")
if pg:
    print(f"Found: {pg.name} — {pg.description}")
    print(f"Required config fields: {pg.schema.get('required', [])}")

# ============================================================
# 2. Configure a toolkit
# ============================================================
# Full configuration with all required fields
resp = toolkits.configure("postgres", {
    "host": "db.example.com",
    "port": 5432,
    "database": "analytics",
    "username": "agent_user",
    "password": "secret123",
    "ssl_mode": "require",
    "max_rows": 500,
})
print(f"Configure status: {resp.status}")
if resp.status == "ok":
    print(f"Message: {resp.message}")
else:
    print(f"Error: {resp.message}")
    # Example error: "Missing required fields: username, password"

# ============================================================
# 3. Idempotent configuration
# ============================================================
# ensure() only configures if not already configured — safe to call repeatedly
resp = toolkits.ensure("postgres", {
    "host": "db.example.com",
    "database": "analytics",
    "username": "agent_user",
    "password": "secret123",
})
# First call: configures and connects
# Second call: skips (already configured)
# ============================================================
# 4. Use toolkit tools after configuration
# ============================================================
# Once configured, the toolkit's tools appear in tool/list
tool_list = tools.list()
pg_tools = [t for t in tool_list if t.toolkit == "postgres"]
print(f"PostgreSQL tools: {[t.name for t in pg_tools]}")

# List tables
result = tools.call("db_list_tables", {})
if result.success:
    for table in result.data["tables"]:
        print(f" Table: {table}")

# Describe a table
result = tools.call("db_describe_table", {"table_name": "users"})
if result.success:
    for col in result.data["columns"]:
        print(f" {col['column_name']}: {col['data_type']} "
              f"({'NULL' if col['is_nullable'] == 'YES' else 'NOT NULL'})")

# Run a query (values are passed separately as bound parameters)
result = tools.call("db_query", {
    "sql": "SELECT id, name, email FROM users WHERE active = %s LIMIT 10",
    "params": [True],
})
if result.success:
    print(f"Rows: {result.data['row_count']}")
    for row in result.data["rows"]:
        print(f" {row['id']}: {row['name']} <{row['email']}>")

# Execute a mutation
result = tools.call("db_execute", {
    "sql": "UPDATE users SET last_login = NOW() WHERE id = %s",
    "params": [42],
})
if result.success:
    print(f"Affected rows: {result.data['affected']}")
# ============================================================
# 5. Error handling
# ============================================================
# Configure with invalid credentials
resp = toolkits.configure("postgres", {
    "host": "db.example.com",
    "database": "analytics",
    "username": "wrong_user",
    "password": "wrong_pass",
})
# resp.status == "error", resp.message starts with "Connection failed: "

# Try to use tools without configuring
# (Requires a fresh session where toolkit is not configured)
result = tools.call("db_query", {"sql": "SELECT 1"})
# result.success == False,
# result.error == "Database not configured — call toolkit/configure first"

# Query with invalid SQL
result = tools.call("db_query", {"sql": "SELECTT * FROM users"})
# result.success == False, result.error contains syntax error

# ============================================================
# 6. Schema-driven client-side validation
# ============================================================
# Use toolkit.schema to validate config before sending
pg = toolkits.get("postgres")
required_fields = pg.schema.get("required", [])
properties = pg.schema.get("properties", {})

config_candidate = {
    "host": "db.example.com",
    "database": "analytics",
    # Oops, forgot username and password
}

# Client-side validation. An empty value counts as missing, because the
# plugin's own check rejects required fields that are absent OR empty.
missing = [f for f in required_fields if not config_candidate.get(f)]
if missing:
    print(f"Cannot configure: missing {missing}")
    # Don't send the request — fix config first

# Validate enum fields
for field, value in config_candidate.items():
    prop = properties.get(field, {})
    if "enum" in prop and value not in prop["enum"]:
        print(f"Invalid '{field}': {value}. Allowed: {prop['enum']}")

# ============================================================
# 7. Filtering toolkits by tags and category
# ============================================================
# List all database toolkits
db_kits = toolkits.list(filter_tags=["database"])
for kit in db_kits:
    print(f"Database toolkit: {kit.name}")
client.disconnect()

Schema Design Patterns
The schema field on ToolkitDefinition is a standard JSON Schema object. Common patterns:
Simple key-value config
python
schema = {
    "type": "object",
    "properties": {
        "api_key": {"type": "string", "description": "API key"},
        "base_url": {"type": "string", "description": "API base URL", "default": "https://api.example.com"},
    },
    "required": ["api_key"],
}

Enum-based configuration
python
schema = {
    "type": "object",
    "properties": {
        "region": {
            "type": "string",
            "description": "Cloud region",
            "enum": ["us-east-1", "eu-west-1", "ap-south-1"],
        },
        "tier": {
            "type": "string",
            "description": "Service tier",
            "enum": ["free", "pro", "enterprise"],
            "default": "free",
        },
    },
    "required": ["region"],
}

Nested configuration
python
schema = {
    "type": "object",
    "properties": {
        "connection": {
            "type": "object",
            "description": "Connection settings",
            "properties": {
                "host": {"type": "string"},
                "port": {"type": "integer", "default": 443},
            },
            "required": ["host"],
        },
        "auth": {
            "type": "object",
            "description": "Authentication settings",
            "properties": {
                "type": {"type": "string", "enum": ["api_key", "oauth", "basic"]},
                "token": {"type": "string"},
            },
            "required": ["type", "token"],
        },
    },
    "required": ["connection", "auth"],
}

Relationship to Other Capabilities
ToolkitPlugin                      ToolPlugin
      |                                |
      v                                v
ToolkitDefinition                  ToolDefinition
  - name: "postgres"                 - name: "db_query"
  - tools: ["db_query", ...]         - toolkit: "postgres"
  - schema: { JSON Schema }          - input_parameters: [...]
                                     - output_parameters: [...]
      |
      v
ToolkitAPI (client)                ToolAPI (client)
  - list()                           - list()
  - configure()                      - call()
  - ensure()                         - call()
  - get()

- Toolkits → Tools: Once a toolkit is configured, its tools appear in `tool/list/resp` with the `toolkit` field set.
- MCP: MCP server tools are registered as plain tools (not toolkits), since MCP manages its own configuration.
- Skills: Skills may reference toolkits in their `toolkits` field to declare dependencies.
- Memory: Toolkit configuration can be persisted via `save_state`/`restore_state`.
Key Patterns
| Pattern | When to Use |
|---|---|
| `toolkits.list()` | Discover available toolkits |
| `toolkits.get(name)` | Look up a specific toolkit |
| `toolkits.configure(name, config)` | Initialize a toolkit with credentials |
| `toolkits.ensure(name, config)` | Idempotent configure (skip if already done) |
| `tools.call(toolkit_tool, args)` | Use toolkit tools after configuration |
| Validate config against schema | Client-side validation before sending |
Tips
- Validate config early: Use `schema.required` and `schema.properties.enum` on the client side to catch errors before network round-trips.
- Mark configured = False on startup: The plugin should report `configured=False` until `toolkit/configure/req` succeeds.
- Don't store passwords in state: `save_state` should persist config minus secrets; re-authenticate on `restore_state`.
- Use ensure() for idempotent setup: Prevents duplicate configuration calls in agent loops.
- Set max_rows / rate limits: Prevent runaway queries from exhausting resources.
- Schema drives UI: The JSON Schema in `ToolkitDefinition.schema` is designed for client-side form generation — use `description`, `default`, and `enum` to provide rich metadata.