DocumentationServer Guide

Server Guide

Learn how to build production-ready A2A servers using the A2A Node SDK. This guide covers server setup, agent implementation, middleware, and deployment strategies.

Overview

The A2A Node SDK provides a complete server implementation for hosting A2A agents:

  • A2AServer - Main server class with HTTP/WebSocket support
  • AgentExecutor - Core agent execution logic
  • TaskManager - Task lifecycle management
  • RequestHandler - Request processing pipeline
  • EventQueue - Asynchronous event processing
  • ContextMiddleware - Request context management
  • PushService - Real-time notifications
  • QueueManager - Task queue management

Quick Start

Basic Server Setup

import { A2AServer, DefaultAgentExecutor } from '@dexwox-labs/a2a-node';
 
// Create an agent executor
const executor = new DefaultAgentExecutor({
  async executeTask(task) {
    console.log('Processing task:', task.name);
    
    // Your agent logic here
    const result = `Processed: ${task.name}`;
    
    return {
      status: 'completed',
      result,
      artifacts: [{
        id: 'result-1',
        type: 'text',
        content: { text: result },
        createdAt: new Date().toISOString(),
        updatedAt: new Date().toISOString()
      }]
    };
  }
});
 
// Create the server
const server = new A2AServer({
  agentCard: {
    id: 'my-agent',
    name: 'My Agent',
    description: 'A simple A2A agent',
    capabilities: ['text-processing'],
    endpoint: 'http://localhost:3000',
    version: '1.0.0'
  },
  executor
});
 
// Start the server
await server.start(3000);
console.log('🚀 A2A server running on http://localhost:3000');

Agent Implementation

Custom Agent Executor

import { AgentExecutor, Task, TaskResult } from '@dexwox-labs/a2a-node';
 
class WeatherAgentExecutor implements AgentExecutor {
  async executeTask(task: Task): Promise<TaskResult> {
    const { name, input } = task;
    
    switch (name) {
      case 'get-weather':
        return this.getWeather(input);
      case 'get-forecast':
        return this.getForecast(input);
      default:
        throw new Error(`Unknown task: ${name}`);
    }
  }
  
  private async getWeather(input: any): Promise<TaskResult> {
    const { location } = input;
    
    // Simulate weather API call
    const weatherData = await this.fetchWeatherData(location);
    
    return {
      status: 'completed',
      result: weatherData,
      artifacts: [{
        id: `weather-${Date.now()}`,
        type: 'json',
        content: { data: weatherData },
        createdAt: new Date().toISOString(),
        updatedAt: new Date().toISOString()
      }]
    };
  }
  
  private async getForecast(input: any): Promise<TaskResult> {
    const { location, days = 5 } = input;
    
    // Simulate forecast API call
    const forecastData = await this.fetchForecastData(location, days);
    
    return {
      status: 'completed',
      result: forecastData,
      artifacts: [{
        id: `forecast-${Date.now()}`,
        type: 'json',
        content: { data: forecastData },
        createdAt: new Date().toISOString(),
        updatedAt: new Date().toISOString()
      }]
    };
  }
  
  private async fetchWeatherData(location: string) {
    // Your weather API integration
    return {
      location,
      temperature: 22,
      condition: 'sunny',
      humidity: 65
    };
  }
  
  private async fetchForecastData(location: string, days: number) {
    // Your forecast API integration
    return {
      location,
      days,
      forecast: Array.from({ length: days }, (_, i) => ({
        date: new Date(Date.now() + i * 24 * 60 * 60 * 1000).toISOString(),
        temperature: 20 + Math.random() * 10,
        condition: ['sunny', 'cloudy', 'rainy'][Math.floor(Math.random() * 3)]
      }))
    };
  }
}

Agent Card Configuration

const agentCard = {
  id: 'weather-agent',
  name: 'Weather Agent',
  description: 'Provides weather information and forecasts',
  version: '2.1.0',
  capabilities: [
    'weather-data',
    'weather-forecast',
    'location-based-services'
  ],
  endpoint: 'https://weather-agent.example.com',
  
  // Optional metadata
  metadata: {
    author: 'Weather Corp',
    license: 'MIT',
    homepage: 'https://weather-agent.example.com',
    repository: 'https://github.com/weather-corp/weather-agent'
  },
  
  // Supported task types
  supportedTasks: [
    {
      name: 'get-weather',
      description: 'Get current weather for a location',
      inputSchema: {
        type: 'object',
        properties: {
          location: { type: 'string', description: 'Location name or coordinates' }
        },
        required: ['location']
      }
    },
    {
      name: 'get-forecast',
      description: 'Get weather forecast for a location',
      inputSchema: {
        type: 'object',
        properties: {
          location: { type: 'string', description: 'Location name or coordinates' },
          days: { type: 'number', minimum: 1, maximum: 14, default: 5 }
        },
        required: ['location']
      }
    }
  ]
};

