Managing Claude Code costs at enterprise scale requires strategic planning, monitoring, and optimization. This guide provides comprehensive strategies for controlling expenses while maximizing value.

Claude Code usage costs fall into three buckets:

Input Tokens
- Prompts and instructions
- Context from files
- Previous conversation history
- System messages

Output Tokens

Hidden Costs
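Because billing is driven by input and output token counts, a rough per-request estimate is just those counts multiplied by per-token rates. A minimal sketch; the rates below are illustrative placeholders, not official pricing:

```python
# Illustrative per-million-token rates (placeholders, not official pricing)
RATES_PER_MILLION = {
    "claude-3-opus":   {"input": 15.00, "output": 75.00},
    "claude-3-sonnet": {"input": 3.00,  "output": 15.00},
}

def estimate_request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Rough per-request cost: token counts times the per-token rate."""
    rates = RATES_PER_MILLION[model]
    return (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000

# Example: 6,000 input tokens + 1,500 output tokens on claude-3-sonnet
# -> 6000*3/1e6 + 1500*15/1e6 = $0.0405
```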
Set up usage tracking
```bash
# Create monitoring script
cat > monitor-usage.sh << 'EOF'
#!/bin/bash

# Configuration
COST_THRESHOLD=100  # Daily limit in USD
ALERT_EMAIL="tech-team@company.com"

# Track usage
track_usage() {
    local user=$1
    local tokens=$2
    local cost=$3

    # Log to database
    echo "INSERT INTO usage_logs (user, tokens, cost, timestamp) VALUES ('$user', $tokens, $cost, NOW());" | mysql usage_db

    # Check thresholds
    DAILY_TOTAL=$(echo "SELECT SUM(cost) FROM usage_logs WHERE DATE(timestamp) = CURDATE();" | mysql usage_db)

    if (( $(echo "$DAILY_TOTAL > $COST_THRESHOLD" | bc -l) )); then
        send_alert "Daily cost threshold exceeded: \$$DAILY_TOTAL"
    fi
}

# Monitor Claude Code usage
claudecode monitor --format json | while read -r line; do
    USER=$(echo $line | jq -r '.user')
    TOKENS=$(echo $line | jq -r '.tokens')
    COST=$(echo $line | jq -r '.cost')
    track_usage "$USER" "$TOKENS" "$COST"
done
EOF

chmod +x monitor-usage.sh
```
Create cost dashboard
```python
import pandas as pd
import plotly.graph_objects as go
from datetime import datetime, timedelta
import mysql.connector

class CostDashboard:
    def __init__(self, db_config):
        self.conn = mysql.connector.connect(**db_config)

    def get_usage_data(self, days=30):
        query = """
            SELECT
                DATE(timestamp) as date,
                user,
                SUM(tokens) as total_tokens,
                SUM(cost) as total_cost
            FROM usage_logs
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY DATE(timestamp), user
            ORDER BY date DESC
        """
        df = pd.read_sql(query, self.conn, params=[days])
        return df

    def create_cost_chart(self):
        df = self.get_usage_data()

        fig = go.Figure()

        # Add traces for each user
        for user in df['user'].unique():
            user_data = df[df['user'] == user]
            fig.add_trace(go.Scatter(
                x=user_data['date'],
                y=user_data['total_cost'],
                mode='lines+markers',
                name=user,
                stackgroup='one'
            ))

        fig.update_layout(
            title='Claude Code Usage Costs by User',
            xaxis_title='Date',
            yaxis_title='Cost (USD)',
            hovermode='x unified'
        )

        return fig

    def generate_report(self):
        df = self.get_usage_data()

        # Calculate statistics
        total_cost = df['total_cost'].sum()
        avg_daily_cost = df.groupby('date')['total_cost'].sum().mean()
        top_users = df.groupby('user')['total_cost'].sum().nlargest(5)

        report = f"""
# Claude Code Usage Report

## Summary (Last 30 Days)
- Total Cost: ${total_cost:.2f}
- Average Daily Cost: ${avg_daily_cost:.2f}
- Total Tokens: {df['total_tokens'].sum():,}

## Top Users by Cost
{top_users.to_string()}

## Cost Trends
See attached visualization
"""

        return report
```
Implement alerting
```yaml
groups:
  - name: claude_code_costs
    interval: 5m
    rules:
      - alert: HighTokenUsage
        expr: |
          sum(rate(claude_code_tokens_total[5m])) by (user) > 10000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High token usage for user {{ $labels.user }}"
          description: "{{ $labels.user }} is using {{ $value }} tokens/second"

      - alert: CostThresholdExceeded
        expr: |
          sum(claude_code_cost_total) by (department) > 1000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Department {{ $labels.department }} exceeded budget"
          description: "Current cost: ${{ $value }}"
```
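These rules assume that counters named claude_code_tokens_total and claude_code_cost_total are exported somewhere Prometheus can scrape. A minimal sketch of one way to publish them with the Python prometheus_client library; the port and the record_usage hook are assumptions, only the metric and label names come from the rules above:

```python
from prometheus_client import Counter, start_http_server

# Counters matching the metric names referenced in the alert rules above
TOKENS_TOTAL = Counter(
    'claude_code_tokens_total', 'Tokens consumed by Claude Code', ['user']
)
COST_TOTAL = Counter(
    'claude_code_cost_total', 'Estimated Claude Code spend in USD', ['department']
)

def record_usage(user: str, department: str, tokens: int, cost: float) -> None:
    """Hypothetical hook: call this wherever usage events are processed."""
    TOKENS_TOTAL.labels(user=user).inc(tokens)
    COST_TOTAL.labels(department=department).inc(cost)

if __name__ == '__main__':
    start_http_server(9108)  # expose /metrics for Prometheus to scrape
    # ... feed usage events into record_usage() here ...
```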
Budget Allocation System
```python
from datetime import datetime, timedelta
from enum import Enum
import json

class BudgetPeriod(Enum):
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"

class BudgetManager:
    def __init__(self, config_file="budgets.json"):
        with open(config_file) as f:
            self.config = json.load(f)
        self.usage = {}

    def check_budget(self, department, user, estimated_tokens):
        """Check if request is within budget"""
        dept_config = self.config.get(department, {})
        # The config stores the period as a string ("daily", "weekly", "monthly")
        period = BudgetPeriod(dept_config.get('period', 'daily'))

        # Check department budget
        dept_limit = dept_config.get('limit', float('inf'))
        dept_usage = self.get_usage(department, period)

        if dept_usage + self.estimate_cost(estimated_tokens) > dept_limit:
            return False, f"Department budget exceeded: ${dept_usage:.2f}/${dept_limit:.2f}"

        # Check user budget
        user_limit = dept_config.get('user_limits', {}).get(user, float('inf'))
        user_usage = self.get_usage(f"{department}:{user}", period)

        if user_usage + self.estimate_cost(estimated_tokens) > user_limit:
            return False, f"User budget exceeded: ${user_usage:.2f}/${user_limit:.2f}"

        return True, "Budget check passed"

    def record_usage(self, department, user, tokens, cost):
        """Record actual usage"""
        timestamp = datetime.now()

        # Record department usage
        if department not in self.usage:
            self.usage[department] = []
        self.usage[department].append({
            'timestamp': timestamp,
            'tokens': tokens,
            'cost': cost
        })

        # Record user usage
        user_key = f"{department}:{user}"
        if user_key not in self.usage:
            self.usage[user_key] = []
        self.usage[user_key].append({
            'timestamp': timestamp,
            'tokens': tokens,
            'cost': cost
        })

    def get_usage(self, key, period):
        """Get usage for current period"""
        if key not in self.usage:
            return 0.0

        # Calculate period start
        now = datetime.now()
        if period == BudgetPeriod.DAILY:
            period_start = now.replace(hour=0, minute=0, second=0)
        elif period == BudgetPeriod.WEEKLY:
            period_start = now - timedelta(days=now.weekday())
        else:  # Monthly
            period_start = now.replace(day=1, hour=0, minute=0, second=0)

        # Sum usage in period
        total = sum(
            entry['cost']
            for entry in self.usage[key]
            if entry['timestamp'] >= period_start
        )

        return total

    def estimate_cost(self, tokens):
        """Estimate cost for token count"""
        # Claude 3 pricing (example)
        return tokens * 0.000015  # $15 per million tokens
```
{ "engineering": { "limit": 500, "period": "daily", "user_limits": { "senior_dev_1": 100, "senior_dev_2": 100, "junior_dev_1": 50, "intern_1": 25 }, "model_restrictions": { "claude-3-opus": ["senior_dev_1", "senior_dev_2"], "claude-3-sonnet": "*", "claude-instant": "*" } }, "marketing": { "limit": 200, "period": "weekly", "user_limits": { "content_lead": 150, "writer_1": 50 }, "model_restrictions": { "claude-3-opus": ["content_lead"], "claude-3-sonnet": "*" } }, "support": { "limit": 1000, "period": "monthly", "user_limits": {}, "model_restrictions": { "claude-instant": "*" } }}
```yaml
departments:
  engineering:
    limit: 500
    period: daily
    users:
      - name: senior_dev_1
        limit: 100
        models: [claude-3-opus, claude-3-sonnet]
      - name: senior_dev_2
        limit: 100
        models: [claude-3-opus, claude-3-sonnet]
      - name: junior_dev_1
        limit: 50
        models: [claude-3-sonnet, claude-instant]

  marketing:
    limit: 200
    period: weekly
    users:
      - name: content_lead
        limit: 150
        models: [claude-3-opus, claude-3-sonnet]
      - name: writer_1
        limit: 50
        models: [claude-3-sonnet]
```
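With budgets.json in place, a request handler can gate each call on the budget check and record the actual spend afterwards. A minimal usage sketch; the department, user, and token figures are placeholders:

```python
# Assumes the BudgetManager class defined above is importable in this scope
manager = BudgetManager("budgets.json")

allowed, reason = manager.check_budget(
    department="engineering",
    user="junior_dev_1",
    estimated_tokens=4_000,   # rough estimate made before sending the request
)

if not allowed:
    raise RuntimeError(reason)

# ... send the request to Claude Code here ...

# Record what was actually consumed so later checks see the real spend
manager.record_usage(
    department="engineering",
    user="junior_dev_1",
    tokens=3_200,                        # actual tokens reported for the call
    cost=manager.estimate_cost(3_200),   # or the provider-reported cost
)
```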
Efficient Prompting
```python
class PromptOptimizer:
    def __init__(self):
        # TokenCounter, remove_redundancy, smart_truncate, and classify_request
        # are assumed helpers and are not shown here
        self.token_counter = TokenCounter()

    def optimize_prompt(self, prompt, max_tokens=2000):
        """Optimize prompt to reduce tokens"""

        # Remove unnecessary whitespace
        prompt = ' '.join(prompt.split())

        # Compress repeated instructions
        prompt = self.compress_instructions(prompt)

        # Remove redundant context
        prompt = self.remove_redundancy(prompt)

        # Truncate if needed
        if self.token_counter.count(prompt) > max_tokens:
            prompt = self.smart_truncate(prompt, max_tokens)

        return prompt

    def compress_instructions(self, prompt):
        """Replace verbose instructions with concise ones"""
        replacements = {
            "Please analyze the following code and provide": "Analyze:",
            "Can you help me understand": "Explain:",
            "I would like you to": "",
            "Could you please": ""
        }

        for verbose, concise in replacements.items():
            prompt = prompt.replace(verbose, concise)

        return prompt

    def batch_similar_requests(self, requests):
        """Batch similar requests together"""
        batched = {}

        for req in requests:
            # Group by request type
            req_type = self.classify_request(req)
            if req_type not in batched:
                batched[req_type] = []
            batched[req_type].append(req)

        # Create batched prompts
        batch_prompts = []
        for req_type, reqs in batched.items():
            if len(reqs) > 1:
                batch_prompt = f"Process these {len(reqs)} {req_type} requests:\n"
                for i, req in enumerate(reqs, 1):
                    batch_prompt += f"\n{i}. {req}"
                batch_prompts.append(batch_prompt)
            else:
                batch_prompts.extend(reqs)

        return batch_prompts
```
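As a usage sketch (assuming the helper methods noted above are implemented), the optimizer is applied per prompt and the batcher is applied before dispatch; the example prompts are placeholders:

```python
optimizer = PromptOptimizer()

# Trim a single verbose prompt before it is sent
prompt = optimizer.optimize_prompt(
    "Could you please review the error handling in this module and suggest fixes?"
)

# Collapse several small, similar requests into one call
batched = optimizer.batch_similar_requests([
    "Add docstrings to utils/parsing.py",
    "Add docstrings to utils/retry.py",
    "Add docstrings to utils/io.py",
])
```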
Context Management
```python
class ContextManager:
    def __init__(self, max_context_tokens=4000):
        self.max_tokens = max_context_tokens
        self.context_cache = {}

    def prepare_context(self, files, focus_file=None):
        """Prepare optimized context from files"""
        # summarize_file, count_tokens, and find_related_files are assumed
        # helpers and are not shown here
        context_parts = []
        token_budget = self.max_tokens

        # Priority 1: Focus file
        if focus_file and focus_file in files:
            content = self.summarize_file(files[focus_file])
            tokens = self.count_tokens(content)
            if tokens <= token_budget * 0.5:  # Max 50% for focus file
                context_parts.append(f"=== {focus_file} ===\n{content}")
                token_budget -= tokens

        # Priority 2: Related files
        related = self.find_related_files(focus_file, files)
        for file in related:
            if token_budget <= 0:
                break

            summary = self.create_summary(files[file])
            tokens = self.count_tokens(summary)

            if tokens <= token_budget:
                context_parts.append(f"=== {file} (summary) ===\n{summary}")
                token_budget -= tokens

        return "\n\n".join(context_parts)

    def create_summary(self, content):
        """Create token-efficient summary"""
        lines = content.split('\n')

        # Extract key elements
        imports = [l for l in lines if l.strip().startswith('import')]
        functions = [l for l in lines if 'def ' in l or 'function ' in l]
        classes = [l for l in lines if 'class ' in l]

        summary = []
        if imports:
            summary.append("Imports: " + ", ".join(imports[:5]))
        if functions:
            summary.append("Functions: " + ", ".join(functions[:10]))
        if classes:
            summary.append("Classes: " + ", ".join(classes[:5]))

        return "\n".join(summary)
```
Implement smart routing
```python
class ModelRouter:
    def __init__(self):
        self.models = {
            'claude-3-opus': {
                'cost_per_1k': 0.015,
                'capabilities': ['complex_code', 'architecture', 'analysis']
            },
            'claude-3-sonnet': {
                'cost_per_1k': 0.003,
                'capabilities': ['general_code', 'refactoring', 'testing']
            },
            'claude-instant': {
                'cost_per_1k': 0.0008,
                'capabilities': ['simple_code', 'formatting', 'comments']
            }
        }

    def select_model(self, task_type, complexity, budget_remaining):
        """Select optimal model based on task and budget"""

        # Map task to required capabilities
        # (get_required_capabilities is an assumed helper, not shown here)
        required_caps = self.get_required_capabilities(task_type)

        # Filter capable models
        capable_models = [
            model for model, info in self.models.items()
            if any(cap in info['capabilities'] for cap in required_caps)
        ]

        # Sort by cost
        capable_models.sort(key=lambda m: self.models[m]['cost_per_1k'])

        # Check budget
        for model in capable_models:
            estimated_cost = self.estimate_task_cost(model, complexity)
            if estimated_cost <= budget_remaining:
                return model

        return None  # No model within budget

    def estimate_task_cost(self, model, complexity):
        """Estimate cost for task"""
        base_tokens = {
            'simple': 1000,
            'moderate': 5000,
            'complex': 15000
        }

        tokens = base_tokens.get(complexity, 5000)
        cost_per_token = self.models[model]['cost_per_1k'] / 1000

        return tokens * cost_per_token
```
Create routing rules
```yaml
rules:
  - pattern: "fix.*bug|debug|error"
    model: claude-3-sonnet
    max_tokens: 2000

  - pattern: "implement.*feature|create.*from scratch"
    model: claude-3-opus
    max_tokens: 8000

  - pattern: "format|lint|comment"
    model: claude-instant
    max_tokens: 1000

  - pattern: "test|unit test|integration test"
    model: claude-3-sonnet
    max_tokens: 3000

  - pattern: "architecture|design|refactor.*large"
    model: claude-3-opus
    max_tokens: 10000

default:
  model: claude-3-sonnet
  max_tokens: 4000
```
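These rules can be applied with a small dispatcher that tests each pattern against the incoming request and falls back to the default. A minimal sketch, assuming the YAML above is saved as routing-rules.yaml (the file name is an assumption):

```python
import re
import yaml

def route_request(prompt: str, rules_path: str = "routing-rules.yaml") -> dict:
    """Return the model and token budget for a prompt, per the rules above."""
    with open(rules_path) as f:
        config = yaml.safe_load(f)

    for rule in config.get("rules", []):
        # First matching pattern wins; matching is case-insensitive
        if re.search(rule["pattern"], prompt, re.IGNORECASE):
            return {"model": rule["model"], "max_tokens": rule["max_tokens"]}

    return config["default"]

# Example: "fix the bug in the login handler" -> claude-3-sonnet, 2000 tokens
```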
Intelligent Cache System
```python
import hashlib
import json
import redis
from datetime import datetime, timedelta

class ResponseCache:
    def __init__(self, redis_host='localhost', ttl_hours=24):
        self.redis = redis.Redis(host=redis_host, decode_responses=True)
        self.ttl = timedelta(hours=ttl_hours)

    def get_cache_key(self, prompt, model, params):
        """Generate deterministic cache key"""
        cache_data = {
            'prompt': prompt,
            'model': model,
            'params': params
        }

        # Create hash of request
        cache_string = json.dumps(cache_data, sort_keys=True)
        return f"claude:cache:{hashlib.sha256(cache_string.encode()).hexdigest()}"

    def get(self, prompt, model, params):
        """Get cached response if available"""
        key = self.get_cache_key(prompt, model, params)
        cached = self.redis.get(key)

        if cached:
            # Update access count
            self.redis.hincrby(f"{key}:meta", "hits", 1)
            return json.loads(cached)

        return None

    def set(self, prompt, model, params, response, tokens_used):
        """Cache response with metadata"""
        key = self.get_cache_key(prompt, model, params)

        # Store response
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(response)
        )

        # Store metadata
        meta_key = f"{key}:meta"
        self.redis.hset(meta_key, mapping={
            'prompt_tokens': len(prompt.split()),
            'response_tokens': tokens_used,
            'model': model,
            'created': datetime.now().isoformat(),
            'hits': 0
        })
        self.redis.expire(meta_key, self.ttl)

    def get_cache_stats(self):
        """Get caching statistics"""
        stats = {
            'total_cached': 0,
            'total_hits': 0,
            'tokens_saved': 0,
            'cost_saved': 0
        }

        # Scan all cache keys
        for key in self.redis.scan_iter("claude:cache:*:meta"):
            meta = self.redis.hgetall(key)
            stats['total_cached'] += 1
            hits = int(meta.get('hits', 0))
            stats['total_hits'] += hits

            if hits > 0:
                tokens = int(meta.get('response_tokens', 0))
                stats['tokens_saved'] += tokens * hits

                # Calculate cost saved (example rates)
                model = meta.get('model', 'claude-3-sonnet')
                if model == 'claude-3-opus':
                    cost_per_token = 0.000015
                elif model == 'claude-3-sonnet':
                    cost_per_token = 0.000003
                else:
                    cost_per_token = 0.0000008

                stats['cost_saved'] += tokens * hits * cost_per_token

        return stats
```
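A thin wrapper ties the cache into the request path: check the cache first, call the model only on a miss, and store the fresh response for next time. A minimal sketch; call_claude here is a hypothetical stand-in for whatever client function actually issues the request:

```python
cache = ResponseCache(redis_host="localhost", ttl_hours=24)

def cached_completion(prompt, model="claude-3-sonnet", params=None):
    """Serve repeated, identical requests from Redis instead of re-billing them."""
    params = params or {}

    # 1. Return a cached response when the exact same request was seen before
    cached = cache.get(prompt, model, params)
    if cached is not None:
        return cached

    # 2. Cache miss: call the model (call_claude is a hypothetical client helper)
    response, tokens_used = call_claude(prompt, model=model, **params)

    # 3. Store the response so the next identical request is free
    cache.set(prompt, model, params, response, tokens_used)
    return response
```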
Create comprehensive reports
```python
from datetime import datetime

class ExecutiveReport:
    def __init__(self, db_connection):
        self.db = db_connection

    def generate_monthly_report(self):
        """Generate executive monthly report"""

        # Get data (these query helpers are assumed and not shown here)
        costs = self.get_monthly_costs()
        usage = self.get_usage_patterns()
        savings = self.calculate_savings()

        report = f"""
# Claude Code Executive Summary - {datetime.now().strftime('%B %Y')}

## Financial Overview
- Total Spend: ${costs['total']:,.2f}
- Budget Utilization: {costs['utilization']}%
- Cost per Developer: ${costs['per_developer']:,.2f}
- MoM Change: {costs['mom_change']:+.1f}%

## Usage Insights
- Active Users: {usage['active_users']}
- Total Requests: {usage['total_requests']:,}
- Average Tokens/Request: {usage['avg_tokens']:,}
- Peak Usage Time: {usage['peak_time']}

## Cost Optimization
- Savings from Caching: ${savings['caching']:,.2f}
- Savings from Model Routing: ${savings['routing']:,.2f}
- Prompt Optimization Savings: ${savings['prompts']:,.2f}
- Total Savings: ${savings['total']:,.2f}

## Department Breakdown
{self.format_department_table(costs['by_department'])}

## Recommendations
{self.generate_recommendations(costs, usage, savings)}
"""

        return report

    def generate_recommendations(self, costs, usage, savings):
        """Generate cost optimization recommendations"""
        recs = []

        # Check for high-cost users
        if costs['top_user_percentage'] > 20:
            recs.append("- Consider additional training for high-usage users")

        # Check cache hit rate
        if savings['cache_hit_rate'] < 30:
            recs.append("- Improve caching strategy to increase hit rate")

        # Check model distribution
        if costs['opus_percentage'] > 50:
            recs.append("- Review model selection to use Sonnet where appropriate")

        return "\n".join(recs) if recs else "- No immediate actions required"
```
Automate report distribution
```python
import os
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage

def send_monthly_report(recipients, report_html, charts):
    """Send monthly report to executives"""

    msg = MIMEMultipart('related')
    msg['Subject'] = f"Claude Code Usage Report - {datetime.now().strftime('%B %Y')}"
    msg['From'] = 'ai-ops@company.com'
    msg['To'] = ', '.join(recipients)

    # Attach HTML report
    msg.attach(MIMEText(report_html, 'html'))

    # Attach charts
    for i, chart in enumerate(charts):
        img = MIMEImage(chart)
        img.add_header('Content-ID', f'<chart{i}>')
        msg.attach(img)

    # Send email
    with smtplib.SMTP('smtp.company.com', 587) as server:
        server.starttls()
        server.login('ai-ops@company.com', os.environ['EMAIL_PASSWORD'])
        server.send_message(msg)
```
Time-Based Restrictions
```python
# Only allow expensive models during business hours
from datetime import datetime

def check_time_restriction(model, user):
    current_hour = datetime.now().hour

    if model == 'claude-3-opus':
        if current_hour < 8 or current_hour > 18:
            return False, "Opus model restricted to business hours"

    return True, "Allowed"
```
Request Throttling
```python
# Implement request rate limiting
from collections import defaultdict
import time

class RateLimiter:
    def __init__(self):
        self.requests = defaultdict(list)

    def check_rate_limit(self, user, limit=10, window=60):
        now = time.time()

        # Clean old requests
        self.requests[user] = [
            req for req in self.requests[user]
            if now - req < window
        ]

        # Check limit
        if len(self.requests[user]) >= limit:
            return False, f"Rate limit exceeded: {limit} requests per {window}s"

        # Record request
        self.requests[user].append(now)
        return True, "Allowed"
```
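Before dispatching a request, these controls can be chained so a call must pass the rate limit, the time restriction, and the budget check. A short sketch, assuming the RateLimiter, check_time_restriction, and BudgetManager defined earlier in this guide are in scope:

```python
limiter = RateLimiter()
budget = BudgetManager("budgets.json")

def authorize_request(department, user, model, estimated_tokens):
    """Run a request through every gate; returns (allowed, reason)."""
    checks = (
        limiter.check_rate_limit(user, limit=10, window=60),
        check_time_restriction(model, user),
        budget.check_budget(department, user, estimated_tokens),
    )
    for allowed, reason in checks:
        if not allowed:
            return False, reason
    return True, "Request authorized"
```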
Circuit Breaker Implementation
```python
class CostCircuitBreaker:
    def __init__(self, daily_limit=1000, emergency_limit=1500):
        self.daily_limit = daily_limit
        self.emergency_limit = emergency_limit
        # current_usage is expected to be updated by the accounting layer
        # as actual costs are recorded
        self.current_usage = 0
        self.emergency_mode = False

    def check_request(self, estimated_cost, user, priority='normal'):
        """Check if request should be allowed"""

        if self.current_usage >= self.emergency_limit:
            # Complete shutdown
            return False, "Emergency limit reached - all requests blocked"

        if self.current_usage >= self.daily_limit:
            # Emergency mode - only critical requests
            if priority != 'critical':
                return False, "Daily limit reached - only critical requests allowed"
            self.emergency_mode = True

        if self.emergency_mode:
            # Notify admins
            self.send_emergency_alert(user, estimated_cost)

        return True, "Request approved"

    def send_emergency_alert(self, user, cost):
        """Alert administrators of emergency usage"""
        alert = f"""
        URGENT: Claude Code Emergency Mode Active

        Current Usage: ${self.current_usage:.2f}
        Daily Limit: ${self.daily_limit:.2f}
        Emergency Limit: ${self.emergency_limit:.2f}

        Request from: {user}
        Estimated Cost: ${cost:.2f}

        Action Required: Review and approve critical requests only.
        """

        # Send to Slack, email, etc. (send_slack_alert is an assumed helper)
        send_slack_alert('#ai-ops-emergency', alert)
```
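In practice the breaker wraps the request path: each call is checked up front and its actual cost is added back afterwards so the trip thresholds reflect real spend. A minimal sketch; the user name and the 12.50 figure are placeholders:

```python
breaker = CostCircuitBreaker(daily_limit=1000, emergency_limit=1500)

allowed, reason = breaker.check_request(
    estimated_cost=12.50, user="senior_dev_1", priority="normal"
)
if not allowed:
    raise RuntimeError(reason)

# ... issue the request ...

# Feed the real cost back so the breaker trips on actual, not estimated, spend
breaker.current_usage += 12.50
```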
Continue optimizing costs with:
Remember: Effective cost control is about balance. Focus on maximizing value while minimizing waste, not just reducing costs. Regular monitoring and optimization can often achieve 30-50% cost reduction without impacting productivity.