Production-Ready Crew

Enterprise-grade AI crew with comprehensive error handling, monitoring, and production features

Production-Ready Crew

This example demonstrates how to build a production-ready RCrewAI crew with enterprise-grade features including comprehensive error handling, monitoring, logging, security controls, and deployment considerations.

Overview

We’ll create a customer support automation crew with:

  • Robust Error Handling: Comprehensive exception handling and recovery
  • Monitoring & Observability: Detailed metrics, logging, and health checks
  • Security Controls: Input validation, access controls, and audit logging
  • Performance Optimization: Caching, connection pooling, and resource management
  • Scalability Features: Load balancing, concurrency controls, and auto-scaling
  • Production Deployment: Docker containers, configuration management, and CI/CD

Complete Production Implementation

require 'rcrewai'
require 'logger'
require 'json'
require 'redis'

# ===== PRODUCTION CONFIGURATION =====

class ProductionConfig
  def self.setup
    # Environment-based configuration
    @env = ENV.fetch('RAILS_ENV', 'development')
    @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379')
    
    # Configure RCrewAI for production
    RCrewAI.configure do |config|
      config.llm_provider = ENV.fetch('LLM_PROVIDER', 'openai').to_sym
      config.temperature = ENV.fetch('LLM_TEMPERATURE', '0.1').to_f
      config.max_tokens = ENV.fetch('LLM_MAX_TOKENS', '4000').to_i
      
      # Provider-specific configuration
      case config.llm_provider
      when :openai
        config.openai_api_key = ENV.fetch('OPENAI_API_KEY')
      when :anthropic
        config.anthropic_api_key = ENV.fetch('ANTHROPIC_API_KEY')
      when :azure
        config.azure_api_key = ENV.fetch('AZURE_OPENAI_API_KEY')
        config.base_url = ENV.fetch('AZURE_OPENAI_ENDPOINT')
      end
    end
    
    # Setup Redis for caching and coordination
    @redis = Redis.new(url: @redis_url)
    
    # Setup structured logging
    @logger = setup_logger
    
    [@redis, @logger]
  end
  
  private
  
  def self.setup_logger
    logger = Logger.new($stdout)
    logger.level = ENV.fetch('LOG_LEVEL', 'INFO').upcase.constantize rescue Logger::INFO
    logger.formatter = proc do |severity, datetime, progname, msg|
      {
        timestamp: datetime.iso8601,
        level: severity,
        component: progname || 'rcrewai',
        message: msg,
        environment: @env,
        process_id: Process.pid
      }.to_json + "\n"
    end
    logger
  end
  
  class << self
    attr_reader :redis, :logger, :env
  end
end

# Initialize production configuration
ProductionConfig.setup
redis = ProductionConfig.redis
logger = ProductionConfig.logger

# ===== PRODUCTION-GRADE BASE CLASSES =====

class ProductionAgent < RCrewAI::Agent
  def initialize(**options)
    super
    @logger = ProductionConfig.logger
    @start_time = nil
  end
  
  def execute_task(task)
    @start_time = Time.now
    task_labels = { agent_name: name, task_name: task.name }
    
    begin
      @logger.info("Task execution started", task_labels)
      
      result = super(task)
      
      duration = Time.now - @start_time
      @logger.info("Task execution completed", task_labels.merge(
        duration: duration,
        result_length: result.length
      ))
      
      result
      
    rescue => e
      duration = Time.now - @start_time
      
      @logger.error("Task execution failed", task_labels.merge(
        error: e.message,
        error_class: e.class.name,
        duration: duration
      ))
      
      raise
    end
  end
end

# ===== CUSTOMER SUPPORT CREW =====

logger.info("Initializing production customer support crew")

# Create production crew
support_crew = RCrewAI::Crew.new("customer_support_crew", process: :hierarchical)