Advanced Server Configuration

Complete Server Setup

import { A2AServer, DefaultRequestHandler } from '@dexwox-labs/a2a-node';
 
const server = new A2AServer({
  agentCard,
  executor: new WeatherAgentExecutor(),
  
  // Server configuration
  port: 3000,
  host: '0.0.0.0',
  
  // Request handler configuration
  requestHandler: new DefaultRequestHandler({
    maxRequestSize: '10mb',
    timeout: 30000,
    cors: {
      origin: ['https://trusted-domain.com'],
      credentials: true
    }
  }),
  
  // Task manager configuration
  taskManager: {
    maxConcurrentTasks: 10,
    taskTimeout: 300000, // 5 minutes
    cleanupInterval: 60000, // 1 minute
    retentionPeriod: 86400000 // 24 hours
  },
  
  // WebSocket configuration
  websocket: {
    enabled: true,
    path: '/ws',
    heartbeatInterval: 30000,
    maxConnections: 100
  },
  
  // Security configuration
  security: {
    rateLimit: {
      windowMs: 60000, // 1 minute
      max: 100 // requests per window
    },
    authentication: {
      enabled: true,
      strategy: 'bearer', // or 'basic', 'custom'
      verify: async (token) => {
        // Your authentication logic
        return verifyToken(token);
      }
    }
  }
});

Middleware

Custom Middleware

import { RequestHandler, Request, Response, NextFunction } from '@dexwox-labs/a2a-node';
 
// Logging middleware
const loggingMiddleware = (req: Request, res: Response, next: NextFunction) => {
  const start = Date.now();
  
  res.on('finish', () => {
    const duration = Date.now() - start;
    console.log(`${req.method} ${req.path} - ${res.statusCode} (${duration}ms)`);
  });
  
  next();
};
 
// Authentication middleware
const authMiddleware = async (req: Request, res: Response, next: NextFunction) => {
  const token = req.headers.authorization?.replace('Bearer ', '');
  
  if (!token) {
    return res.status(401).json({ error: 'Authentication required' });
  }
  
  try {
    const user = await verifyToken(token);
    req.user = user;
    next();
  } catch (error) {
    return res.status(401).json({ error: 'Invalid token' });
  }
};
 
// Rate limiting middleware
const rateLimitMiddleware = createRateLimit({
  windowMs: 60000, // 1 minute
  max: 100, // requests per window
  message: 'Too many requests, please try again later'
});
 
// Apply middleware
server.use(loggingMiddleware);
server.use(authMiddleware);
server.use(rateLimitMiddleware);

Error Handling Middleware

const errorHandlingMiddleware = (error: Error, req: Request, res: Response, next: NextFunction) => {
  console.error('Server error:', error);
  
  if (error instanceof A2AError) {
    return res.status(400).json({
      error: {
        code: error.code,
        message: error.message
      }
    });
  }
  
  if (error instanceof ValidationError) {
    return res.status(400).json({
      error: {
        code: 'VALIDATION_ERROR',
        message: error.message,
        details: error.details
      }
    });
  }
  
  // Generic error response
  res.status(500).json({
    error: {
      code: 'INTERNAL_SERVER_ERROR',
      message: 'An unexpected error occurred'
    }
  });
};
 
server.use(errorHandlingMiddleware);

Task Management

Custom Task Manager

import { TaskManager, Task, TaskTransition } from '@dexwox-labs/a2a-node';
 
class CustomTaskManager extends TaskManager {
  private tasks = new Map<string, Task>();
  private taskHistory = new Map<string, TaskTransition[]>();
  
  async createTask(taskData: Partial<Task>): Promise<Task> {
    const task: Task = {
      id: generateTaskId(),
      name: taskData.name!,
      description: taskData.description,
      status: 'submitted',
      input: taskData.input,
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString()
    };
    
    this.tasks.set(task.id, task);
    this.addTransition(task.id, 'submitted', 'Task created');
    
    // Emit task created event
    this.emit('taskCreated', task);
    
    return task;
  }
  
  async updateTaskStatus(taskId: string, status: Task['status'], result?: any, error?: string): Promise<void> {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new TaskNotFoundError(taskId);
    }
    
    const previousStatus = task.status;
    task.status = status;
    task.updatedAt = new Date().toISOString();
    
