Walidacja inputów
Zawsze waliduj i sanityzuj inputy. Nigdy nie ufaj danym z klienta AI bez weryfikacji.
Naucz się budować niestandardowe serwery MCP, które wprowadzają Twoje unikalne narzędzia, API i źródła danych do środowiska programistycznego wspomaganego przez AI.
Podczas gdy ekosystem MCP oferuje wiele gotowych serwerów, możesz potrzebować niestandardowej integracji dla:
MCP podąża za architekturą klient-serwer gdzie:
Transport | Przypadek użycia | Wdrożenie | Bezpieczeństwo |
---|---|---|---|
stdio | Narzędzia lokalne | Działa na maszynie użytkownika | Izolacja procesów |
SSE | Usługi zdalne | Serwer HTTP | OAuth/klucze API |
HTTP | Enterprise | Punkty końcowe REST | Pełna kontrola uwierzytelniania |
Inicjalizuj projekt
mkdir my-mcp-servercd my-mcp-servernpm init -ynpm install @modelcontextprotocol/sdk zod
Utwórz podstawowy serwer (index.js
)
import { Server } from '@modelcontextprotocol/sdk/server/index.js';import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';import { z } from 'zod';
// Utwórz instancję serweraconst server = new Server({ name: 'my-custom-mcp', version: '1.0.0',});
// Zdefiniuj narzędzieserver.tool( 'get_greeting', 'Zwraca spersonalizowane powitanie', { name: z.string().describe('Imię do powitania'), language: z.string().optional().describe('Język powitania'), }, async ({ name, language = 'en' }) => { const greetings = { en: `Hello, ${name}!`, es: `¡Hola, ${name}!`, fr: `Bonjour, ${name}!`, de: `Hallo, ${name}!`, pl: `Cześć, ${name}!`, };
return { content: [ { type: 'text', text: greetings[language] || greetings.en, }, ], }; });
// Uruchom serwerconst transport = new StdioServerTransport();await server.connect(transport);
Dodaj do package.json
{ "type": "module", "bin": { "my-mcp-server": "./index.js" }}
Testuj lokalnie
# Ustaw jako wykonywalnychmod +x index.js
# Dodaj shebang do pierwszej linii index.js#!/usr/bin/env node
# Testuj z Claude Codeclaude mcp add my-server -- /path/to/my-mcp-server/index.js
Skonfiguruj projekt
mkdir my-mcp-server-pycd my-mcp-server-pypip install mcp pydantic
Utwórz serwer (server.py
)
#!/usr/bin/env python3import asynciofrom mcp.server import Server, StdioServerTransportfrom mcp.server.models import InitializationOptionsfrom mcp.types import Tool, TextContentfrom pydantic import BaseModel
class GreetingParams(BaseModel): name: str language: str = "en"
async def main(): server = Server("my-python-mcp")
@server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="get_greeting", description="Zwraca spersonalizowane powitanie", inputSchema={ "type": "object", "properties": { "name": {"type": "string"}, "language": {"type": "string", "default": "en"} }, "required": ["name"] } ) ]
@server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "get_greeting": params = GreetingParams(**arguments) greetings = { "en": f"Hello, {params.name}!", "es": f"¡Hola, {params.name}!", "fr": f"Bonjour, {params.name}!", "pl": f"Cześć, {params.name}!", }
return [TextContent( type="text", text=greetings.get(params.language, greetings["en"]) )]
async with StdioServerTransport() as transport: await server.run( transport, InitializationOptions( server_name="my-python-mcp", server_version="1.0.0" ) )
if __name__ == "__main__": asyncio.run(main())
Ustaw jako wykonywalny i testuj
chmod +x server.pyclaude mcp add my-py-server -- /path/to/server.py
Twórz serwery z wieloma możliwościami:
// Narzędzie inspekcji bazy danychserver.tool( 'inspect_table', 'Pobierz informacje o schemacie dla tabeli bazy danych', { tableName: z.string(), includeIndexes: z.boolean().optional(), }, async ({ tableName, includeIndexes }) => { const schema = await db.getTableSchema(tableName); const indexes = includeIndexes ? await db.getIndexes(tableName) : [];
return { content: [{ type: 'text', text: JSON.stringify({ schema, indexes }, null, 2), }], }; });
// Narzędzie wykonywania zapytańserver.tool( 'execute_query', 'Uruchom zapytanie SQL tylko do odczytu', { query: z.string(), limit: z.number().optional().default(100), }, async ({ query, limit }) => { // Waliduj że zapytanie jest tylko do odczytu if (!isReadOnlyQuery(query)) { throw new Error('Dozwolone są tylko zapytania SELECT'); }
const results = await db.query(`${query} LIMIT ${limit}`); return { content: [{ type: 'text', text: formatQueryResults(results), }], }; });
Udostępnij przeglądalne zasoby:
// Lista dostępnych zasobówserver.listResources(async () => { const documents = await getDocumentList(); return documents.map(doc => ({ uri: `doc:///${doc.id}`, name: doc.title, description: doc.summary, mimeType: 'text/markdown', }));});
// Odczytaj konkretny zasóbserver.readResource(async (uri) => { const docId = uri.replace('doc:///', ''); const content = await getDocumentContent(docId);
return { contents: [{ uri, mimeType: 'text/markdown', text: content, }], };});
Implementuj bezpieczne uwierzytelnianie:
// Uwierzytelnianie oparte na zmiennych środowiskowychconst API_KEY = process.env.MY_SERVICE_API_KEY;if (!API_KEY) { console.error('Wymagana zmienna środowiskowa MY_SERVICE_API_KEY'); process.exit(1);}
// OAuth dla serwerów zdalnychserver.tool( 'fetch_user_data', 'Pobierz informacje o aktualnym użytkowniku', {}, async (params, { authToken }) => { const response = await fetch('https://api.service.com/user', { headers: { 'Authorization': `Bearer ${authToken}`, }, });
const data = await response.json(); return { content: [{ type: 'text', text: JSON.stringify(data) }], }; });
Implementuj solidną obsługę błędów:
server.tool( 'process_data', 'Przetwórz dane z kompleksową obsługą błędów', { data: z.string() }, async ({ data }) => { try { const result = await processData(data); return { content: [{ type: 'text', text: result }], }; } catch (error) { if (error instanceof ValidationError) { return { content: [{ type: 'text', text: `Błąd walidacji: ${error.message}`, }], isError: true, }; } else if (error instanceof NetworkError) { return { content: [{ type: 'text', text: `Błąd sieci: Sprawdź swoje połączenie`, }], isError: true, }; } else { // Loguj nieoczekiwane błędy console.error('Nieoczekiwany błąd:', error); return { content: [{ type: 'text', text: 'Wystąpił nieoczekiwany błąd', }], isError: true, }; } } });
Połącz się z wewnętrznymi usługami swojej firmy:
import { Server } from '@modelcontextprotocol/sdk/server/index.js';import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';import { z } from 'zod';import fetch from 'node-fetch';
const server = new Server({ name: 'company-api-mcp', version: '1.0.0',});
const API_BASE = process.env.INTERNAL_API_URL || 'https://api.company.internal';const API_KEY = process.env.INTERNAL_API_KEY;
// Przeszukuj wewnętrzną bazę wiedzyserver.tool( 'search_knowledge_base', 'Przeszukaj firmową bazę wiedzy i dokumentację', { query: z.string(), limit: z.number().optional().default(10), category: z.enum(['docs', 'wiki', 'tickets', 'all']).optional(), }, async ({ query, limit, category = 'all' }) => { const response = await fetch(`${API_BASE}/search`, { method: 'POST', headers: { 'Authorization': `Bearer ${API_KEY}`, 'Content-Type': 'application/json', }, body: JSON.stringify({ query, limit, category }), });
const results = await response.json();
return { content: [{ type: 'text', text: formatSearchResults(results), }], }; });
// Pobierz status usługserver.tool( 'get_service_status', 'Sprawdź status usług wewnętrznych', { service: z.string().optional(), }, async ({ service }) => { const endpoint = service ? `${API_BASE}/status/${service}` : `${API_BASE}/status`;
const response = await fetch(endpoint, { headers: { 'Authorization': `Bearer ${API_KEY}` }, });
const status = await response.json();
return { content: [{ type: 'text', text: formatStatusReport(status), }], }; });
const transport = new StdioServerTransport();await server.connect(transport);
Zmodernizuj dostęp do systemów dziedzictwa:
#!/usr/bin/env python3import asyncioimport subprocessfrom mcp.server import Server, StdioServerTransportfrom mcp.types import Tool, TextContent
class LegacySystemMCP: def __init__(self): self.server = Server("legacy-system-mcp") self.setup_tools()
def setup_tools(self): @self.server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="query_mainframe", description="Zapytaj system mainframe dziedzictwa", inputSchema={ "type": "object", "properties": { "command": {"type": "string"}, "parameters": {"type": "array", "items": {"type": "string"}} }, "required": ["command"] } ), Tool( name="convert_ebcdic", description="Konwertuj dane EBCDIC do UTF-8", inputSchema={ "type": "object", "properties": { "data": {"type": "string"} }, "required": ["data"] } ) ]
@self.server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "query_mainframe": return await self.query_mainframe( arguments["command"], arguments.get("parameters", []) ) elif name == "convert_ebcdic": return await self.convert_ebcdic(arguments["data"])
async def query_mainframe(self, command: str, parameters: list) -> list[TextContent]: # Wrapper narzędzia CLI dziedzictwa cmd = ["legacy-cli", command] + parameters
try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=30 )
if result.returncode != 0: return [TextContent( type="text", text=f"Błąd: {result.stderr}" )]
return [TextContent( type="text", text=result.stdout )] except subprocess.TimeoutExpired: return [TextContent( type="text", text="Zapytanie przekroczyło limit czasu po 30 sekundach" )]
async def convert_ebcdic(self, data: str) -> list[TextContent]: # Konwertuj dane zakodowane EBCDIC try: converted = data.encode('cp500').decode('utf-8') return [TextContent( type="text", text=converted )] except Exception as e: return [TextContent( type="text", text=f"Błąd konwersji: {str(e)}" )]
async def run(self): async with StdioServerTransport() as transport: await self.server.run(transport)
if __name__ == "__main__": mcp = LegacySystemMCP() asyncio.run(mcp.run())
Testuj poszczególne narzędzia:
import { describe, it, expect } from 'vitest';import { MyMCPServer } from '../src/server';
describe('Narzędzia serwera MCP', () => { const server = new MyMCPServer();
it('powinien zwrócić powitanie w prawidłowym języku', async () => { const result = await server.callTool('get_greeting', { name: 'Alice', language: 'es', });
expect(result.content[0].text).toBe('¡Hola, Alice!'); });
it('powinien gracefully obsłużyć brakujący język', async () => { const result = await server.callTool('get_greeting', { name: 'Bob', language: 'unknown', });
expect(result.content[0].text).toBe('Hello, Bob!'); });});
Testuj z rzeczywistym klientem MCP:
#!/bin/bash# Uruchom serwer w tle./my-mcp-server &SERVER_PID=$!
# Testuj z Claude Codeecho "Testowanie narzędzia powitania..." | claude --mcp-server="test:./my-mcp-server"
# Cleanupkill $SERVER_PID
Włącz logowanie debug:
const DEBUG = process.env.DEBUG === 'true';
function log(...args: any[]) { if (DEBUG) { console.error('[MCP Debug]', ...args); }}
server.tool( 'complex_operation', 'Wykonuje złożoną operację z debugowaniem', { input: z.string() }, async ({ input }) => { log('Rozpoczynanie złożonej operacji z inputem:', input);
try { const step1 = await processStep1(input); log('Krok 1 ukończony:', step1);
const step2 = await processStep2(step1); log('Krok 2 ukończony:', step2);
const result = await finalStep(step2); log('Operacja ukończona:', result);
return { content: [{ type: 'text', text: result }], }; } catch (error) { log('Błąd w złożonej operacji:', error); throw error; } });
Pakuj dla łatwej instalacji:
{ "name": "@mycompany/mcp-server", "version": "1.0.0", "bin": { "mycompany-mcp": "./dist/index.js" }, "scripts": { "build": "tsc", "prepublishOnly": "npm run build" }}
Instaluj globalnie:
npm install -g @mycompany/mcp-serverclaude mcp add mycompany -- mycompany-mcp
FROM node:20-slimWORKDIR /appCOPY package*.json ./RUN npm ci --only=productionCOPY . .EXPOSE 8080CMD ["node", "server.js"]
Uruchom kontener:
docker run -p 8080:8080 mycompany/mcp-serverclaude mcp add mycompany --transport sse http://localhost:8080/sse
Wdrażaj jako usługę HTTP/SSE:
import express from 'express';import { createSSEServer } from '@modelcontextprotocol/sdk/server/sse.js';
const app = express();const mcp = createSSEServer(server, { endpoint: '/sse',});
// Dodaj middleware uwierzytelnianiaapp.use('/sse', authenticateRequest);
// Zamontuj punkt końcowy MCP SSEapp.use(mcp);
// Health checkapp.get('/health', (req, res) => { res.json({ status: 'healthy' });});
app.listen(process.env.PORT || 3000);
Walidacja inputów
Zawsze waliduj i sanityzuj inputy. Nigdy nie ufaj danym z klienta AI bez weryfikacji.
Najmniejsze uprawnienia
Uruchamiaj serwery z minimalnymi uprawnieniami. Używaj dostępu tylko do odczytu bazy danych gdzie to możliwe.
Zarządzanie sekretami
Nigdy nie hardkoduj poświadczeń. Używaj zmiennych środowiskowych lub bezpiecznych magazynów dla wrażliwych danych.
Logowanie audytu
Loguj wszystkie operacje dla audytu bezpieczeństwa. Uwzględnij znaczniki czasu i parametry (wyłączając wrażliwe dane).
Implementuj timeouty
const TIMEOUT_MS = 30000;
server.tool('long_operation', params, async (input) => { const timeout = setTimeout(() => { throw new Error('Operacja przekroczyła limit czasu'); }, TIMEOUT_MS);
try { const result = await performOperation(input); clearTimeout(timeout); return result; } catch (error) { clearTimeout(timeout); throw error; }});
Buforuj kosztowne operacje
const cache = new Map();const CACHE_TTL = 300000; // 5 minut
server.tool('expensive_query', params, async (input) => { const cacheKey = JSON.stringify(input); const cached = cache.get(cacheKey);
if (cached && cached.expires > Date.now()) { return cached.result; }
const result = await performExpensiveQuery(input); cache.set(cacheKey, { result, expires: Date.now() + CACHE_TTL, });
return result;});
Streamuj duże odpowiedzi
server.tool('large_data_export', params, async function* (input) { const totalRows = await getTotalRows(input); const batchSize = 1000;
for (let offset = 0; offset < totalRows; offset += batchSize) { const batch = await fetchBatch(input, offset, batchSize); yield { content: [{ type: 'text', text: JSON.stringify(batch), }], }; }});
Zawsze dołączaj kompleksową dokumentację:
/** * Mój niestandardowy serwer MCP * * Zapewnia integrację z systemami wewnętrznymi dla programowania wspomaganego przez AI. * * ## Dostępne narzędzia * * - `search_knowledge_base`: Przeszukuj wewnętrzną dokumentację * - `get_service_status`: Sprawdź zdrowie usług * - `query_metrics`: Pobierz metryki systemu * * ## Konfiguracja * * Wymagane zmienne środowiskowe: * - `INTERNAL_API_KEY`: Klucz API dla usług wewnętrznych * - `INTERNAL_API_URL`: Bazowy URL dla wewnętrznego API (domyślnie: https://api.internal) * * ## Instalacja * * ```bash * npm install -g @company/mcp-server * claude mcp add company -- company-mcp * ``` */
Problem | Rozwiązanie |
---|---|
Serwer nie uruchamia się | Sprawdź linię shebang, uprawnienia plików i zależności |
”Brak dostępnych narzędzi” | Sprawdź rejestrację narzędzi i inicjalizację serwera |
Błędy uwierzytelniania | Potwierdź zmienne środowiskowe i ważność tokenów |
Błędy timeout | Zwiększ limity timeout lub zoptymalizuj powolne operacje |
Problemy z pamięcią | Implementuj streaming dla dużych zbiorów danych |
Dodaj kompleksowe debugowanie:
// Włącz z zmienną środowiskową DEBUG=trueif (process.env.DEBUG) { server.on('tool_call', ({ tool, params }) => { console.error(`[MCP] Wywołano narzędzie: ${tool}`, params); });
server.on('error', (error) => { console.error('[MCP] Błąd serwera:', error); });
process.on('uncaughtException', (error) => { console.error('[MCP] Nieprzechwycony wyjątek:', error); process.exit(1); });}