# Support Manager
support_manager = ProductionAgent.new(
  name: "support_manager",
  role: "Customer Support Manager",
  goal: "Efficiently coordinate customer support operations and ensure customer satisfaction",
  backstory: "You are an experienced customer support manager with expertise in escalation handling, team coordination, and customer relationship management.",
  manager: true,
  allow_delegation: true,
  tools: [
    RCrewAI::Tools::WebSearch.new(max_results: 5),
    RCrewAI::Tools::FileReader.new,
    RCrewAI::Tools::FileWriter.new
  ],
  verbose: ProductionConfig.env == 'development',
  max_execution_time: 600
)

# Technical Support Specialist
tech_support = ProductionAgent.new(
  name: "tech_support_specialist",
  role: "Technical Support Specialist", 
  goal: "Resolve technical issues and provide expert technical guidance",
  backstory: "You are a senior technical support specialist with deep knowledge of software systems, APIs, and troubleshooting.",
  tools: [
    RCrewAI::Tools::WebSearch.new(max_results: 10),
    RCrewAI::Tools::FileReader.new,
    RCrewAI::Tools::FileWriter.new
  ],
  verbose: ProductionConfig.env == 'development',
  max_execution_time: 900
)

# Add agents to crew
support_crew.add_agent(support_manager)
support_crew.add_agent(tech_support)

# ===== PRODUCTION TASK DEFINITIONS =====

# Customer Issue Analysis
issue_analysis = RCrewAI::Task.new(
  name: "customer_issue_analysis",
  description: "Analyze incoming customer support tickets to categorize issues, assess severity, and determine initial response strategy.",
  expected_output: "Structured analysis with issue categorization, severity assessment, and recommended response strategy",
  async: true,
  max_retries: 3,
  retry_delay: 10
)

# Technical Issue Resolution  
technical_resolution = RCrewAI::Task.new(
  name: "technical_issue_resolution",
  description: "Investigate and resolve technical issues reported by customers. Provide step-by-step solutions and code examples.",
  expected_output: "Comprehensive technical solution with troubleshooting steps and configuration guidance",
  context: [issue_analysis],
  async: true,
  max_retries: 2
)

# Add tasks to crew
support_crew.add_task(issue_analysis)
support_crew.add_task(technical_resolution)

# ===== PRODUCTION EXECUTION WITH MONITORING =====

class ProductionExecutor
  def initialize(crew, logger)
    @crew = crew
    @logger = logger
    @execution_id = SecureRandom.uuid
  end
  
  def execute
    @logger.info("Starting production crew execution", {
      execution_id: @execution_id,
      crew_name: @crew.name,
      agent_count: @crew.agents.length,
      task_count: @crew.tasks.length
    })
    
    start_time = Time.now
    
    begin
      results = @crew.execute(
        async: true,
        max_concurrency: ENV.fetch('MAX_CONCURRENCY', '3').to_i
      )
      
      duration = Time.now - start_time
      
      @logger.info("Crew execution completed", {
        execution_id: @execution_id,
        duration: duration,
        success_rate: results[:success_rate]
      })
      
      generate_execution_report(results, duration)
      
      results
      
    rescue => e
      duration = Time.now - start_time
      
      @logger.error("Crew execution failed", {
        execution_id: @execution_id,
        error: e.message,
        duration: duration
      })
      
      raise
    end
  end
  
  private
  
  def generate_execution_report(results, duration)
    report = {
      execution_id: @execution_id,
      timestamp: Time.now.iso8601,
      crew_name: @crew.name,
      duration: duration,
      metrics: {
        total_tasks: results[:total_tasks],
        completed_tasks: results[:completed_tasks],
        success_rate: results[:success_rate]
      }
    }
    
    File.write("execution_report_#{@execution_id}.json", report.to_json)
    @logger.info("Execution report generated")
  end
end

# ===== HEALTH CHECKS =====