    if (result) task.result = result;
    if (error) task.error = error;
    
    this.addTransition(taskId, status, `Status changed from ${previousStatus} to ${status}`);
    
    // Emit task updated event
    this.emit('taskUpdated', task);
  }
  
  private addTransition(taskId: string, status: Task['status'], description: string): void {
    if (!this.taskHistory.has(taskId)) {
      this.taskHistory.set(taskId, []);
    }
    
    const transitions = this.taskHistory.get(taskId)!;
    transitions.push({
      status,
      description,
      timestamp: new Date().toISOString()
    });
  }
}

Task Lifecycle Hooks

const executor = new DefaultAgentExecutor({
  async executeTask(task: Task): Promise<TaskResult> {
    // Pre-execution hook
    await this.onTaskStart(task);
    
    try {
      // Execute task logic
      const result = await this.processTask(task);
      
      // Post-execution hook
      await this.onTaskComplete(task, result);
      
      return result;
    } catch (error) {
      // Error hook
      await this.onTaskError(task, error);
      throw error;
    }
  },
  
  async onTaskStart(task: Task): Promise<void> {
    console.log(`Starting task: ${task.id}`);
    // Log to monitoring system
    await this.logTaskEvent('start', task);
  },
  
  async onTaskComplete(task: Task, result: TaskResult): Promise<void> {
    console.log(`Completed task: ${task.id}`);
    // Log to monitoring system
    await this.logTaskEvent('complete', task, result);
  },
  
  async onTaskError(task: Task, error: Error): Promise<void> {
    console.error(`Task failed: ${task.id}`, error);
    // Log to monitoring system
    await this.logTaskEvent('error', task, null, error);
  }
});

WebSocket Support

Real-time Communication

// Enable WebSocket support
const server = new A2AServer({
  agentCard,
  executor,
  websocket: {
    enabled: true,
    path: '/ws',
    heartbeatInterval: 30000
  }
});
 
// Handle WebSocket connections
server.on('websocket:connection', (ws, req) => {
  console.log('New WebSocket connection');
  
  // Send welcome message
  ws.send(JSON.stringify({
    type: 'welcome',
    message: 'Connected to A2A agent'
  }));
  
  // Handle incoming messages
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      await handleWebSocketMessage(ws, message);
    } catch (error) {
      ws.send(JSON.stringify({
        type: 'error',
        message: 'Invalid message format'
      }));
    }
  });
  
  // Handle disconnection
  ws.on('close', () => {
    console.log('WebSocket connection closed');
  });
});
 
async function handleWebSocketMessage(ws: WebSocket, message: any) {
  switch (message.type) {
    case 'task:create':
      const task = await server.createTask(message.data);
      ws.send(JSON.stringify({
        type: 'task:created',
        data: task
      }));
      break;
      
    case 'task:status':
      const taskStatus = await server.getTask(message.taskId);
      ws.send(JSON.stringify({
        type: 'task:status',
        data: taskStatus
      }));
      break;
      
    default:
      ws.send(JSON.stringify({
        type: 'error',
        message: `Unknown message type: ${message.type}`
      }));
  }
}

Health Monitoring

Health Checks

// Add health check endpoint
server.get('/health', async (req, res) => {
  const health = {
    status: 'healthy',
    timestamp: new Date().toISOString(),
    uptime: process.uptime(),
    memory: process.memoryUsage(),
    tasks: {
      active: await taskManager.getActiveTasks().length,
      completed: await taskManager.getCompletedTasks().length
    }
  };
  
  res.json(health);
});
 
// Add metrics endpoint
server.get('/metrics', async (req, res) => {
  const metrics = {
    requests_total: requestCounter.get(),
    tasks_created_total: taskCreatedCounter.get(),
    tasks_completed_total: taskCompletedCounter.get(),
    tasks_failed_total: taskFailedCounter.get(),
    response_time_seconds: responseTimeHistogram.get()
  };
  
  res.json(metrics);
});

Performance Monitoring

import { performance } from 'perf_hooks';
 
class PerformanceMonitor {
  private metrics = new Map<string, number[]>();
  
  startTimer(operation: string): () => void {
    const start = performance.now();
    
    return () => {
      const duration = performance.now() - start;
      this.recordMetric(operation, duration);
    };
  }
  
  recordMetric(operation: string, value: number): void {
    if (!this.metrics.has(operation)) {
      this.metrics.set(operation, []);
    }
    
    const values = this.metrics.get(operation)!;
    values.push(value);
    
    // Keep only last 1000 measurements
    if (values.length > 1000) {
      values.shift();
    }
  }
  
