Production-Ready Crew
Enterprise-grade AI crew with comprehensive error handling, monitoring, and production features
Production-Ready Crew
This example demonstrates how to build a production-ready RCrewAI crew with enterprise-grade features including comprehensive error handling, monitoring, logging, security controls, and deployment considerations.
Overview
We’ll create a customer support automation crew with:
- Robust Error Handling: Comprehensive exception handling and recovery
- Monitoring & Observability: Detailed metrics, logging, and health checks
- Security Controls: Input validation, access controls, and audit logging
- Performance Optimization: Caching, connection pooling, and resource management
- Scalability Features: Load balancing, concurrency controls, and auto-scaling
- Production Deployment: Docker containers, configuration management, and CI/CD
Complete Production Implementation
require 'rcrewai'
require 'logger'
require 'json'
require 'redis'
# ===== PRODUCTION CONFIGURATION =====
class ProductionConfig
def self.setup
# Environment-based configuration
@env = ENV.fetch('RAILS_ENV', 'development')
@redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379')
# Configure RCrewAI for production
RCrewAI.configure do |config|
config.llm_provider = ENV.fetch('LLM_PROVIDER', 'openai').to_sym
config.temperature = ENV.fetch('LLM_TEMPERATURE', '0.1').to_f
config.max_tokens = ENV.fetch('LLM_MAX_TOKENS', '4000').to_i
# Provider-specific configuration
case config.llm_provider
when :openai
config.openai_api_key = ENV.fetch('OPENAI_API_KEY')
when :anthropic
config.anthropic_api_key = ENV.fetch('ANTHROPIC_API_KEY')
when :azure
config.azure_api_key = ENV.fetch('AZURE_OPENAI_API_KEY')
config.base_url = ENV.fetch('AZURE_OPENAI_ENDPOINT')
end
end
# Setup Redis for caching and coordination
@redis = Redis.new(url: @redis_url)
# Setup structured logging
@logger = setup_logger
[@redis, @logger]
end
private
def self.setup_logger
logger = Logger.new($stdout)
logger.level = ENV.fetch('LOG_LEVEL', 'INFO').upcase.constantize rescue Logger::INFO
logger.formatter = proc do |severity, datetime, progname, msg|
{
timestamp: datetime.iso8601,
level: severity,
component: progname || 'rcrewai',
message: msg,
environment: @env,
process_id: Process.pid
}.to_json + "\n"
end
logger
end
class << self
attr_reader :redis, :logger, :env
end
end
# Initialize production configuration
ProductionConfig.setup
redis = ProductionConfig.redis
logger = ProductionConfig.logger
# ===== PRODUCTION-GRADE BASE CLASSES =====
class ProductionAgent < RCrewAI::Agent
def initialize(**options)
super
@logger = ProductionConfig.logger
@start_time = nil
end
def execute_task(task)
@start_time = Time.now
task_labels = { agent_name: name, task_name: task.name }
begin
@logger.info("Task execution started", task_labels)
result = super(task)
duration = Time.now - @start_time
@logger.info("Task execution completed", task_labels.merge(
duration: duration,
result_length: result.length
))
result
rescue => e
duration = Time.now - @start_time
@logger.error("Task execution failed", task_labels.merge(
error: e.message,
error_class: e.class.name,
duration: duration
))
raise
end
end
end
# ===== CUSTOMER SUPPORT CREW =====
logger.info("Initializing production customer support crew")
# Create production crew
support_crew = RCrewAI::Crew.new("customer_support_crew", process: :hierarchical)
# Support Manager
support_manager = ProductionAgent.new(
name: "support_manager",
role: "Customer Support Manager",
goal: "Efficiently coordinate customer support operations and ensure customer satisfaction",
backstory: "You are an experienced customer support manager with expertise in escalation handling, team coordination, and customer relationship management.",
manager: true,
allow_delegation: true,
tools: [
RCrewAI::Tools::WebSearch.new(max_results: 5),
RCrewAI::Tools::FileReader.new,
RCrewAI::Tools::FileWriter.new
],
verbose: ProductionConfig.env == 'development',
max_execution_time: 600
)
# Technical Support Specialist
tech_support = ProductionAgent.new(
name: "tech_support_specialist",
role: "Technical Support Specialist",
goal: "Resolve technical issues and provide expert technical guidance",
backstory: "You are a senior technical support specialist with deep knowledge of software systems, APIs, and troubleshooting.",
tools: [
RCrewAI::Tools::WebSearch.new(max_results: 10),
RCrewAI::Tools::FileReader.new,
RCrewAI::Tools::FileWriter.new
],
verbose: ProductionConfig.env == 'development',
max_execution_time: 900
)
# Add agents to crew
support_crew.add_agent(support_manager)
support_crew.add_agent(tech_support)
# ===== PRODUCTION TASK DEFINITIONS =====
# Customer Issue Analysis
issue_analysis = RCrewAI::Task.new(
name: "customer_issue_analysis",
description: "Analyze incoming customer support tickets to categorize issues, assess severity, and determine initial response strategy.",
expected_output: "Structured analysis with issue categorization, severity assessment, and recommended response strategy",
async: true,
max_retries: 3,
retry_delay: 10
)
# Technical Issue Resolution
technical_resolution = RCrewAI::Task.new(
name: "technical_issue_resolution",
description: "Investigate and resolve technical issues reported by customers. Provide step-by-step solutions and code examples.",
expected_output: "Comprehensive technical solution with troubleshooting steps and configuration guidance",
context: [issue_analysis],
async: true,
max_retries: 2
)
# Add tasks to crew
support_crew.add_task(issue_analysis)
support_crew.add_task(technical_resolution)
# ===== PRODUCTION EXECUTION WITH MONITORING =====
class ProductionExecutor
def initialize(crew, logger)
@crew = crew
@logger = logger
@execution_id = SecureRandom.uuid
end
def execute
@logger.info("Starting production crew execution", {
execution_id: @execution_id,
crew_name: @crew.name,
agent_count: @crew.agents.length,
task_count: @crew.tasks.length
})
start_time = Time.now
begin
results = @crew.execute(
async: true,
max_concurrency: ENV.fetch('MAX_CONCURRENCY', '3').to_i
)
duration = Time.now - start_time
@logger.info("Crew execution completed", {
execution_id: @execution_id,
duration: duration,
success_rate: results[:success_rate]
})
generate_execution_report(results, duration)
results
rescue => e
duration = Time.now - start_time
@logger.error("Crew execution failed", {
execution_id: @execution_id,
error: e.message,
duration: duration
})
raise
end
end
private
def generate_execution_report(results, duration)
report = {
execution_id: @execution_id,
timestamp: Time.now.iso8601,
crew_name: @crew.name,
duration: duration,
metrics: {
total_tasks: results[:total_tasks],
completed_tasks: results[:completed_tasks],
success_rate: results[:success_rate]
}
}
File.write("execution_report_#{@execution_id}.json", report.to_json)
@logger.info("Execution report generated")
end
end
# ===== HEALTH CHECKS =====
class HealthChecker
def self.check_system_health
health_status = {
timestamp: Time.now.iso8601,
status: 'healthy',
checks: {}
}
# Check Redis connectivity
begin
ProductionConfig.redis.ping
health_status[:checks][:redis] = { status: 'healthy' }
rescue => e
health_status[:checks][:redis] = { status: 'unhealthy', message: e.message }
health_status[:status] = 'unhealthy'
end
health_status
end
end
# ===== PRODUCTION EXECUTION =====
if __FILE__ == $0
logger.info("Starting production customer support crew")
# Pre-execution health check
health_status = HealthChecker.check_system_health
if health_status[:status] == 'unhealthy'
logger.error("System health check failed", health_status)
exit 1
end
# Execute crew with production monitoring
executor = ProductionExecutor.new(support_crew, logger)
begin
results = executor.execute
puts "\n🎉 PRODUCTION EXECUTION COMPLETED"
puts "Success Rate: #{results[:success_rate]}%"
puts "Completed Tasks: #{results[:completed_tasks]}/#{results[:total_tasks]}"
rescue => e
logger.error("Production execution failed", { error: e.message })
puts "\n❌ PRODUCTION EXECUTION FAILED"
exit 1
end
end
Docker Configuration
Dockerfile:
FROM ruby:3.1-alpine
WORKDIR /app
COPY Gemfile Gemfile.lock ./
RUN bundle install --without development test
COPY . .
ENV RAILS_ENV=production
ENV LOG_LEVEL=INFO
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
CMD ruby -e "puts HealthChecker.check_system_health[:status]" || exit 1
CMD ["ruby", "production_crew.rb"]
docker-compose.yml:
version: '3.8'
services:
rcrewai-app:
build: .
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
- MAX_CONCURRENCY=3
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
restart: unless-stopped
Environment Variables
# LLM Configuration
export LLM_PROVIDER=openai
export OPENAI_API_KEY=sk-your-key
export LLM_TEMPERATURE=0.1
# Redis Configuration
export REDIS_URL=redis://localhost:6379
# Execution Configuration
export MAX_CONCURRENCY=3
export LOG_LEVEL=INFO
Production Features
1. Structured Logging
JSON-formatted logs for easy parsing:
{
"timestamp": "2024-01-15T10:30:45Z",
"level": "INFO",
"component": "rcrewai",
"message": "Task execution completed",
"agent_name": "tech_support_specialist",
"duration": 45.2
}
2. Health Checks
System health monitoring:
health_status = HealthChecker.check_system_health
# Checks: Redis connectivity, system resources
3. Error Recovery
Automatic retry with backoff:
max_retries: 3,
retry_delay: 10 # Increases on retry
4. Execution Reports
Detailed execution analytics:
{
"execution_id": "uuid",
"duration": 120.5,
"success_rate": 100.0,
"total_tasks": 2,
"completed_tasks": 2
}
Monitoring and Alerting
Key Metrics
- Success/failure rates
- Execution duration
- Queue depth
- System health
Alerting Rules
- Success rate below 80%
- Execution time above 5 minutes
- System health check failures
Security Best Practices
- Environment Variables: Store sensitive data in env vars
- Input Validation: Sanitize all inputs
- Access Controls: Implement proper authorization
- Audit Logging: Track all operations
Deployment Strategies
Rolling Updates
kubectl set image deployment/rcrewai rcrewai=rcrewai:v1.2.0
kubectl rollout status deployment/rcrewai
Blue-Green Deployment
- Deploy to green environment
- Test thoroughly
- Switch traffic
- Keep blue for rollback
This production-ready implementation provides enterprise-grade reliability, monitoring, and scalability for AI crew deployments.