REST/GraphQL APIs

Building APIs is where backend development meets the real world. Whether you’re creating a simple REST endpoint or a complex GraphQL schema, Claude Code transforms API development from tedious boilerplate into elegant, well-tested services. This lesson explores how to leverage AI assistance for API design, implementation, and maintenance.

Scenario: Your startup needs to build an API serving mobile apps, web clients, and third-party integrations. Requirements include real-time updates, complex authorization, rate limiting, and support for 100K daily active users. Traditional approach: months of development. With Claude Code: weeks.

Week 1-2: API Design
- Manual OpenAPI spec writing
- Debate REST vs GraphQL
- Design authentication flow
Week 3-4: Implementation
- Write boilerplate for each endpoint
- Implement validation manually
- Add error handling everywhere
Week 5-6: Testing & Documentation
- Write tests for each endpoint
- Create Postman collections
- Document each parameter
Week 7+: Refinement
- Fix inconsistencies
- Add missing features
- Performance optimization
> Design a RESTful API for a task management system with:
> - User authentication and authorization
> - Projects, tasks, and comments
> - File attachments
> - Real-time notifications
> Follow REST best practices and conventions

Claude generates a comprehensive API design:

Resource Structure

# API Resources and Endpoints
## Authentication
POST /auth/register
POST /auth/login
POST /auth/refresh
POST /auth/logout
## Users
GET /users # List users (admin only)
GET /users/{id} # Get user profile
PUT /users/{id} # Update user
DELETE /users/{id} # Delete account
## Projects
GET /projects # List user's projects
POST /projects # Create project
GET /projects/{id} # Get project details
PUT /projects/{id} # Update project
DELETE /projects/{id} # Delete project
## Tasks
GET /projects/{pid}/tasks # List tasks
POST /projects/{pid}/tasks # Create task
GET /tasks/{id} # Get task
PUT /tasks/{id} # Update task
DELETE /tasks/{id} # Delete task
PATCH /tasks/{id}/status # Update status only
## Comments
GET /tasks/{tid}/comments # List comments
POST /tasks/{tid}/comments # Add comment
PUT /comments/{id} # Edit comment
DELETE /comments/{id} # Delete comment

HTTP Conventions

## Status Codes
200 OK - Successful GET/PUT
201 Created - Successful POST
204 No Content - Successful DELETE
400 Bad Request - Invalid input
401 Unauthorized - Missing/invalid auth
403 Forbidden - No permission
404 Not Found - Resource does not exist
409 Conflict - Duplicate/conflict
429 Too Many Requests - Rate limited
## Headers
Authorization: Bearer <token>
Content-Type: application/json
X-Request-ID: <uuid>
X-RateLimit-Remaining: 100
X-RateLimit-Reset: 1642345678
## Query Parameters
?page=1&limit=20 # Pagination
?sort=created_at&order=desc # Sorting
?filter[status]=active # Filtering
?include=user,project # Relationships
?fields=id,title,status # Sparse fields
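
As a rough illustration of how a server might interpret the query conventions above, the sketch below parses a raw query string with the standard library. The function name `parse_list_query`, the default values, and the layout of the returned dictionary are assumptions made for this example; they are not part of the generated design.

```python
from urllib.parse import parse_qsl


def parse_list_query(query_string: str, max_limit: int = 100) -> dict:
    """Parse pagination, sorting, filtering, and field-selection parameters."""
    params = dict(parse_qsl(query_string))

    # Pagination: clamp to sane bounds instead of trusting the client
    page = max(int(params.get("page", 1)), 1)
    limit = min(max(int(params.get("limit", 20)), 1), max_limit)

    # Filtering: collect keys shaped like filter[name]
    filters = {
        key[len("filter["):-1]: value
        for key, value in params.items()
        if key.startswith("filter[") and key.endswith("]")
    }

    def split_csv(name: str) -> list:
        # Comma-separated lists such as include=user,project
        return [part for part in params.get(name, "").split(",") if part]

    return {
        "page": page,
        "limit": limit,
        "sort": params.get("sort", "created_at"),
        "order": "asc" if params.get("order") == "asc" else "desc",
        "filters": filters,
        "include": split_csv("include"),
        "fields": split_csv("fields"),
    }
```

A non-numeric `page` or `limit` raises `ValueError` here; a real endpoint would translate that into a 400 response.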
> Implement the task management API using FastAPI:
> - Use SQLAlchemy for database
> - Pydantic for validation
> - JWT for authentication
> - Include middleware for CORS and rate limiting

Claude generates a complete implementation:

main.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
import uvicorn

from app.core.config import settings
from app.core.security import get_current_user
from app.api import auth, users, projects, tasks
from app.db.base import engine, Base
from app.middleware.rate_limit import RateLimitMiddleware

# Create database tables
Base.metadata.create_all(bind=engine)

# Initialize FastAPI app
app = FastAPI(
    title="Task Management API",
    description="RESTful API for task management",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

# Add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(
    RateLimitMiddleware,
    calls=100,
    period=3600  # 100 calls per hour
)

# Include routers
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(users.router, prefix="/api/users", tags=["users"])
app.include_router(projects.router, prefix="/api/projects", tags=["projects"])
app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])


@app.get("/api/health")
async def health_check():
    return {"status": "healthy"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
> Implement advanced pagination and filtering for the tasks endpoint:
> - Cursor-based pagination for real-time data
> - Complex filtering with operators
> - Full-text search
> - Sorting by multiple fields
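# Advanced query handling
import base64
from datetime import datetime
from typing import List, Optional

from fastapi import Query
from sqlalchemy import and_, or_

from app.models import Task  # assumed location of the Task ORM model

# Columns clients may sort by (assumed names); anything else is ignored
SORTABLE_FIELDS = {"created_at", "due_date", "priority", "status"}


class TaskQueryParams:
    def __init__(
        self,
        # Pagination
        cursor: Optional[str] = None,
        limit: int = Query(20, ge=1, le=100),
        # Filtering
        status: Optional[List[str]] = Query(None),
        priority: Optional[str] = Query(None),
        assigned_to: Optional[int] = None,
        created_after: Optional[datetime] = None,
        created_before: Optional[datetime] = None,
        # Search
        search: Optional[str] = None,
        # Sorting
        sort_by: List[str] = Query(["created_at"]),
        sort_order: List[str] = Query(["desc"]),
    ):
        self.cursor = cursor
        self.limit = limit
        self.search = search
        self.sorting = list(zip(sort_by, sort_order))
        # Build one condition per filter that was actually supplied
        self.filters = []
        if status:
            self.filters.append(Task.status.in_(status))
        if priority:
            self.filters.append(Task.priority == priority)
        if assigned_to is not None:
            self.filters.append(Task.assigned_to == assigned_to)
        if created_after:
            self.filters.append(Task.created_at >= created_after)
        if created_before:
            self.filters.append(Task.created_at <= created_before)

    @staticmethod
    def decode_cursor(cursor: str) -> int:
        # Assumes the cursor is the base64-encoded id of the last task returned
        return int(base64.urlsafe_b64decode(cursor.encode()).decode())

    def apply_to_query(self, query):
        # Apply filters
        if self.filters:
            query = query.filter(and_(*self.filters))
        # Apply search
        if self.search:
            pattern = f"%{self.search}%"
            query = query.filter(
                or_(Task.title.ilike(pattern), Task.description.ilike(pattern))
            )
        # Apply sorting, skipping fields that are not whitelisted
        for field, order in self.sorting:
            if field not in SORTABLE_FIELDS:
                continue
            column = getattr(Task, field)
            query = query.order_by(column.desc() if order == "desc" else column.asc())
        # Apply cursor pagination. An id-based cursor only lines up with the
        # result order when rows are ultimately ordered by id.
        if self.cursor:
            query = query.filter(Task.id > self.decode_cursor(self.cursor))
        # Fetch one extra row to detect whether another page exists
        return query.limit(self.limit + 1)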
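
The query above fetches `limit + 1` rows. A minimal sketch of how that extra row could be turned into a page plus a next cursor follows; the helper names `encode_cursor`, `decode_cursor`, and `build_page`, and the choice of a base64-encoded id as the cursor, are assumptions for illustration rather than what the generated code necessarily does.

```python
import base64
from typing import List, Optional, Tuple


def encode_cursor(task_id: int) -> str:
    # Opaque cursor: clients pass it back without interpreting it
    return base64.urlsafe_b64encode(str(task_id).encode()).decode()


def decode_cursor(cursor: str) -> int:
    return int(base64.urlsafe_b64decode(cursor.encode()).decode())


def build_page(rows: List[dict], limit: int) -> Tuple[List[dict], Optional[str]]:
    """Split up to limit + 1 rows (ordered by ascending id) into a page.

    The extra row is never returned; its presence only signals that
    another page exists.
    """
    has_next = len(rows) > limit
    items = rows[:limit]
    next_cursor = encode_cursor(items[-1]["id"]) if has_next and items else None
    return items, next_cursor
```

Fetching one extra row avoids a separate `COUNT` query just to learn whether more results exist.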
middleware/rate_limit.py
import time
from collections import defaultdict

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware


class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, calls: int = 100, period: int = 3600):
        super().__init__(app)
        self.calls = calls
        self.period = period
        # In-memory store: client id -> timestamps of recent requests.
        # This is per process; multiple workers need a shared store.
        self.clients = defaultdict(list)

    def get_user_id_from_token(self, auth_header: str) -> str:
        # Placeholder: keys on the raw header value. Decode the JWT and
        # return its subject in a real deployment.
        return auth_header

    async def dispatch(self, request: Request, call_next):
        # Get client identifier
        client_id = request.client.host if request.client else "unknown"
        if auth := request.headers.get("Authorization"):
            # Use user ID for authenticated requests
            client_id = self.get_user_id_from_token(auth)
        # Check rate limit, keeping only requests inside the current window
        now = time.time()
        requests = [req for req in self.clients[client_id] if req > now - self.period]
        if len(requests) >= self.calls:
            self.clients[client_id] = requests
            # Return the response directly: an HTTPException raised inside
            # BaseHTTPMiddleware is not seen by FastAPI's exception handlers
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(self.calls),
                    "X-RateLimit-Remaining": "0",
                    "X-RateLimit-Reset": str(int(requests[0] + self.period)),
                },
            )
        # Record request
        requests.append(now)
        self.clients[client_id] = requests
        # Add rate limit headers
        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = str(self.calls)
        response.headers["X-RateLimit-Remaining"] = str(self.calls - len(requests))
        return response
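
The sliding-window check inside the middleware can be hard to unit test through HTTP. The following framework-free sketch isolates the same idea and adds a cleanup step for idle clients; the class name `SlidingWindowLimiter` and its `allow` and `cleanup` methods are hypothetical, and the injectable `now` argument exists only to make the behaviour deterministic in tests.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class SlidingWindowLimiter:
    def __init__(self, calls: int, period: float):
        self.calls = calls
        self.period = period
        self.hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str, now: Optional[float] = None) -> bool:
        """Record a request and report whether it fits within the limit."""
        now = time.time() if now is None else now
        window = self.hits[client_id]
        # Drop timestamps that have aged out of the window
        while window and window[0] <= now - self.period:
            window.popleft()
        if len(window) >= self.calls:
            return False
        window.append(now)
        return True

    def cleanup(self, now: Optional[float] = None) -> int:
        """Forget clients with no requests in the window; return how many."""
        now = time.time() if now is None else now
        stale = [
            client_id
            for client_id, window in self.hits.items()
            if not window or window[-1] <= now - self.period
        ]
        for client_id in stale:
            del self.hits[client_id]
        return len(stale)
```

Calling `cleanup` periodically keeps the in-memory map from growing without bound when many distinct clients make only a few requests each.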
> Design a GraphQL schema for the same task management system.
> Include:
> - Type definitions for all entities
> - Queries for reading data
> - Mutations for modifications
> - Subscriptions for real-time updates
> - Custom scalars for dates

Claude generates a comprehensive GraphQL schema:

schema.graphql
scalar DateTime
scalar Upload
# User type
type User {
id: ID!
email: String!
username: String!
createdAt: DateTime!
updatedAt: DateTime!
# Relationships
projects(
first: Int = 10
after: String
filter: ProjectFilter
): ProjectConnection!
tasks(
first: Int = 10
after: String
filter: TaskFilter
): TaskConnection!
}
# Project type
type Project {
id: ID!
title: String!
description: String
color: String
createdAt: DateTime!
updatedAt: DateTime!
# Relationships
owner: User!
tasks(
first: Int = 10
after: String
filter: TaskFilter
orderBy: TaskOrderBy
): TaskConnection!
members: [User!]!
# Computed fields
taskCount: Int!
completedTaskCount: Int!
progress: Float!
}
# Task type
type Task {
id: ID!
title: String!
description: String
status: TaskStatus!
priority: TaskPriority!
dueDate: DateTime
completedAt: DateTime
createdAt: DateTime!
updatedAt: DateTime!
# Relationships
project: Project!
assignee: User
creator: User!
comments(
first: Int = 10
after: String
): CommentConnection!
attachments: [Attachment!]!
tags: [String!]!
}
# Enums
enum TaskStatus {
TODO
IN_PROGRESS
IN_REVIEW
COMPLETED
ARCHIVED
}
enum TaskPriority {
LOW
MEDIUM
HIGH
URGENT
}
# Input types
input CreateTaskInput {
title: String!
description: String
projectId: ID!
assigneeId: ID
priority: TaskPriority = MEDIUM
dueDate: DateTime
tags: [String!]
}
input UpdateTaskInput {
title: String
description: String
status: TaskStatus
priority: TaskPriority
assigneeId: ID
dueDate: DateTime
tags: [String!]
}
# Pagination types
type PageInfo {
hasNextPage: Boolean!
hasPreviousPage: Boolean!
startCursor: String
endCursor: String
}
type TaskConnection {
edges: [TaskEdge!]!
pageInfo: PageInfo!
totalCount: Int!
}
type TaskEdge {
node: Task!
cursor: String!
}
# Root types
type Query {
# User queries
me: User!
user(id: ID!): User
users(
first: Int = 10
after: String
search: String
): UserConnection!
# Project queries
project(id: ID!): Project
projects(
first: Int = 10
after: String
filter: ProjectFilter
): ProjectConnection!
# Task queries
task(id: ID!): Task
tasks(
first: Int = 10
after: String
filter: TaskFilter
orderBy: TaskOrderBy
): TaskConnection!
# Search
search(
query: String!
types: [SearchType!]
): SearchResult!
}
type Mutation {
# Auth mutations
register(input: RegisterInput!): AuthPayload!
login(email: String!, password: String!): AuthPayload!
logout: Boolean!
refreshToken(token: String!): AuthPayload!
# Project mutations
createProject(input: CreateProjectInput!): Project!
updateProject(id: ID!, input: UpdateProjectInput!): Project!
deleteProject(id: ID!): Boolean!
# Task mutations
createTask(input: CreateTaskInput!): Task!
updateTask(id: ID!, input: UpdateTaskInput!): Task!
deleteTask(id: ID!): Boolean!
# Batch operations
batchUpdateTasks(ids: [ID!]!, input: UpdateTaskInput!): [Task!]!
# File upload
uploadAttachment(taskId: ID!, file: Upload!): Attachment!
}
type Subscription {
# Task subscriptions
taskCreated(projectId: ID!): Task!
taskUpdated(id: ID!): Task!
taskDeleted(id: ID!): ID!
# Project subscriptions
projectUpdated(id: ID!): Project!
# Presence
userPresence(projectId: ID!): PresenceEvent!
}
> Implement the GraphQL API using Strawberry framework:
> - Use SQLAlchemy for database
> - Add DataLoader for N+1 prevention
> - Implement authentication
> - Add subscription support
graphql/schema.py
from typing import AsyncGenerator, Optional

import strawberry
from strawberry.types import Info

from app.graphql.types import (
    User,
    Project,
    Task,
    TaskStatus,
    ProjectConnection,
    TaskConnection,
)
from app.graphql.resolvers import (
    get_current_user,
    resolve_projects,
    resolve_tasks,
    task_created_generator,  # assumed to live alongside the other resolvers
)
from app.graphql.mutations import AuthMutations, TaskMutations


@strawberry.type
class Query:
    @strawberry.field
    async def me(self, info: Info) -> User:
        return await get_current_user(info)

    @strawberry.field
    async def projects(
        self,
        info: Info,
        first: int = 10,
        after: Optional[str] = None
    ) -> ProjectConnection:
        user = await get_current_user(info)
        return await resolve_projects(user, first, after)

    @strawberry.field
    async def tasks(
        self,
        info: Info,
        first: int = 10,
        after: Optional[str] = None,
        status: Optional[TaskStatus] = None,
        project_id: Optional[strawberry.ID] = None
    ) -> TaskConnection:
        user = await get_current_user(info)
        return await resolve_tasks(
            user, first, after, status, project_id
        )


@strawberry.type
class Mutation(AuthMutations, TaskMutations):
    pass


@strawberry.type
class Subscription:
    # Subscriptions are async generators, so the return type is annotated as one
    @strawberry.subscription
    async def task_created(
        self, info: Info, project_id: strawberry.ID
    ) -> AsyncGenerator[Task, None]:
        async for task in task_created_generator(project_id):
            yield task


schema = strawberry.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription
)
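
The prompt asks for a DataLoader to prevent N+1 queries, which the excerpt above does not show. The sketch below illustrates the batching idea behind a DataLoader using plain asyncio. `BatchLoader`, `fetch_users`, and `demo` are hypothetical names invented for this example; in a Strawberry project the built-in `strawberry.dataloader.DataLoader` would normally fill this role.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Hashable, List, Optional


class BatchLoader:
    """Collects keys requested in the same event-loop tick into one batch call."""

    def __init__(self, batch_fn: Callable[[List[Hashable]], Awaitable[List[object]]]):
        self._batch_fn = batch_fn
        self._pending: Dict[Hashable, asyncio.Future] = {}
        self._task: Optional[asyncio.Task] = None

    def load(self, key: Hashable) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        if key not in self._pending:
            self._pending[key] = loop.create_future()
        if self._task is None:
            # Start the batch task; it first yields once so that sibling
            # resolvers can register their keys before the fetch happens
            self._task = loop.create_task(self._dispatch())
        return self._pending[key]

    async def _dispatch(self) -> None:
        await asyncio.sleep(0)  # let other callers in this tick add keys
        pending, self._pending = self._pending, {}
        self._task = None
        keys = list(pending)
        try:
            values = await self._batch_fn(keys)
        except Exception as exc:
            for future in pending.values():
                future.set_exception(exc)
            return
        for key, value in zip(keys, values):
            pending[key].set_result(value)


batch_calls: List[List[int]] = []  # records each batch for inspection


async def fetch_users(ids: List[int]) -> List[dict]:
    # Stand-in for a single "WHERE id IN (...)" database query
    batch_calls.append(list(ids))
    return [{"id": i, "username": f"user{i}"} for i in ids]


async def demo() -> List[dict]:
    loader = BatchLoader(fetch_users)
    # Three lookups, two distinct keys, one underlying fetch
    return await asyncio.gather(loader.load(1), loader.load(2), loader.load(1))
```

Running `asyncio.run(demo())` resolves all three lookups while `fetch_users` is invoked once with the deduplicated keys, which is the property that removes the N+1 pattern when resolving fields such as `Task.assignee`.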
> Generate comprehensive tests for our API:
> - Unit tests for each endpoint
> - Integration tests with database
> - Authentication/authorization tests
> - Error handling tests
> - Performance tests
tests/test_api_tasks.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session

from app.main import app
from app.tests.utils import (
    create_test_user,
    create_test_project,
    create_test_task,
    get_auth_headers
)

client = TestClient(app)


# The db fixture is assumed to be provided by conftest.py
class TestTaskAPI:
    def test_create_task(self, db: Session):
        user = create_test_user(db)
        project = create_test_project(db, user)
        headers = get_auth_headers(user)
        response = client.post(
            "/api/tasks/",
            headers=headers,
            json={
                "title": "Test Task",
                "description": "Test description",
                "project_id": project.id,
                "priority": "HIGH"
            }
        )
        assert response.status_code == 201
        data = response.json()
        assert data["title"] == "Test Task"
        assert data["priority"] == "HIGH"
        assert data["status"] == "TODO"

    def test_list_tasks_pagination(self, db: Session):
        user = create_test_user(db)
        project = create_test_project(db, user)
        # Create 25 tasks
        for i in range(25):
            create_test_task(db, project, f"Task {i}")
        headers = get_auth_headers(user)
        # First page
        response = client.get(
            "/api/tasks/?limit=10&page=1",
            headers=headers
        )
        assert response.status_code == 200
        data = response.json()
        assert len(data["items"]) == 10
        assert data["total"] == 25
        assert data["page"] == 1
        # Second page
        response = client.get(
            "/api/tasks/?limit=10&page=2",
            headers=headers
        )
        assert len(response.json()["items"]) == 10

    def test_update_task_authorization(self, db: Session):
        user1 = create_test_user(db, "user1@test.com")
        user2 = create_test_user(db, "user2@test.com")
        project = create_test_project(db, user1)
        task = create_test_task(db, project)
        # Try to update another user's task
        headers = get_auth_headers(user2)
        response = client.put(
            f"/api/tasks/{task.id}",
            headers=headers,
            json={"title": "Hacked!"}
        )
        assert response.status_code == 404  # Not found for unauthorized user

    @pytest.mark.parametrize("invalid_data", [
        {"title": ""},            # Empty title
        {"title": "A" * 256},     # Too long
        {"priority": "INVALID"},  # Invalid enum
        {"project_id": 999999},   # Non-existent project
    ])
    def test_create_task_validation(self, db: Session, invalid_data):
        user = create_test_user(db)
        project = create_test_project(db, user)
        headers = get_auth_headers(user)
        # Start from a valid payload so each case fails only for the reason
        # under test, not because other required fields are missing
        payload = {"title": "Valid title", "project_id": project.id, **invalid_data}
        response = client.post(
            "/api/tasks/",
            headers=headers,
            json=payload
        )
        # Assumes the API also reports an unknown project_id as a 422
        assert response.status_code == 422  # Validation error
tests/test_performance.py
import time

from locust import HttpUser, task, between


class APIUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Login to get token
        response = self.client.post("/api/auth/login", json={
            "username": "test@example.com",
            "password": "testpass"
        })
        self.token = response.json()["access_token"]
        self.headers = {"Authorization": f"Bearer {self.token}"}

    @task(3)
    def list_tasks(self):
        self.client.get("/api/tasks/", headers=self.headers)

    @task(2)
    def get_project(self):
        self.client.get("/api/projects/1", headers=self.headers)

    @task(1)
    def create_task(self):
        self.client.post(
            "/api/tasks/",
            headers=self.headers,
            json={
                "title": f"Task {time.time()}",
                "project_id": 1
            }
        )

# Run with: locust -f test_performance.py --host=http://localhost:8000
> Implement comprehensive security for our API:
> - JWT with refresh tokens
> - OAuth2 integration
> - API key authentication for services
> - Role-based access control
> - Rate limiting per user/IP
core/security.py
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

import jwt  # PyJWT
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

from app.models import User  # assumed location of the User model

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")

# get_current_user, the dependency that resolves a bearer token to a User,
# also belongs in this module; it is omitted from this excerpt.


class SecurityManager:
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_access_token(
        self,
        subject: Union[str, int],
        expires_delta: Optional[timedelta] = None,
        additional_claims: Optional[dict] = None
    ) -> str:
        now = datetime.now(timezone.utc)
        to_encode = {
            "sub": str(subject),
            "exp": now + (expires_delta or timedelta(minutes=15)),
            "iat": now,
            "type": "access"
        }
        if additional_claims:
            to_encode.update(additional_claims)
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, subject: Union[str, int]) -> str:
        expire = datetime.now(timezone.utc) + timedelta(days=7)
        to_encode = {
            "sub": str(subject),
            "exp": expire,
            "type": "refresh"
        }
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def decode_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(
                token, self.secret_key, algorithms=[self.algorithm]
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has expired"
            )
        except jwt.InvalidTokenError:
            # PyJWT's base class for invalid tokens
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials"
            )


# Permission system
@dataclass(frozen=True)  # value equality, so "in" compares resource and action
class Permission:
    resource: str
    action: str

    def check(self, user: User) -> bool:
        # Check if user has permission
        return self in user.get_permissions()


class RequirePermission:
    def __init__(self, permission: Permission):
        self.permission = permission

    def __call__(self, current_user: User = Depends(get_current_user)):
        if not self.permission.check(current_user):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return current_user


# Usage in endpoints (for example in api/tasks.py, where router = APIRouter())
@router.delete(
    "/{task_id}",
    dependencies=[Depends(RequirePermission(Permission("task", "delete")))]
)
async def delete_task(task_id: int):
    pass
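
The listing leaves `user.get_permissions()` unspecified. One way roles might map to permissions is sketched below; the role names, the `ROLE_PERMISSIONS` table, and the helpers `permissions_for` and `has_permission` are all assumptions for illustration. `Permission` is declared as a frozen dataclass here so that two instances with the same resource and action compare equal, which is what a membership test relies on.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Iterable


@dataclass(frozen=True)
class Permission:
    resource: str
    action: str


# Hypothetical role table; a real system would likely load this from storage
ROLE_PERMISSIONS: Dict[str, FrozenSet[Permission]] = {
    "viewer": frozenset({Permission("task", "read")}),
    "member": frozenset({
        Permission("task", "read"),
        Permission("task", "update"),
    }),
    "admin": frozenset({
        Permission("task", "read"),
        Permission("task", "update"),
        Permission("task", "delete"),
    }),
}


def permissions_for(roles: Iterable[str]) -> FrozenSet[Permission]:
    """Union of the permissions granted by each role; unknown roles grant nothing."""
    granted = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, frozenset())
    return frozenset(granted)


def has_permission(roles: Iterable[str], permission: Permission) -> bool:
    return permission in permissions_for(roles)
```

With this table, `has_permission(["member"], Permission("task", "delete"))` is false while the same check for `["admin"]` is true.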
schemas/validators.py
# Written against the Pydantic v1 API (validator, __get_validators__, class Config)
from typing import Optional

import bleach
from pydantic import BaseModel, validator, constr, conint


class SanitizedStr(str):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not isinstance(v, str):
            raise TypeError("String required")
        # Remove dangerous HTML/scripts
        return bleach.clean(v, tags=[], strip=True)


class TaskCreateSchema(BaseModel):
    title: constr(min_length=1, max_length=200)
    description: Optional[SanitizedStr] = None
    priority: conint(ge=1, le=4)

    @validator("title")
    def validate_title(cls, v):
        # Reject titles that contain only whitespace
        if not v.strip():
            raise ValueError("Title cannot be empty")
        return v.strip()

    class Config:
        # Prevent extra fields
        extra = "forbid"
> Generate comprehensive API documentation:
> - OpenAPI/Swagger spec
> - Request/response examples
> - Authentication details
> - Error response catalog
> - Versioning strategy
# main.py - Enhanced documentation
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi

app = FastAPI()


def custom_openapi():
    # Reuse the cached schema after the first call
    if app.openapi_schema:
        return app.openapi_schema
    openapi_schema = get_openapi(
        title="Task Management API",
        version="2.0.0",
        description="""
## Overview
RESTful API for task management with real-time updates.
## Authentication
Uses JWT Bearer tokens. Include in Authorization header:
Authorization: Bearer <token>
## Rate Limiting
- 100 requests per hour for authenticated users
- 20 requests per hour for unauthenticated
## Errors
Standard HTTP status codes. Error responses include:
{
"detail": "Error description",
"code": "ERROR_CODE",
"field": "field_name" // For validation errors
}
""",
        routes=app.routes,
    )
    # Add security scheme ("components" may be absent if no models are registered)
    openapi_schema.setdefault("components", {})["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT",
        }
    }
    # Add examples (requires the POST /api/tasks/ route to be registered)
    openapi_schema["paths"]["/api/tasks/"]["post"]["requestBody"]["content"]["application/json"]["examples"] = {
        "simple": {
            "summary": "Simple task",
            "value": {
                "title": "Review PR",
                "project_id": 1
            }
        },
        "complete": {
            "summary": "Complete task",
            "value": {
                "title": "Implement feature",
                "description": "Add user authentication",
                "priority": 3,
                "due_date": "2024-12-31T23:59:59Z",
                "assigned_to": 5
            }
        }
    }
    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi
api/v1/routes.py
from fastapi import APIRouter

v1_router = APIRouter(prefix="/api/v1")

# api/v2/routes.py
v2_router = APIRouter(prefix="/api/v2")

# main.py
from fastapi import Request

app.include_router(v1_router)
app.include_router(v2_router)


# Version negotiation via headers
@app.middleware("http")
async def api_version_middleware(request: Request, call_next):
    version = request.headers.get("API-Version", "v2")
    request.state.api_version = version
    response = await call_next(request)
    response.headers["API-Version"] = version
    return response
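
The middleware above echoes whatever version string the client sends. A small sketch of how the header value might be validated before use is shown here; `SUPPORTED_VERSIONS`, `DEFAULT_VERSION`, and `resolve_api_version` are hypothetical names, and raising `ValueError` for unknown versions is one possible policy (falling back to the default is another).

```python
from typing import Mapping

SUPPORTED_VERSIONS = ("v1", "v2")
DEFAULT_VERSION = "v2"


def resolve_api_version(headers: Mapping[str, str]) -> str:
    """Return the requested API version, or the default when none is given."""
    requested = headers.get("API-Version", DEFAULT_VERSION).strip().lower()
    if requested not in SUPPORTED_VERSIONS:
        # The caller can translate this into a 400 response
        raise ValueError(f"Unsupported API version: {requested!r}")
    return requested
```

Inside the middleware, the result would replace the raw header lookup, so that `request.state.api_version` only ever holds a version the application actually serves.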

You’ve learned how to leverage Claude Code for rapid API development, from design to implementation to testing. The key is letting Claude handle the boilerplate while you focus on business logic and architecture decisions.

Remember: A great API is more than just endpoints - it’s about consistency, security, performance, and developer experience. Use Claude Code to enforce best practices across all these dimensions, creating APIs that are a joy to build and use.