Skip to content

Building Custom MCP Servers

Learn how to build custom MCP servers that bring your unique tools, APIs, and data sources into your AI-assisted development environment.

While the MCP ecosystem offers many pre-built servers, you might need custom integration for:

  • Internal APIs specific to your organization
  • Proprietary tools not available publicly
  • Custom workflows unique to your team
  • Legacy systems requiring special handling
  • Security requirements demanding full control

MCP follows a client-server architecture where:

  • Clients (Cursor, Claude Code) connect to servers
  • Servers expose tools and resources
  • Transport handles communication (stdio, SSE, HTTP)
graph LR A[AI Assistant] --> B[MCP Client] B --> C[Transport Layer] C --> D[MCP Server] D --> E[Your Tool/API]
TransportUse CaseDeploymentSecurity
stdioLocal toolsRuns on user machineProcess isolation
SSERemote servicesHTTP serverOAuth/API keys
HTTPEnterpriseREST endpointsFull auth control
  1. Initialize project

    Terminal window
    mkdir my-mcp-server
    cd my-mcp-server
    npm init -y
    npm install @modelcontextprotocol/sdk zod
  2. Create basic server (index.js)

    import { Server } from '@modelcontextprotocol/sdk/server/index.js';
    import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
    import { z } from 'zod';
    // Create server instance
    const server = new Server({
    name: 'my-custom-mcp',
    version: '1.0.0',
    });
    // Define a tool
    server.tool(
    'get_greeting',
    'Returns a personalized greeting',
    {
    name: z.string().describe('Name to greet'),
    language: z.string().optional().describe('Language for greeting'),
    },
    async ({ name, language = 'en' }) => {
    const greetings = {
    en: `Hello, ${name}!`,
    es: `¡Hola, ${name}!`,
    fr: `Bonjour, ${name}!`,
    de: `Hallo, ${name}!`,
    };
    return {
    content: [
    {
    type: 'text',
    text: greetings[language] || greetings.en,
    },
    ],
    };
    }
    );
    // Start server
    const transport = new StdioServerTransport();
    await server.connect(transport);
  3. Add to package.json

    {
    "type": "module",
    "bin": {
    "my-mcp-server": "./index.js"
    }
    }
  4. Test locally

    Terminal window
    # Make executable
    chmod +x index.js
    # Add shebang to index.js first line
    #!/usr/bin/env node
    # Test with Claude Code
    claude mcp add my-server -- /path/to/my-mcp-server/index.js
  1. Set up project

    Terminal window
    mkdir my-mcp-server-py
    cd my-mcp-server-py
    pip install mcp pydantic
  2. Create server (server.py)

    #!/usr/bin/env python3
    import asyncio
    from mcp.server import Server, StdioServerTransport
    from mcp.server.models import InitializationOptions
    from mcp.types import Tool, TextContent
    from pydantic import BaseModel
    class GreetingParams(BaseModel):
    name: str
    language: str = "en"
    async def main():
    server = Server("my-python-mcp")
    @server.list_tools()
    async def list_tools() -> list[Tool]:
    return [
    Tool(
    name="get_greeting",
    description="Returns a personalized greeting",
    inputSchema={
    "type": "object",
    "properties": {
    "name": {"type": "string"},
    "language": {"type": "string", "default": "en"}
    },
    "required": ["name"]
    }
    )
    ]
    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "get_greeting":
    params = GreetingParams(**arguments)
    greetings = {
    "en": f"Hello, {params.name}!",
    "es": f"¡Hola, {params.name}!",
    "fr": f"Bonjour, {params.name}!",
    }
    return [TextContent(
    type="text",
    text=greetings.get(params.language, greetings["en"])
    )]
    async with StdioServerTransport() as transport:
    await server.run(
    transport,
    InitializationOptions(
    server_name="my-python-mcp",
    server_version="1.0.0"
    )
    )
    if __name__ == "__main__":
    asyncio.run(main())
  3. Make executable and test

    Terminal window
    chmod +x server.py
    claude mcp add my-py-server -- /path/to/server.py