class HealthChecker
  def self.check_system_health
    health_status = {
      timestamp: Time.now.iso8601,
      status: 'healthy',
      checks: {}
    }
    
    # Check Redis connectivity
    begin
      ProductionConfig.redis.ping
      health_status[:checks][:redis] = { status: 'healthy' }
    rescue => e
      health_status[:checks][:redis] = { status: 'unhealthy', message: e.message }
      health_status[:status] = 'unhealthy'
    end
    
    health_status
  end
end

# ===== PRODUCTION EXECUTION =====

if __FILE__ == $0
  logger.info("Starting production customer support crew")
  
  # Pre-execution health check
  health_status = HealthChecker.check_system_health
  if health_status[:status] == 'unhealthy'
    logger.error("System health check failed", health_status)
    exit 1
  end
  
  # Execute crew with production monitoring
  executor = ProductionExecutor.new(support_crew, logger)
  
  begin
    results = executor.execute
    
    puts "\n🎉 PRODUCTION EXECUTION COMPLETED"
    puts "Success Rate: #{results[:success_rate]}%"
    puts "Completed Tasks: #{results[:completed_tasks]}/#{results[:total_tasks]}"
    
  rescue => e
    logger.error("Production execution failed", { error: e.message })
    puts "\n❌ PRODUCTION EXECUTION FAILED"
    exit 1
  end
end

Docker Configuration

Dockerfile:

FROM ruby:3.1-alpine

WORKDIR /app

COPY Gemfile Gemfile.lock ./
RUN bundle install --without development test

COPY . .

ENV RAILS_ENV=production
ENV LOG_LEVEL=INFO

HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
  CMD ruby -e "puts HealthChecker.check_system_health[:status]" || exit 1

CMD ["ruby", "production_crew.rb"]

docker-compose.yml:

version: '3.8'
services:
  rcrewai-app:
    build: .
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - MAX_CONCURRENCY=3
    depends_on:
      - redis
    restart: unless-stopped
      
  redis:
    image: redis:7-alpine
    restart: unless-stopped

Environment Variables

# LLM Configuration
export LLM_PROVIDER=openai
export OPENAI_API_KEY=sk-your-key
export LLM_TEMPERATURE=0.1

# Redis Configuration  
export REDIS_URL=redis://localhost:6379

# Execution Configuration
export MAX_CONCURRENCY=3
export LOG_LEVEL=INFO

Production Features

1. Structured Logging

JSON-formatted logs for easy parsing:

{
  "timestamp": "2024-01-15T10:30:45Z",
  "level": "INFO",
  "component": "rcrewai", 
  "message": "Task execution completed",
  "agent_name": "tech_support_specialist",
  "duration": 45.2
}

2. Health Checks

System health monitoring:

health_status = HealthChecker.check_system_health
# Checks: Redis connectivity, system resources

3. Error Recovery

Automatic retry with backoff:

max_retries: 3,
retry_delay: 10  # Increases on retry

4. Execution Reports

Detailed execution analytics:

{
  "execution_id": "uuid",
  "duration": 120.5,
  "success_rate": 100.0,
  "total_tasks": 2,
  "completed_tasks": 2
}

Monitoring and Alerting

Key Metrics

  • Success/failure rates
  • Execution duration
  • Queue depth
  • System health

Alerting Rules

  • Success rate below 80%
  • Execution time above 5 minutes
  • System health check failures

Security Best Practices

  1. Environment Variables: Store sensitive data in env vars
  2. Input Validation: Sanitize all inputs
  3. Access Controls: Implement proper authorization
  4. Audit Logging: Track all operations

Deployment Strategies

Rolling Updates

kubectl set image deployment/rcrewai rcrewai=rcrewai:v1.2.0
kubectl rollout status deployment/rcrewai

Blue-Green Deployment

  1. Deploy to green environment
  2. Test thoroughly
  3. Switch traffic
  4. Keep blue for rollback

This production-ready implementation provides enterprise-grade reliability, monitoring, and scalability for AI crew deployments.