Building APIs is where backend development meets the real world. Whether you’re creating a simple REST endpoint or a complex GraphQL schema, Claude Code transforms API development from tedious boilerplate into elegant, well-tested services. This lesson explores how to leverage AI assistance for API design, implementation, and maintenance.
Scenario: Your startup needs to build an API serving mobile apps, web clients, and third-party integrations. Requirements include real-time updates, complex authorization, rate limiting, and support for 100K daily active users. Traditional approach: months of development. With Claude Code: weeks.
- Manual OpenAPI spec writing
- Design authentication flow
- Write boilerplate for each endpoint
- Implement validation manually
- Add error handling everywhere
Week 5-6: Testing & Documentation
- Write tests for each endpoint
- Create Postman collections
- Document each parameter
- Performance optimization
Day 1-2: Design & Implementation
> Design a RESTful API for our e-commerce platform
> Include authentication, products, orders, and users
> Follow best practices for naming and structure
> Implement the API using FastAPI with:
Day 3: Testing & Documentation
> Generate comprehensive tests for all endpoints
> Create OpenAPI documentation
> Add example requests and responses
> Design a RESTful API for a task management system with:
> - User authentication and authorization
> - Projects, tasks, and comments
> - Real-time notifications
> Follow REST best practices and conventions
Claude generates a comprehensive API design:
Resource Structure
# API Resources and Endpoints
GET /users # List users (admin only)
GET /users/{id} # Get user profile
PUT /users/{id} # Update user
DELETE /users/{id} # Delete account
GET /projects # List user's projects
POST /projects # Create project
GET /projects/{id} # Get project details
PUT /projects/{id} # Update project
DELETE /projects/{id} # Delete project
GET /projects/{pid}/tasks # List tasks
POST /projects/{pid}/tasks # Create task
GET /tasks/{id} # Get task
PUT /tasks/{id} # Update task
DELETE /tasks/{id} # Delete task
PATCH /tasks/{id}/status # Update status only
GET /tasks/{tid}/comments # List comments
POST /tasks/{tid}/comments # Add comment
PUT /comments/{id} # Edit comment
DELETE /comments/{id} # Delete comment
HTTP Conventions
200 OK - Successful GET/PUT
201 Created - Successful POST
204 No Content - Successful DELETE
400 Bad Request - Invalid input
401 Unauthorized - Missing/invalid auth
403 Forbidden - No permission
404 Not Found - Resource does not exist
409 Conflict - Duplicate/conflict
429 Too Many Requests - Rate limited
Authorization: Bearer <token>
Content-Type: application/json
X-RateLimit-Remaining: 100
X-RateLimit-Reset: 1642345678
?page=1&limit=20 # Pagination
?sort=created_at&order=desc # Sorting
?filter[status]=active # Filtering
?include=user,project # Relationships
?fields=id,title,status # Sparse fields
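The query conventions above can be sketched as a small parser. This is a minimal illustration using only the standard library; the function name `parse_list_query`, the default sort column, and the 100-item cap are assumptions made for the example, not part of the generated design.

```python
from urllib.parse import parse_qs


def parse_list_query(query_string: str, max_limit: int = 100) -> dict:
    """Parse pagination, sorting, filtering, and field-selection parameters."""
    raw = parse_qs(query_string.lstrip("?"))

    def first(name: str, default: str) -> str:
        # parse_qs maps each key to a list of values; take the first one
        return raw.get(name, [default])[0]

    def csv(name: str) -> list:
        # "id,title" -> ["id", "title"]; a missing parameter gives []
        return [part for part in first(name, "").split(",") if part]

    page = max(int(first("page", "1")), 1)
    limit = min(max(int(first("limit", "20")), 1), max_limit)

    # filter[status]=active becomes {"status": "active"}
    filters = {
        key[len("filter["):-1]: values[0]
        for key, values in raw.items()
        if key.startswith("filter[") and key.endswith("]")
    }

    return {
        "page": page,
        "limit": limit,
        "offset": (page - 1) * limit,
        "sort": first("sort", "created_at"),
        "order": first("order", "desc"),
        "filters": filters,
        "include": csv("include"),
        "fields": csv("fields"),
    }
```

Clamping `limit` on the server keeps a client from requesting an unbounded page, which matters once the API is exposed to third-party integrations.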
> Implement the task management API using FastAPI:
> - Use SQLAlchemy for database
> - Pydantic for validation
> - JWT for authentication
> - Include middleware for CORS and rate limiting
Claude generates a complete implementation:
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import get_current_user
from app.api import auth, users, projects, tasks
from app.db.base import engine, Base
from app.middleware.rate_limit import RateLimitMiddleware
Base.metadata.create_all(bind=engine)
app = FastAPI(
    title="Task Management API",
    description="RESTful API for task management",
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
)
app.add_middleware(
    RateLimitMiddleware,
    calls=100,
    period=3600,  # 100 calls per hour
)
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(users.router, prefix="/api/users", tags=["users"])
app.include_router(projects.router, prefix="/api/projects", tags=["projects"])
app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])
@app.get("/health")  # route path assumed; the decorator line is missing from the source
async def health_check():
    return {"status": "healthy"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.security import create_access_token, verify_password, get_password_hash
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
from app.schemas.auth import Token, UserCreate, UserLogin
router = APIRouter()
@router.post("/register", response_model=Token)
async def register(  # handler name assumed from the route
    user_data: UserCreate,
    db: Session = Depends(get_db)
):
    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered"
        )
    user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=get_password_hash(user_data.password)
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    access_token = create_access_token(
        subject=str(user.id),
        expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    return {"access_token": access_token, "token_type": "bearer"}
@router.post("/login", response_model=Token)
async def login(  # handler name assumed from the route
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    # OAuth2PasswordRequestForm calls the field "username"; this API stores the email in it
    user = db.query(User).filter(User.email == form_data.username).first()
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        )
    access_token = create_access_token(subject=str(user.id))
    return {"access_token": access_token, "token_type": "bearer"}
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.task import Task
from app.models.project import Project  # module path assumed, mirroring app.models.task
from app.models.user import User
from app.schemas.task import TaskCreate, TaskUpdate, TaskResponse
from app.core.security import get_current_user
from app.core.pagination import paginate
router = APIRouter()
@router.get("/", response_model=List[TaskResponse])
async def list_tasks(  # handler name assumed
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    status: Optional[str] = None,
    project_id: Optional[int] = None,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    query = db.query(Task).filter(Task.user_id == current_user.id)
    if status:
        query = query.filter(Task.status == status)
    if project_id is not None:
        query = query.filter(Task.project_id == project_id)
    return paginate(query, page, limit)
@router.post("/", response_model=TaskResponse, status_code=201)
async def create_task(  # handler name assumed
    task_data: TaskCreate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Only allow tasks in projects owned by the current user
    project = db.query(Project).filter(
        Project.id == task_data.project_id,
        Project.user_id == current_user.id
    ).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    task = Task(**task_data.dict(), user_id=current_user.id)
    db.add(task)
    db.commit()
    db.refresh(task)
    return task
@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(  # handler name assumed
    task_id: int,
    task_update: TaskUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    task = db.query(Task).filter(
        Task.id == task_id,
        Task.user_id == current_user.id
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    # Apply only the fields the client actually sent
    for field, value in task_update.dict(exclude_unset=True).items():
        setattr(task, field, value)
    db.commit()
    db.refresh(task)
    return task
> Implement advanced pagination and filtering for the tasks endpoint:
> - Cursor-based pagination for real-time data
> - Complex filtering with operators
> - Sorting by multiple fields
# Advanced query handling
from datetime import datetime
from typing import Optional, List
from fastapi import Query
from sqlalchemy import or_, and_
class TaskFilters:  # class name assumed; the class line is missing from the source
    def __init__(
        self,
        cursor: Optional[str] = None,
        limit: int = Query(20, le=100),
        status: Optional[List[str]] = Query(None),
        priority: Optional[str] = Query(None),
        assigned_to: Optional[int] = None,
        created_after: Optional[datetime] = None,
        created_before: Optional[datetime] = None,
        search: Optional[str] = None,
        sort_by: List[str] = Query(["created_at"]),
        sort_order: List[str] = Query(["desc"])
    ):
        self.cursor = cursor
        self.limit = limit
        self.search = search
        self.filters = self._build_filters(locals())
        self.sorting = list(zip(sort_by, sort_order))
    def apply_to_query(self, query):
        if self.filters:
            query = query.filter(and_(*self.filters))
        if self.search:
            query = query.filter(or_(
                Task.title.ilike(f"%{self.search}%"),
                Task.description.ilike(f"%{self.search}%")
            ))
        for field, order in self.sorting:
            column = getattr(Task, field)
            query = query.order_by(
                column.desc() if order == "desc" else column
            )
        # Apply cursor pagination
        if self.cursor:
            query = query.filter(Task.id > self.decode_cursor(self.cursor))
        # Fetch one extra row to detect whether another page exists
        return query.limit(self.limit + 1)
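The filter class above calls `decode_cursor` without showing it. One possible shape for the cursor helpers is sketched below, assuming the cursor is simply the last seen task ID wrapped in URL-safe base64; `encode_cursor`, `paginate_by_cursor`, and the in-memory `rows` list are hypothetical stand-ins for the real query.

```python
import base64
import json
from typing import List, Optional, Tuple


def encode_cursor(last_id: int) -> str:
    """Wrap the last seen ID in an opaque, URL-safe token."""
    payload = json.dumps({"id": last_id}).encode("utf-8")
    return base64.urlsafe_b64encode(payload).decode("ascii")


def decode_cursor(cursor: str) -> int:
    """Recover the ID from a cursor, rejecting anything malformed."""
    try:
        payload = json.loads(base64.urlsafe_b64decode(cursor.encode("ascii")))
        return int(payload["id"])
    except (ValueError, KeyError, TypeError) as exc:
        raise ValueError("Invalid cursor") from exc


def paginate_by_cursor(
    rows: List[dict], cursor: Optional[str], limit: int
) -> Tuple[List[dict], Optional[str]]:
    """Return one page of rows (sorted by ascending id) and the next cursor."""
    after_id = decode_cursor(cursor) if cursor else 0
    # Take limit + 1 rows: the extra row only signals that more data exists
    window = [row for row in rows if row["id"] > after_id][: limit + 1]
    has_more = len(window) > limit
    page = window[:limit]
    next_cursor = encode_cursor(page[-1]["id"]) if has_more else None
    return page, next_cursor
```

Unlike `page`/`limit` offsets, a cursor keeps pages stable when rows are inserted between requests, which is why it suits frequently changing data. Note that an ID-based cursor only matches the result order when results are sorted by ID.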
import asyncio
import time
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from collections import defaultdict
class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, calls: int = 100, period: int = 3600):
        super().__init__(app)
        self.calls = calls
        self.period = period
        self.clients = defaultdict(list)
        self.cleanup_task = asyncio.create_task(self.cleanup())
    async def dispatch(self, request: Request, call_next):
        client_id = request.client.host
        if auth := request.headers.get("Authorization"):
            # Use user ID for authenticated requests
            client_id = self.get_user_id_from_token(auth)
        now = time.time()
        requests = self.clients[client_id]
        # Keep only the timestamps inside the current window
        requests = [req for req in requests if req > now - self.period]
        if len(requests) >= self.calls:
            # Return the 429 directly: an HTTPException raised inside
            # BaseHTTPMiddleware bypasses FastAPI's exception handlers
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(self.calls),
                    "X-RateLimit-Remaining": "0",
                    "X-RateLimit-Reset": str(int(requests[0] + self.period))
                }
            )
        requests.append(now)
        self.clients[client_id] = requests
        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = str(self.calls)
        response.headers["X-RateLimit-Remaining"] = str(self.calls - len(requests))
        return response
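The sliding-window bookkeeping in the middleware can be separated from the HTTP layer, which makes it easy to test with a fake clock. The sketch below is framework-free; `SlidingWindowLimiter` and its `check` method are names chosen for this example, not part of the generated middleware.

```python
from collections import defaultdict, deque
from typing import Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `calls` requests per client within any `period` seconds."""

    def __init__(self, calls: int = 100, period: float = 3600.0) -> None:
        self.calls = calls
        self.period = period
        self.hits: Dict[str, Deque[float]] = defaultdict(deque)

    def check(self, client_id: str, now: float) -> dict:
        window = self.hits[client_id]
        # Drop timestamps that have fallen out of the window
        while window and window[0] <= now - self.period:
            window.popleft()
        allowed = len(window) < self.calls
        if allowed:
            window.append(now)
        # The limit resets when the oldest recorded request expires
        reset = int(window[0] + self.period) if window else int(now + self.period)
        return {
            "allowed": allowed,
            "headers": {
                "X-RateLimit-Limit": str(self.calls),
                "X-RateLimit-Remaining": str(self.calls - len(window)),
                "X-RateLimit-Reset": str(reset),
            },
        }
```

Passing `now` in explicitly, rather than calling `time.time()` inside, is a deliberate choice here so tests stay deterministic. State held in process memory like this is per-worker; a deployment with several workers would need a shared store.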
> Design a GraphQL schema for the same task management system.
> - Type definitions for all entities
> - Queries for reading data
> - Mutations for modifications
> - Subscriptions for real-time updates
> - Custom scalars for dates
Claude generates a comprehensive GraphQL schema:
attachments: [Attachment!]!
priority: TaskPriority = MEDIUM
hasPreviousPage: Boolean!
project(id: ID!): Project
register(input: RegisterInput!): AuthPayload!
login(email: String!, password: String!): AuthPayload!
refreshToken(token: String!): AuthPayload!
createProject(input: CreateProjectInput!): Project!
updateProject(id: ID!, input: UpdateProjectInput!): Project!
deleteProject(id: ID!): Boolean!
createTask(input: CreateTaskInput!): Task!
updateTask(id: ID!, input: UpdateTaskInput!): Task!
deleteTask(id: ID!): Boolean!
batchUpdateTasks(ids: [ID!]!, input: UpdateTaskInput!): [Task!]!
uploadAttachment(taskId: ID!, file: Upload!): Attachment!
taskCreated(projectId: ID!): Task!
taskUpdated(id: ID!): Task!
taskDeleted(id: ID!): ID!
projectUpdated(id: ID!): Project!
userPresence(projectId: ID!): PresenceEvent!
> Implement the GraphQL API using Strawberry framework:
> - Use SQLAlchemy for database
> - Add DataLoader for N+1 prevention
> - Implement authentication
> - Add subscription support
from typing import List, Optional
from datetime import datetime
import strawberry
from strawberry.types import Info
from app.graphql.types import User, Project, Task
from app.graphql.resolvers import (
    get_current_user,
    resolve_projects,
    resolve_tasks,
    task_created_generator,
)
from app.graphql.mutations import AuthMutations, TaskMutations
async def me(self, info: Info) -> User:
    return await get_current_user(info)
after: Optional[str] = None
user = await get_current_user(info)
return await resolve_projects(user, first, after)
after: Optional[str] = None,
status: Optional[TaskStatus] = None,
project_id: Optional[strawberry.ID] = None
user = await get_current_user(info)
return await resolve_tasks(
    user, first, after, status, project_id
)
class Mutation(AuthMutations, TaskMutations):
self, info: Info, project_id: strawberry.ID
async for task in task_created_generator(project_id):
schema = strawberry.Schema(
subscription=Subscription
from typing import List
from strawberry.dataloader import DataLoader
from sqlalchemy.orm import Session
class UserLoader(DataLoader):
    def __init__(self, db: Session):
        self.db = db
        super().__init__(load_fn=self.load_users)
    async def load_users(self, user_ids: List[int]) -> List[User]:
        # One query for the whole batch instead of one query per user
        users = self.db.query(User).filter(
            User.id.in_(user_ids)
        ).all()
        # Results must come back in the same order as the requested IDs
        user_map = {user.id: user for user in users}
        return [user_map.get(uid) for uid in user_ids]
class ProjectLoader(DataLoader):
    def __init__(self, db: Session):
        self.db = db
        super().__init__(load_fn=self.load_projects)
    async def load_projects(self, project_ids: List[int]) -> List[Project]:
        projects = self.db.query(Project).filter(
            Project.id.in_(project_ids)
        ).all()
        project_map = {p.id: p for p in projects}
        return [project_map.get(pid) for pid in project_ids]
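To see why the loaders above prevent N+1 queries, it can help to look at the batching idea in isolation. The following is a simplified sketch, not Strawberry's implementation: `TinyLoader`, `batch_load_users`, `FAKE_USERS`, and `query_log` are all hypothetical names, and the dictionary stands in for the database.

```python
import asyncio
from typing import Dict, List, Optional

FAKE_USERS: Dict[int, dict] = {
    1: {"id": 1, "name": "Ada"},
    2: {"id": 2, "name": "Lin"},
}
query_log: List[List[int]] = []  # records each simulated database round trip


async def batch_load_users(user_ids: List[int]) -> List[Optional[dict]]:
    """Resolve many IDs in one call, preserving the order of user_ids."""
    query_log.append(list(user_ids))
    return [FAKE_USERS.get(uid) for uid in user_ids]


class TinyLoader:
    """Collect keys requested in the same event-loop tick into one batch."""

    def __init__(self, batch_fn) -> None:
        self.batch_fn = batch_fn
        self.pending: list = []  # (key, future) pairs awaiting dispatch

    def load(self, key: int) -> "asyncio.Future":
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        if not self.pending:
            # First key of a new batch: dispatch on the next loop iteration
            loop.call_soon(self._start_dispatch)
        self.pending.append((key, future))
        return future

    def _start_dispatch(self) -> None:
        self._task = asyncio.get_running_loop().create_task(self._dispatch())

    async def _dispatch(self) -> None:
        batch, self.pending = self.pending, []
        values = await self.batch_fn([key for key, _ in batch])
        for (_, future), value in zip(batch, values):
            future.set_result(value)


async def demo() -> list:
    loader = TinyLoader(batch_load_users)
    # Three separate load() calls, as three resolvers might make
    return await asyncio.gather(loader.load(1), loader.load(2), loader.load(99))
```

Running `asyncio.run(demo())` issues a single batched lookup for all three keys, and the unknown ID comes back as `None` in its original position. That order-preserving contract is the reason the loaders build an ID-to-object map before returning.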
async def get_context(request):
    return {
        "user_loader": UserLoader(get_db()),
        "project_loader": ProjectLoader(get_db()),
        "current_user": await get_current_user_from_request(request)
    }
from typing import AsyncGenerator
from broadcaster import Broadcast
broadcast = Broadcast("redis://localhost:6379")
class TaskCreatedSubscription:
    project_id: strawberry.ID
) -> AsyncGenerator[Task, None]:
    async with broadcast.subscribe(f"project:{project_id}:tasks") as subscriber:
        async for event in subscriber:
            if event["type"] == "task_created":
                yield Task.from_dict(event["data"])
async def publish_task_created(task: Task):
    f"project:{task.project_id}:tasks",
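The publish/subscribe flow behind these subscriptions can be tried without Redis. The sketch below swaps in an in-process broadcaster built on `asyncio.Queue`; `InMemoryBroadcast`, `task_created`, and `demo` are hypothetical names for illustration, and the real `broadcaster` package has its own API.

```python
import asyncio
from collections import defaultdict
from typing import AsyncGenerator, Dict, Set


class InMemoryBroadcast:
    """Fan messages out to every subscriber of a channel (single process only)."""

    def __init__(self) -> None:
        self._channels: Dict[str, Set[asyncio.Queue]] = defaultdict(set)

    async def publish(self, channel: str, message: dict) -> None:
        for queue in list(self._channels[channel]):
            await queue.put(message)

    async def subscribe(self, channel: str) -> AsyncGenerator[dict, None]:
        queue: asyncio.Queue = asyncio.Queue()
        self._channels[channel].add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            # Unregister when the consumer stops iterating
            self._channels[channel].discard(queue)


async def task_created(
    broadcast: InMemoryBroadcast, project_id: str
) -> AsyncGenerator[dict, None]:
    """Yield only task_created events for one project."""
    async for event in broadcast.subscribe(f"project:{project_id}:tasks"):
        if event["type"] == "task_created":
            yield event["data"]


async def first_event(stream: AsyncGenerator[dict, None]) -> dict:
    async for item in stream:
        return item
    raise RuntimeError("stream ended without an event")


async def demo() -> dict:
    broadcast = InMemoryBroadcast()
    stream = task_created(broadcast, "1")
    waiter = asyncio.create_task(first_event(stream))
    await asyncio.sleep(0)  # let the subscriber register its queue
    channel = "project:1:tasks"
    await broadcast.publish(channel, {"type": "task_updated", "data": {"id": 1}})
    await broadcast.publish(channel, {"type": "task_created", "data": {"id": 2}})
    result = await waiter
    await stream.aclose()
    return result
```

In `demo`, the `task_updated` event is filtered out and only the `task_created` payload reaches the subscriber. An in-process queue like this does not span multiple server processes, which is the gap a Redis-backed broadcaster fills.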
> Generate comprehensive tests for our API:
> - Unit tests for each endpoint
> - Integration tests with database
> - Authentication/authorization tests
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.tests.utils import (
def test_create_task(self, db: Session):
    user = create_test_user(db)
    project = create_test_project(db, user)
    headers = get_auth_headers(user)
    "description": "Test description",
    "project_id": project.id,
    assert response.status_code == 201
    assert data["title"] == "Test Task"
    assert data["priority"] == "HIGH"
    assert data["status"] == "TODO"
def test_list_tasks_pagination(self, db: Session):
    user = create_test_user(db)
    project = create_test_project(db, user)
    for i in range(25):
        create_test_task(db, project, f"Task {i}")
    headers = get_auth_headers(user)
    "/api/tasks/?limit=10&page=1",
    assert response.status_code == 200
    assert len(data["items"]) == 10
    assert data["total"] == 25
    "/api/tasks/?limit=10&page=2",
    assert len(response.json()["items"]) == 10
def test_update_task_authorization(self, db: Session):
    user1 = create_test_user(db, "user1@test.com")
    user2 = create_test_user(db, "user2@test.com")
    project = create_test_project(db, user1)
    task = create_test_task(db, project)
    # Try to update another user's task
    headers = get_auth_headers(user2)
    json={"title": "Hacked!"}
    assert response.status_code == 404  # Not found for unauthorized user
@pytest.mark.parametrize("invalid_data", [
    {"title": ""},  # Empty title
    {"title": "A" * 256},  # Too long
    {"priority": "INVALID"},  # Invalid enum
    {"project_id": 999999}  # Non-existent project
def test_create_task_validation(self, db: Session, invalid_data):
    user = create_test_user(db)
    headers = get_auth_headers(user)
    assert response.status_code == 422  # Validation error
from strawberry.test import GraphQLTestClient
from app.graphql.schema import schema
from app.tests.utils import create_test_user, create_test_project
def test_query_projects(self, client: GraphQLTestClient):
    query GetProjects($first: Int!) {
        projects(first: $first) {
    context_value={"user": create_test_user()}
    assert "projects" in result.data
def test_create_task_mutation(self, client: GraphQLTestClient):
    mutation CreateTask($input: CreateTaskInput!) {
        createTask(input: $input) {
    user = create_test_user()
    project = create_test_project(user)
    "projectId": str(project.id),
    context_value={"user": user}
    task = result.data["createTask"]
    assert task["title"] == "New Task"
    assert task["status"] == "TODO"
def test_subscription(self, client: GraphQLTestClient):
    subscription OnTaskCreated($projectId: ID!) {
        taskCreated(projectId: $projectId) {
    # Test subscription setup
    variables={"projectId": "1"}
    # In real test, would trigger task creation
    # and assert subscription receives event
from locust import HttpUser, task, between
wait_time = between(1, 3)
# The login route reads an OAuth2PasswordRequestForm, so send form fields (data=), not JSON
response = self.client.post("/api/auth/login", data={
    "username": "test@example.com",
self.token = response.json()["access_token"]
self.headers = {"Authorization": f"Bearer {self.token}"}
self.client.get("/api/tasks/", headers=self.headers)
self.client.get("/api/projects/1", headers=self.headers)
"title": f"Task {time.time()}",
# Run with: locust -f test_performance.py --host=http://localhost:8000
> Implement comprehensive security for our API:
> - JWT with refresh tokens
> - API key authentication for services
> - Role-based access control
> - Rate limiting per user/IP
from datetime import datetime, timedelta
from typing import Optional, Union
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def __init__(self, secret_key: str, algorithm: str = "HS256"):
    self.secret_key = secret_key
    self.algorithm = algorithm
subject: Union[str, int],
expires_delta: Optional[timedelta] = None,
additional_claims: Optional[dict] = None
expire = datetime.utcnow() + (
    expires_delta or timedelta(minutes=15)
"iat": datetime.utcnow(),
to_encode.update(additional_claims)
return jwt.encode(to_encode, self.secret_key, self.algorithm)
def create_refresh_token(self, subject: Union[str, int]) -> str:
    expire = datetime.utcnow() + timedelta(days=7)
    return jwt.encode(to_encode, self.secret_key, self.algorithm)
def decode_token(self, token: str) -> dict:
    token, self.secret_key, algorithms=[self.algorithm]
    except jwt.ExpiredSignatureError:
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token has expired"
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials"
def __init__(self, resource: str, action: str):
def check(self, user: User) -> bool:
    # Check if user has permission
    return self in user.get_permissions()
def __init__(self, permission: Permission):
    self.permission = permission
def __call__(self, current_user: User = Depends(get_current_user)):
    if not self.permission.check(current_user):
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Insufficient permissions"
dependencies=[Depends(RequirePermission(Permission("task", "delete")))]
async def delete_task(task_id: int):
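The check `self in user.get_permissions()` only works if two `Permission` objects with the same resource and action compare equal. One way to get that is a frozen dataclass, sketched below under the assumption that permissions are granted through roles; `ROLE_PERMISSIONS`, the role names, and this simplified `User` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, Set


@dataclass(frozen=True)
class Permission:
    """Value object: frozen dataclasses get __eq__ and __hash__ for free."""
    resource: str
    action: str


# Hypothetical role table; a real system would likely load this from storage
ROLE_PERMISSIONS: Dict[str, FrozenSet[Permission]] = {
    "admin": frozenset({
        Permission("task", "read"),
        Permission("task", "delete"),
    }),
    "member": frozenset({Permission("task", "read")}),
}


@dataclass
class User:
    id: int
    roles: Set[str] = field(default_factory=set)

    def get_permissions(self) -> Set[Permission]:
        # Union of the permissions of every role the user holds
        granted: Set[Permission] = set()
        for role in self.roles:
            granted |= ROLE_PERMISSIONS.get(role, frozenset())
        return granted


def has_permission(user: User, permission: Permission) -> bool:
    return permission in user.get_permissions()
```

With this in place, `has_permission(User(1, {"member"}), Permission("task", "delete"))` is `False`, while the same check for an `admin` passes. Without value equality, a freshly constructed `Permission("task", "delete")` would never match the stored one.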
import bleach
from pydantic import BaseModel, validator, constr, conint
from typing import Optional
class SanitizedStr(str):
    # Pydantic v1-style custom type
    @classmethod
    def __get_validators__(cls):
        yield cls.validate
    @classmethod
    def validate(cls, v):
        if not isinstance(v, str):
            raise TypeError("String required")
        # Remove dangerous HTML/scripts
        return bleach.clean(v, tags=[], strip=True)
class TaskCreateSchema(BaseModel):
    title: constr(min_length=1, max_length=200)
    description: Optional[SanitizedStr] = None
    priority: conint(ge=1, le=4)
    @validator("title")
    def validate_title(cls, v):
        # Reject titles that contain only whitespace
        if not v.strip():
            raise ValueError("Title cannot be empty")
        return v.strip()
> Generate comprehensive API documentation:
> - Request/response examples
> - Authentication details
> - Error response catalog
# main.py - Enhanced documentation
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
return app.openapi_schema
openapi_schema = get_openapi(
    title="Task Management API",
RESTful API for task management with real-time updates.
Uses JWT Bearer tokens. Include in Authorization header:
Authorization: Bearer <token>
- 100 requests per hour for authenticated users
- 20 requests per hour for unauthenticated
Standard HTTP status codes. Error responses include:
"detail": "Error description",
"field": "field_name" // For validation errors
openapi_schema["components"]["securitySchemes"] = {
openapi_schema["paths"]["/api/tasks/"]["post"]["requestBody"]["content"]["application/json"]["examples"] = {
    "summary": "Simple task",
    "summary": "Complete task",
    "title": "Implement feature",
    "description": "Add user authentication",
    "due_date": "2024-12-31T23:59:59Z",
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
from fastapi import APIRouter, Request
v1_router = APIRouter(prefix="/api/v1")
v2_router = APIRouter(prefix="/api/v2")
app.include_router(v1_router)
app.include_router(v2_router)
# Version negotiation via headers
@app.middleware("http")
async def api_version_middleware(request: Request, call_next):
    # Default to v2 when the client sends no API-Version header
    version = request.headers.get("API-Version", "v2")
    request.state.api_version = version
    response = await call_next(request)
    response.headers["API-Version"] = version
    return response
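The middleware above echoes back whatever version the client sends. A stricter variant might validate the header first; the sketch below shows one way to do that as a plain function, where `SUPPORTED_VERSIONS`, `UnsupportedVersion`, and `negotiate_version` are assumed names rather than part of the generated code.

```python
from typing import Mapping

SUPPORTED_VERSIONS = ("v1", "v2")
DEFAULT_VERSION = "v2"


class UnsupportedVersion(ValueError):
    """Raised when a client asks for a version the API does not serve."""


def negotiate_version(headers: Mapping[str, str]) -> str:
    """Pick the API version from request headers, falling back to the default."""
    # HTTP header names are case-insensitive, so normalize before the lookup
    normalized = {name.lower(): value for name, value in headers.items()}
    requested = normalized.get("api-version", DEFAULT_VERSION).strip().lower()
    if requested not in SUPPORTED_VERSIONS:
        raise UnsupportedVersion(f"Unsupported API version: {requested!r}")
    return requested
```

A middleware could call this and translate `UnsupportedVersion` into a 400 response, so clients learn about a typo in the header instead of silently getting default behavior.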
You’ve learned how to leverage Claude Code for rapid API development, from design to implementation to testing. The key is letting Claude handle the boilerplate while you focus on business logic and architecture decisions.
Remember: A great API is more than just endpoints - it’s about consistency, security, performance, and developer experience. Use Claude Code to enforce best practices across all these dimensions, creating APIs that are a joy to build and use.