Create servers with multiple capabilities:

// Database inspection tool
server.tool(
'inspect_table',
'Get schema information for a database table',
{
tableName: z.string(),
includeIndexes: z.boolean().optional(),
},
async ({ tableName, includeIndexes }) => {
const schema = await db.getTableSchema(tableName);
const indexes = includeIndexes ? await db.getIndexes(tableName) : [];
return {
content: [{
type: 'text',
text: JSON.stringify({ schema, indexes }, null, 2),
}],
};
}
);
// Query execution tool
server.tool(
'execute_query',
'Run a read-only SQL query',
{
query: z.string(),
limit: z.number().optional().default(100),
},
async ({ query, limit }) => {
// Validate query is read-only
if (!isReadOnlyQuery(query)) {
throw new Error('Only SELECT queries are allowed');
}
const results = await db.query(`${query} LIMIT ${limit}`);
return {
content: [{
type: 'text',
text: formatQueryResults(results),
}],
};
}
);

Expose browseable resources:

// List available resources
server.listResources(async () => {
const documents = await getDocumentList();
return documents.map(doc => ({
uri: `doc:///${doc.id}`,
name: doc.title,
description: doc.summary,
mimeType: 'text/markdown',
}));
});
// Read specific resource
server.readResource(async (uri) => {
const docId = uri.replace('doc:///', '');
const content = await getDocumentContent(docId);
return {
contents: [{
uri,
mimeType: 'text/markdown',
text: content,
}],
};
});

Implement secure authentication:

// Environment-based auth
const API_KEY = process.env.MY_SERVICE_API_KEY;
if (!API_KEY) {
console.error('MY_SERVICE_API_KEY environment variable required');
process.exit(1);
}
// OAuth for remote servers
server.tool(
'fetch_user_data',
'Get current user information',
{},
async (params, { authToken }) => {
const response = await fetch('https://api.service.com/user', {
headers: {
'Authorization': `Bearer ${authToken}`,
},
});
const data = await response.json();
return {
content: [{ type: 'text', text: JSON.stringify(data) }],
};
}
);

Implement robust error handling:

server.tool(
'process_data',
'Process data with comprehensive error handling',
{ data: z.string() },
async ({ data }) => {
try {
const result = await processData(data);
return {
content: [{ type: 'text', text: result }],
};
} catch (error) {
if (error instanceof ValidationError) {
return {
content: [{
type: 'text',
text: `Validation error: ${error.message}`,
}],
isError: true,
};
} else if (error instanceof NetworkError) {
return {
content: [{
type: 'text',
text: `Network error: Please check your connection`,
}],
isError: true,
};
} else {
// Log unexpected errors
console.error('Unexpected error:', error);
return {
content: [{
type: 'text',
text: 'An unexpected error occurred',
}],
isError: true,
};
}
}
}
);

Connect to your company’s internal services:

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';
import fetch from 'node-fetch';
const server = new Server({
name: 'company-api-mcp',
version: '1.0.0',
});
const API_BASE = process.env.INTERNAL_API_URL || 'https://api.company.internal';
const API_KEY = process.env.INTERNAL_API_KEY;
// Search internal knowledge base
server.tool(
'search_knowledge_base',
'Search company knowledge base and documentation',
{
query: z.string(),
limit: z.number().optional().default(10),
category: z.enum(['docs', 'wiki', 'tickets', 'all']).optional(),
},
async ({ query, limit, category = 'all' }) => {
const response = await fetch(`${API_BASE}/search`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ query, limit, category }),
});
const results = await response.json();
return {
content: [{
type: 'text',
text: formatSearchResults(results),
}],
};
}
);
// Get service status
server.tool(
'get_service_status',
'Check status of internal services',
{
service: z.string().optional(),
},
async ({ service }) => {
const endpoint = service
? `${API_BASE}/status/${service}`
: `${API_BASE}/status`;
const response = await fetch(endpoint, {
headers: { 'Authorization': `Bearer ${API_KEY}` },
});
const status = await response.json();
return {
content: [{
type: 'text',
text: formatStatusReport(status),
}],
};
}
);
const transport = new StdioServerTransport();
await server.connect(transport);

