Server Guide
Learn how to build production-ready A2A servers using the A2A Node SDK. This guide covers server setup, agent implementation, middleware, and deployment strategies.
Overview
The A2A Node SDK provides a complete server implementation for hosting A2A agents:
A2AServer
- Main server class with HTTP/WebSocket supportAgentExecutor
- Core agent execution logicTaskManager
- Task lifecycle managementRequestHandler
- Request processing pipelineEventQueue
- Asynchronous event processingContextMiddleware
- Request context managementPushService
- Real-time notificationsQueueManager
- Task queue management
Quick Start
Basic Server Setup
import { A2AServer, DefaultAgentExecutor } from '@dexwox-labs/a2a-node';
// Create an agent executor
const executor = new DefaultAgentExecutor({
async executeTask(task) {
console.log('Processing task:', task.name);
// Your agent logic here
const result = `Processed: ${task.name}`;
return {
status: 'completed',
result,
artifacts: [{
id: 'result-1',
type: 'text',
content: { text: result },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString()
}]
};
}
});
// Create the server
const server = new A2AServer({
agentCard: {
id: 'my-agent',
name: 'My Agent',
description: 'A simple A2A agent',
capabilities: ['text-processing'],
endpoint: 'http://localhost:3000',
version: '1.0.0'
},
executor
});
// Start the server
await server.start(3000);
console.log('🚀 A2A server running on http://localhost:3000');
Agent Implementation
Custom Agent Executor
import { AgentExecutor, Task, TaskResult } from '@dexwox-labs/a2a-node';
class WeatherAgentExecutor implements AgentExecutor {
async executeTask(task: Task): Promise<TaskResult> {
const { name, input } = task;
switch (name) {
case 'get-weather':
return this.getWeather(input);
case 'get-forecast':
return this.getForecast(input);
default:
throw new Error(`Unknown task: ${name}`);
}
}
private async getWeather(input: any): Promise<TaskResult> {
const { location } = input;
// Simulate weather API call
const weatherData = await this.fetchWeatherData(location);
return {
status: 'completed',
result: weatherData,
artifacts: [{
id: `weather-${Date.now()}`,
type: 'json',
content: { data: weatherData },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString()
}]
};
}
private async getForecast(input: any): Promise<TaskResult> {
const { location, days = 5 } = input;
// Simulate forecast API call
const forecastData = await this.fetchForecastData(location, days);
return {
status: 'completed',
result: forecastData,
artifacts: [{
id: `forecast-${Date.now()}`,
type: 'json',
content: { data: forecastData },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString()
}]
};
}
private async fetchWeatherData(location: string) {
// Your weather API integration
return {
location,
temperature: 22,
condition: 'sunny',
humidity: 65
};
}
private async fetchForecastData(location: string, days: number) {
// Your forecast API integration
return {
location,
days,
forecast: Array.from({ length: days }, (_, i) => ({
date: new Date(Date.now() + i * 24 * 60 * 60 * 1000).toISOString(),
temperature: 20 + Math.random() * 10,
condition: ['sunny', 'cloudy', 'rainy'][Math.floor(Math.random() * 3)]
}))
};
}
}
Agent Card Configuration
const agentCard = {
id: 'weather-agent',
name: 'Weather Agent',
description: 'Provides weather information and forecasts',
version: '2.1.0',
capabilities: [
'weather-data',
'weather-forecast',
'location-based-services'
],
endpoint: 'https://weather-agent.example.com',
// Optional metadata
metadata: {
author: 'Weather Corp',
license: 'MIT',
homepage: 'https://weather-agent.example.com',
repository: 'https://github.com/weather-corp/weather-agent'
},
// Supported task types
supportedTasks: [
{
name: 'get-weather',
description: 'Get current weather for a location',
inputSchema: {
type: 'object',
properties: {
location: { type: 'string', description: 'Location name or coordinates' }
},
required: ['location']
}
},
{
name: 'get-forecast',
description: 'Get weather forecast for a location',
inputSchema: {
type: 'object',
properties: {
location: { type: 'string', description: 'Location name or coordinates' },
days: { type: 'number', minimum: 1, maximum: 14, default: 5 }
},
required: ['location']
}
}
]
};
Advanced Server Configuration
Complete Server Setup
import { A2AServer, DefaultRequestHandler } from '@dexwox-labs/a2a-node';
const server = new A2AServer({
agentCard,
executor: new WeatherAgentExecutor(),
// Server configuration
port: 3000,
host: '0.0.0.0',
// Request handler configuration
requestHandler: new DefaultRequestHandler({
maxRequestSize: '10mb',
timeout: 30000,
cors: {
origin: ['https://trusted-domain.com'],
credentials: true
}
}),
// Task manager configuration
taskManager: {
maxConcurrentTasks: 10,
taskTimeout: 300000, // 5 minutes
cleanupInterval: 60000, // 1 minute
retentionPeriod: 86400000 // 24 hours
},
// WebSocket configuration
websocket: {
enabled: true,
path: '/ws',
heartbeatInterval: 30000,
maxConnections: 100
},
// Security configuration
security: {
rateLimit: {
windowMs: 60000, // 1 minute
max: 100 // requests per window
},
authentication: {
enabled: true,
strategy: 'bearer', // or 'basic', 'custom'
verify: async (token) => {
// Your authentication logic
return verifyToken(token);
}
}
}
});
Middleware
Custom Middleware
import { RequestHandler, Request, Response, NextFunction } from '@dexwox-labs/a2a-node';
// Logging middleware
const loggingMiddleware = (req: Request, res: Response, next: NextFunction) => {
const start = Date.now();
res.on('finish', () => {
const duration = Date.now() - start;
console.log(`${req.method} ${req.path} - ${res.statusCode} (${duration}ms)`);
});
next();
};
// Authentication middleware
const authMiddleware = async (req: Request, res: Response, next: NextFunction) => {
const token = req.headers.authorization?.replace('Bearer ', '');
if (!token) {
return res.status(401).json({ error: 'Authentication required' });
}
try {
const user = await verifyToken(token);
req.user = user;
next();
} catch (error) {
return res.status(401).json({ error: 'Invalid token' });
}
};
// Rate limiting middleware
const rateLimitMiddleware = createRateLimit({
windowMs: 60000, // 1 minute
max: 100, // requests per window
message: 'Too many requests, please try again later'
});
// Apply middleware
server.use(loggingMiddleware);
server.use(authMiddleware);
server.use(rateLimitMiddleware);
Error Handling Middleware
const errorHandlingMiddleware = (error: Error, req: Request, res: Response, next: NextFunction) => {
console.error('Server error:', error);
if (error instanceof A2AError) {
return res.status(400).json({
error: {
code: error.code,
message: error.message
}
});
}
if (error instanceof ValidationError) {
return res.status(400).json({
error: {
code: 'VALIDATION_ERROR',
message: error.message,
details: error.details
}
});
}
// Generic error response
res.status(500).json({
error: {
code: 'INTERNAL_SERVER_ERROR',
message: 'An unexpected error occurred'
}
});
};
server.use(errorHandlingMiddleware);
Task Management
Custom Task Manager
import { TaskManager, Task, TaskTransition } from '@dexwox-labs/a2a-node';
class CustomTaskManager extends TaskManager {
private tasks = new Map<string, Task>();
private taskHistory = new Map<string, TaskTransition[]>();
async createTask(taskData: Partial<Task>): Promise<Task> {
const task: Task = {
id: generateTaskId(),
name: taskData.name!,
description: taskData.description,
status: 'submitted',
input: taskData.input,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString()
};
this.tasks.set(task.id, task);
this.addTransition(task.id, 'submitted', 'Task created');
// Emit task created event
this.emit('taskCreated', task);
return task;
}
async updateTaskStatus(taskId: string, status: Task['status'], result?: any, error?: string): Promise<void> {
const task = this.tasks.get(taskId);
if (!task) {
throw new TaskNotFoundError(taskId);
}
const previousStatus = task.status;
task.status = status;
task.updatedAt = new Date().toISOString();
if (result) task.result = result;
if (error) task.error = error;
this.addTransition(taskId, status, `Status changed from ${previousStatus} to ${status}`);
// Emit task updated event
this.emit('taskUpdated', task);
}
private addTransition(taskId: string, status: Task['status'], description: string): void {
if (!this.taskHistory.has(taskId)) {
this.taskHistory.set(taskId, []);
}
const transitions = this.taskHistory.get(taskId)!;
transitions.push({
status,
description,
timestamp: new Date().toISOString()
});
}
}
Task Lifecycle Hooks
const executor = new DefaultAgentExecutor({
async executeTask(task: Task): Promise<TaskResult> {
// Pre-execution hook
await this.onTaskStart(task);
try {
// Execute task logic
const result = await this.processTask(task);
// Post-execution hook
await this.onTaskComplete(task, result);
return result;
} catch (error) {
// Error hook
await this.onTaskError(task, error);
throw error;
}
},
async onTaskStart(task: Task): Promise<void> {
console.log(`Starting task: ${task.id}`);
// Log to monitoring system
await this.logTaskEvent('start', task);
},
async onTaskComplete(task: Task, result: TaskResult): Promise<void> {
console.log(`Completed task: ${task.id}`);
// Log to monitoring system
await this.logTaskEvent('complete', task, result);
},
async onTaskError(task: Task, error: Error): Promise<void> {
console.error(`Task failed: ${task.id}`, error);
// Log to monitoring system
await this.logTaskEvent('error', task, null, error);
}
});
WebSocket Support
Real-time Communication
// Enable WebSocket support
const server = new A2AServer({
agentCard,
executor,
websocket: {
enabled: true,
path: '/ws',
heartbeatInterval: 30000
}
});
// Handle WebSocket connections
server.on('websocket:connection', (ws, req) => {
console.log('New WebSocket connection');
// Send welcome message
ws.send(JSON.stringify({
type: 'welcome',
message: 'Connected to A2A agent'
}));
// Handle incoming messages
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
await handleWebSocketMessage(ws, message);
} catch (error) {
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid message format'
}));
}
});
// Handle disconnection
ws.on('close', () => {
console.log('WebSocket connection closed');
});
});
async function handleWebSocketMessage(ws: WebSocket, message: any) {
switch (message.type) {
case 'task:create':
const task = await server.createTask(message.data);
ws.send(JSON.stringify({
type: 'task:created',
data: task
}));
break;
case 'task:status':
const taskStatus = await server.getTask(message.taskId);
ws.send(JSON.stringify({
type: 'task:status',
data: taskStatus
}));
break;
default:
ws.send(JSON.stringify({
type: 'error',
message: `Unknown message type: ${message.type}`
}));
}
}
Health Monitoring
Health Checks
// Add health check endpoint
server.get('/health', async (req, res) => {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
uptime: process.uptime(),
memory: process.memoryUsage(),
tasks: {
active: await taskManager.getActiveTasks().length,
completed: await taskManager.getCompletedTasks().length
}
};
res.json(health);
});
// Add metrics endpoint
server.get('/metrics', async (req, res) => {
const metrics = {
requests_total: requestCounter.get(),
tasks_created_total: taskCreatedCounter.get(),
tasks_completed_total: taskCompletedCounter.get(),
tasks_failed_total: taskFailedCounter.get(),
response_time_seconds: responseTimeHistogram.get()
};
res.json(metrics);
});
Performance Monitoring
import { performance } from 'perf_hooks';
class PerformanceMonitor {
private metrics = new Map<string, number[]>();
startTimer(operation: string): () => void {
const start = performance.now();
return () => {
const duration = performance.now() - start;
this.recordMetric(operation, duration);
};
}
recordMetric(operation: string, value: number): void {
if (!this.metrics.has(operation)) {
this.metrics.set(operation, []);
}
const values = this.metrics.get(operation)!;
values.push(value);
// Keep only last 1000 measurements
if (values.length > 1000) {
values.shift();
}
}
getStats(operation: string) {
const values = this.metrics.get(operation) || [];
if (values.length === 0) return null;
const sorted = [...values].sort((a, b) => a - b);
return {
count: values.length,
min: sorted[0],
max: sorted[sorted.length - 1],
avg: values.reduce((a, b) => a + b, 0) / values.length,
p50: sorted[Math.floor(sorted.length * 0.5)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)]
};
}
}
const monitor = new PerformanceMonitor();
// Use in executor
const executor = new DefaultAgentExecutor({
async executeTask(task: Task): Promise<TaskResult> {
const endTimer = monitor.startTimer('task_execution');
try {
const result = await this.processTask(task);
return result;
} finally {
endTimer();
}
}
});
Deployment
Docker Deployment
# Dockerfile
FROM node:18-alpine
WORKDIR /app
# Copy package files
COPY package*.json ./
COPY pnpm-lock.yaml ./
# Install dependencies
RUN npm install -g pnpm
RUN pnpm install --frozen-lockfile
# Copy source code
COPY . .
# Build the application
RUN pnpm build
# Expose port
EXPOSE 3000
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
# Start the server
CMD ["pnpm", "start"]
Docker Compose
# docker-compose.yml
version: '3.8'
services:
a2a-agent:
build: .
ports:
- "3000:3000"
environment:
- NODE_ENV=production
- PORT=3000
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:pass@postgres:5432/a2a
depends_on:
- redis
- postgres
restart: unless-stopped
redis:
image: redis:7-alpine
restart: unless-stopped
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_DB=a2a
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
volumes:
postgres_data:
Kubernetes Deployment
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: a2a-agent
spec:
replicas: 3
selector:
matchLabels:
app: a2a-agent
template:
metadata:
labels:
app: a2a-agent
spec:
containers:
- name: a2a-agent
image: your-registry/a2a-agent:latest
ports:
- containerPort: 3000
env:
- name: NODE_ENV
value: "production"
- name: PORT
value: "3000"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: a2a-agent-service
spec:
selector:
app: a2a-agent
ports:
- port: 80
targetPort: 3000
type: LoadBalancer
Best Practices
1. Error Handling
⚠️
Always implement comprehensive error handling to ensure your agent can gracefully handle failures.
class RobustAgentExecutor implements AgentExecutor {
async executeTask(task: Task): Promise<TaskResult> {
try {
// Validate input
this.validateTaskInput(task);
// Execute with timeout
const result = await Promise.race([
this.processTask(task),
this.createTimeoutPromise(300000) // 5 minutes
]);
return result;
} catch (error) {
// Log error for debugging
console.error(`Task ${task.id} failed:`, error);
// Return appropriate error response
if (error instanceof ValidationError) {
throw new TaskFailedError(task.id, 'Invalid input', error.details);
} else if (error instanceof TimeoutError) {
throw new TaskFailedError(task.id, 'Task timeout');
} else {
throw new TaskFailedError(task.id, 'Internal error');
}
}
}
}
2. Resource Management
// Implement resource cleanup
class ResourceAwareExecutor implements AgentExecutor {
private activeConnections = new Set<any>();
private tempFiles = new Set<string>();
async executeTask(task: Task): Promise<TaskResult> {
try {
const result = await this.processTask(task);
return result;
} finally {
// Always cleanup resources
await this.cleanup();
}
}
private async cleanup(): Promise<void> {
// Close connections
for (const connection of this.activeConnections) {
await connection.close();
}
this.activeConnections.clear();
// Delete temp files
for (const file of this.tempFiles) {
await fs.unlink(file).catch(() => {}); // Ignore errors
}
this.tempFiles.clear();
}
}
3. Configuration Management
// Environment-based configuration
const config = {
server: {
port: parseInt(process.env.PORT || '3000'),
host: process.env.HOST || '0.0.0.0'
},
agent: {
id: process.env.AGENT_ID || 'default-agent',
name: process.env.AGENT_NAME || 'Default Agent',
endpoint: process.env.AGENT_ENDPOINT || 'http://localhost:3000'
},
database: {
url: process.env.DATABASE_URL || 'sqlite:./data.db'
},
redis: {
url: process.env.REDIS_URL || 'redis://localhost:6379'
}
};
Next Steps
- Client Guide - Learn how to build A2A clients
- Protocol Specification - Understand the A2A protocol
- Examples - See real-world implementations
- API Reference - Complete API documentation