  getStats(operation: string) {
    const values = this.metrics.get(operation) || [];
    if (values.length === 0) return null;
    
    const sorted = [...values].sort((a, b) => a - b);
    return {
      count: values.length,
      min: sorted[0],
      max: sorted[sorted.length - 1],
      avg: values.reduce((a, b) => a + b, 0) / values.length,
      p50: sorted[Math.floor(sorted.length * 0.5)],
      p95: sorted[Math.floor(sorted.length * 0.95)],
      p99: sorted[Math.floor(sorted.length * 0.99)]
    };
  }
}
 
const monitor = new PerformanceMonitor();
 
// Use in executor
const executor = new DefaultAgentExecutor({
  async executeTask(task: Task): Promise<TaskResult> {
    const endTimer = monitor.startTimer('task_execution');
    
    try {
      const result = await this.processTask(task);
      return result;
    } finally {
      endTimer();
    }
  }
});

Deployment

Docker Deployment

# Dockerfile
FROM node:18-alpine
 
WORKDIR /app
 
# Copy package files
COPY package*.json ./
COPY pnpm-lock.yaml ./
 
# Install dependencies
RUN npm install -g pnpm
RUN pnpm install --frozen-lockfile
 
# Copy source code
COPY . .
 
# Build the application
RUN pnpm build
 
# Expose port
EXPOSE 3000
 
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:3000/health || exit 1
 
# Start the server
CMD ["pnpm", "start"]

Docker Compose

# docker-compose.yml
version: '3.8'
 
services:
  a2a-agent:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - PORT=3000
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://user:pass@postgres:5432/a2a
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
    
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=a2a
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped
 
volumes:
  postgres_data:

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: a2a-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: a2a-agent
  template:
    metadata:
      labels:
        app: a2a-agent
    spec:
      containers:
      - name: a2a-agent
        image: your-registry/a2a-agent:latest
        ports:
        - containerPort: 3000
        env:
        - name: NODE_ENV
          value: "production"
        - name: PORT
          value: "3000"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
 
---
apiVersion: v1
kind: Service
metadata:
  name: a2a-agent-service
spec:
  selector:
    app: a2a-agent
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer

Best Practices

1. Error Handling

⚠️

Always implement comprehensive error handling to ensure your agent can gracefully handle failures.

class RobustAgentExecutor implements AgentExecutor {
  async executeTask(task: Task): Promise<TaskResult> {
    try {
      // Validate input
      this.validateTaskInput(task);
      
      // Execute with timeout
      const result = await Promise.race([
        this.processTask(task),
        this.createTimeoutPromise(300000) // 5 minutes
      ]);
      
      return result;
    } catch (error) {
      // Log error for debugging
      console.error(`Task ${task.id} failed:`, error);
      
      // Return appropriate error response
      if (error instanceof ValidationError) {
        throw new TaskFailedError(task.id, 'Invalid input', error.details);
      } else if (error instanceof TimeoutError) {
        throw new TaskFailedError(task.id, 'Task timeout');
      } else {
        throw new TaskFailedError(task.id, 'Internal error');
      }
    }
  }
}

2. Resource Management

// Implement resource cleanup
class ResourceAwareExecutor implements AgentExecutor {
  private activeConnections = new Set<any>();
  private tempFiles = new Set<string>();
  
  async executeTask(task: Task): Promise<TaskResult> {
    try {
      const result = await this.processTask(task);
      return result;
    } finally {
      // Always cleanup resources
      await this.cleanup();
    }
  }
  
  private async cleanup(): Promise<void> {
    // Close connections
    for (const connection of this.activeConnections) {
      await connection.close();
    }
    this.activeConnections.clear();
    
    // Delete temp files
    for (const file of this.tempFiles) {
      await fs.unlink(file).catch(() => {}); // Ignore errors
    }
    this.tempFiles.clear();
  }
}

3. Configuration Management

// Environment-based configuration
const config = {
  server: {
    port: parseInt(process.env.PORT || '3000'),
    host: process.env.HOST || '0.0.0.0'
  },
  agent: {
    id: process.env.AGENT_ID || 'default-agent',
    name: process.env.AGENT_NAME || 'Default Agent',
    endpoint: process.env.AGENT_ENDPOINT || 'http://localhost:3000'
  },
  database: {
    url: process.env.DATABASE_URL || 'sqlite:./data.db'
  },
  redis: {
    url: process.env.REDIS_URL || 'redis://localhost:6379'
  }
};

Next Steps