Modernize access to legacy systems:

#!/usr/bin/env python3
import asyncio
import subprocess
from mcp.server import Server, StdioServerTransport
from mcp.types import Tool, TextContent
class LegacySystemMCP:
def __init__(self):
self.server = Server("legacy-system-mcp")
self.setup_tools()
def setup_tools(self):
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query_mainframe",
description="Query legacy mainframe system",
inputSchema={
"type": "object",
"properties": {
"command": {"type": "string"},
"parameters": {"type": "array", "items": {"type": "string"}}
},
"required": ["command"]
}
),
Tool(
name="convert_ebcdic",
description="Convert EBCDIC data to UTF-8",
inputSchema={
"type": "object",
"properties": {
"data": {"type": "string"}
},
"required": ["data"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "query_mainframe":
return await self.query_mainframe(
arguments["command"],
arguments.get("parameters", [])
)
elif name == "convert_ebcdic":
return await self.convert_ebcdic(arguments["data"])
async def query_mainframe(self, command: str, parameters: list) -> list[TextContent]:
# Wrap legacy CLI tool
cmd = ["legacy-cli", command] + parameters
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
return [TextContent(
type="text",
text=f"Error: {result.stderr}"
)]
return [TextContent(
type="text",
text=result.stdout
)]
except subprocess.TimeoutExpired:
return [TextContent(
type="text",
text="Query timed out after 30 seconds"
)]
async def convert_ebcdic(self, data: str) -> list[TextContent]:
# Convert EBCDIC encoded data
try:
converted = data.encode('cp500').decode('utf-8')
return [TextContent(
type="text",
text=converted
)]
except Exception as e:
return [TextContent(
type="text",
text=f"Conversion error: {str(e)}"
)]
async def run(self):
async with StdioServerTransport() as transport:
await self.server.run(transport)
if __name__ == "__main__":
mcp = LegacySystemMCP()
asyncio.run(mcp.run())

Test individual tools:

test/server.test.ts
import { describe, it, expect } from 'vitest';
import { MyMCPServer } from '../src/server';
describe('MCP Server Tools', () => {
const server = new MyMCPServer();
it('should return greeting in correct language', async () => {
const result = await server.callTool('get_greeting', {
name: 'Alice',
language: 'es',
});
expect(result.content[0].text).toBe('¡Hola, Alice!');
});
it('should handle missing language gracefully', async () => {
const result = await server.callTool('get_greeting', {
name: 'Bob',
language: 'unknown',
});
expect(result.content[0].text).toBe('Hello, Bob!');
});
});

Test with actual MCP client:

test-integration.sh
#!/bin/bash
# Start server in background
./my-mcp-server &
SERVER_PID=$!
# Test with Claude Code
echo "Testing greeting tool..." | claude --mcp-server="test:./my-mcp-server"
# Cleanup
kill $SERVER_PID

Enable debug logging:

const DEBUG = process.env.DEBUG === 'true';
function log(...args: any[]) {
if (DEBUG) {
console.error('[MCP Debug]', ...args);
}
}
server.tool(
'complex_operation',
'Performs complex operation with debugging',
{ input: z.string() },
async ({ input }) => {
log('Starting complex operation with input:', input);
try {
const step1 = await processStep1(input);
log('Step 1 complete:', step1);
const step2 = await processStep2(step1);
log('Step 2 complete:', step2);
const result = await finalStep(step2);
log('Operation complete:', result);
return {
content: [{ type: 'text', text: result }],
};
} catch (error) {
log('Error in complex operation:', error);
throw error;
}
}
);

Package for easy installation:

package.json
{
"name": "@mycompany/mcp-server",
"version": "1.0.0",
"bin": {
"mycompany-mcp": "./dist/index.js"
},
"scripts": {
"build": "tsc",
"prepublishOnly": "npm run build"
}
}

Install globally:

Terminal window
npm install -g @mycompany/mcp-server
claude mcp add mycompany -- mycompany-mcp

Deploy as HTTP/SSE service:

http-server.ts
import express from 'express';
import { createSSEServer } from '@modelcontextprotocol/sdk/server/sse.js';
const app = express();
const mcp = createSSEServer(server, {
endpoint: '/sse',
});
// Add authentication middleware
app.use('/sse', authenticateRequest);
// Mount MCP SSE endpoint
app.use(mcp);
// Health check
app.get('/health', (req, res) => {
res.json({ status: 'healthy' });
});
app.listen(process.env.PORT || 3000);

Input Validation

Always validate and sanitize inputs. Never trust data from the AI client without verification.

Least Privilege

Run servers with minimal permissions. Use read-only database access where possible.

Secrets Management

Never hardcode credentials. Use environment variables or secure vaults for sensitive data.

Audit Logging

Log all operations for security auditing. Include timestamps and parameters (excluding sensitive data).

  1. Implement timeouts

    const TIMEOUT_MS = 30000;
    server.tool('long_operation', params, async (input) => {
    const timeout = setTimeout(() => {
    throw new Error('Operation timed out');
    }, TIMEOUT_MS);
    try {
    const result = await performOperation(input);
    clearTimeout(timeout);
    return result;
    } catch (error) {
    clearTimeout(timeout);
    throw error;
    }
    });
  2. Cache expensive operations

    const cache = new Map();
    const CACHE_TTL = 300000; // 5 minutes
    server.tool('expensive_query', params, async (input) => {
    const cacheKey = JSON.stringify(input);
    const cached = cache.get(cacheKey);
    if (cached && cached.expires > Date.now()) {
    return cached.result;
    }
    const result = await performExpensiveQuery(input);
    cache.set(cacheKey, {
    result,
    expires: Date.now() + CACHE_TTL,
    });
    return result;
    });
  3. Stream large responses

    server.tool('large_data_export', params, async function* (input) {
    const totalRows = await getTotalRows(input);
    const batchSize = 1000;
    for (let offset = 0; offset < totalRows; offset += batchSize) {
    const batch = await fetchBatch(input, offset, batchSize);
    yield {
    content: [{
    type: 'text',
    text: JSON.stringify(batch),
    }],
    };
    }
    });

Always include comprehensive documentation:

/**
* My Custom MCP Server
*
* Provides integration with internal systems for AI-assisted development.
*
* ## Available Tools
*
* - `search_knowledge_base`: Search internal documentation
* - `get_service_status`: Check service health
* - `query_metrics`: Retrieve system metrics
*
* ## Configuration
*
* Required environment variables:
* - `INTERNAL_API_KEY`: API key for internal services
* - `INTERNAL_API_URL`: Base URL for internal API (default: https://api.internal)
*
* ## Installation
*
* ```bash
* npm install -g @company/mcp-server
* claude mcp add company -- company-mcp
* ```
*/
IssueSolution
Server won’t startCheck shebang line, file permissions, and dependencies
”No tools available”Verify tool registration and server initialization
Authentication failuresConfirm environment variables and token validity
Timeout errorsIncrease timeout limits or optimize slow operations
Memory issuesImplement streaming for large data sets

Add comprehensive debugging:

// Enable with DEBUG=true environment variable
if (process.env.DEBUG) {
server.on('tool_call', ({ tool, params }) => {
console.error(`[MCP] Tool called: ${tool}`, params);
});
server.on('error', (error) => {
console.error('[MCP] Server error:', error);
});
process.on('uncaughtException', (error) => {
console.error('[MCP] Uncaught exception:', error);
process.exit(1);
});
}