Advanced Usage Patterns
Master advanced OmniMCP patterns for building robust, scalable, and production-ready MCP integrations.
Custom Transport Implementation
Create custom transports for specialized communication needs:
import { Transport, TransportEvents } from '@omnimcp/core';
import WebSocket from 'ws';

/**
 * Transport that exchanges JSON-encoded messages over a WebSocket.
 *
 * Events emitted to listeners registered through {@link WebSocketTransport.on}:
 * - `connected`    — the socket opened.
 * - `message`      — a frame was received and parsed as JSON (payload passed as argument).
 * - `error`        — the socket reported an error, or an incoming frame was not valid JSON.
 * - `disconnected` — the socket closed.
 */
class WebSocketTransport implements Transport {
  private ws?: WebSocket;

  // One list per event name so that several listeners can subscribe to the
  // same event without replacing each other.
  private handlers = new Map<string, Function[]>();

  /**
   * Opens the socket and resolves once it is open.
   *
   * @param options.url - WebSocket URL to connect to.
   * @throws Rejects with the socket error if one is reported before the socket opens.
   */
  async connect(options: { url: string }): Promise<void> {
    return new Promise((resolve, reject) => {
      const socket = new WebSocket(options.url);
      this.ws = socket;

      socket.on('open', () => {
        this.emit('connected');
        resolve();
      });

      socket.on('message', (data) => {
        let message: unknown;
        try {
          message = JSON.parse(data.toString());
        } catch (error) {
          // A malformed frame must not escape the listener as an uncaught
          // exception; surface it through the transport's own error event.
          this.emit('error', error);
          return;
        }
        this.emit('message', message);
      });

      socket.on('error', (error) => {
        this.emit('error', error);
        // After the promise has settled this reject() is a no-op.
        reject(error);
      });

      socket.on('close', () => {
        this.emit('disconnected');
      });
    });
  }

  /**
   * Serializes `message` as JSON and writes it to the socket.
   *
   * @throws Error('WebSocket not connected') when the socket is missing or not open.
   */
  async send(message: any): Promise<void> {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket not connected');
    }
    this.ws.send(JSON.stringify(message));
  }

  /** Starts closing the socket, if there is one. Does not wait for the close to complete. */
  async disconnect(): Promise<void> {
    this.ws?.close();
  }

  /** Registers `handler` for `event`; earlier handlers for the same event are kept. */
  on(event: string, handler: Function): void {
    const registered = this.handlers.get(event);
    if (registered) {
      registered.push(handler);
    } else {
      this.handlers.set(event, [handler]);
    }
  }

  /** Invokes every handler registered for `event`, in registration order. */
  private emit(event: string, ...args: any[]): void {
    // Iterate over a copy so a handler that subscribes during dispatch
    // does not affect the current round.
    for (const handler of [...(this.handlers.get(event) ?? [])]) {
      handler(...args);
    }
  }
}
Middleware System
Implement middleware for request/response processing:
/**
 * A middleware receives the shared context and a `next` callback that runs the
 * rest of the chain (and finally the wrapped operation).
 */
type Middleware = (context: MiddlewareContext, next: () => Promise<void>) => Promise<void>;

/** State shared by every middleware during one `executeWithMiddleware` run. */
interface MiddlewareContext {
  /** Request being processed, when the caller supplied one. */
  request?: any;
  /** Result of the wrapped operation, set after it resolves. */
  response?: any;
  /** Failure of the wrapped operation, set after it rejects. */
  error?: Error;
  /** Free-form scratch space for middlewares (e.g. auth tokens). */
  metadata: Record<string, any>;
}

/** MCP client that runs operations through a chain of user-registered middlewares. */
class MCPClientWithMiddleware extends MCPClient {
  private middlewares: Middleware[] = [];

  /** Appends a middleware; middlewares run in registration order. */
  use(middleware: Middleware): void {
    this.middlewares.push(middleware);
  }

  /**
   * Runs `operation` as the innermost step of the middleware chain.
   *
   * @param operation - The work to perform once every middleware has called `next()`.
   * @param request - Optional request exposed to middlewares as `context.request`.
   * @returns `context.response` as left by the chain.
   * @throws `context.error` if it is set when the chain finishes, or whatever a
   *   middleware itself throws.
   */
  protected async executeWithMiddleware(
    operation: () => Promise<any>,
    request?: any
  ): Promise<any> {
    const context: MiddlewareContext = { request, metadata: {} };

    // Highest chain position entered so far; used to reject a second next()
    // call from the same middleware, which would otherwise re-run the operation.
    let furthestIndex = -1;

    const runMiddlewares = async (index: number): Promise<void> => {
      if (index <= furthestIndex) {
        throw new Error('next() called multiple times');
      }
      furthestIndex = index;

      if (index >= this.middlewares.length) {
        try {
          context.response = await operation();
        } catch (error) {
          // Normalize so that a non-Error (or falsy) thrown value is still
          // reported instead of being silently dropped by the check below.
          context.error = error instanceof Error ? error : new Error(String(error));
        }
        return;
      }

      await this.middlewares[index](context, () => runMiddlewares(index + 1));
    };

    await runMiddlewares(0);

    if (context.error) {
      throw context.error;
    }
    return context.response;
  }
}

// Usage example
const client = new MCPClientWithMiddleware('app', '1.0.0');

// Logging middleware
client.use(async (ctx, next) => {
  console.log('Request:', ctx.request);
  const start = Date.now();
  await next();
  console.log(`Response time: ${Date.now() - start}ms`);
});

// Authentication middleware
client.use(async (ctx, next) => {
  ctx.metadata.auth = await getAuthToken();
  await next();
});
Event Sourcing Pattern
Track all MCP interactions for audit and replay:
/** One recorded MCP interaction. */
interface MCPEvent {
  id: string;
  timestamp: Date;
  type: 'tool_call' | 'resource_access' | 'error';
  /** For `tool_call`: a copy of the request plus `result`/`error` and `success`. */
  data: any;
  metadata: Record<string, any>;
}

/**
 * MCP client that records every tool call as an event, optionally persists it
 * to an event store, and can replay the successful calls it has recorded.
 *
 * NOTE(review): `async tools.call(...)` follows this guide's notation for
 * overriding the client's `tools.call`; it is not valid TypeScript method
 * syntax as written — confirm how MCPClient expects this to be overridden.
 */
class EventSourcedMCPClient extends MCPClient {
  private events: MCPEvent[] = [];
  private eventStore?: EventStore;

  constructor(name: string, version: string, eventStore?: EventStore) {
    super(name, version);
    this.eventStore = eventStore;
  }

  /**
   * Calls the tool and records the outcome, whether it succeeded or failed.
   *
   * @returns The result of the underlying tool call.
   * @throws Re-throws the underlying failure after it has been recorded.
   */
  async tools.call(request: CallToolRequest): Promise<any> {
    const event: MCPEvent = {
      id: crypto.randomUUID(),
      timestamp: new Date(),
      type: 'tool_call',
      // Shallow copy: the outcome fields below must not be written onto the
      // caller's request object.
      data: { ...request },
      metadata: { user: getCurrentUser() }
    };

    try {
      const result = await super.tools.call(request);
      event.data.result = result;
      event.data.success = true;
      return result;
    } catch (error) {
      event.data.error = error;
      event.data.success = false;
      throw error;
    } finally {
      // Runs on both paths so failures are audited too.
      await this.recordEvent(event);
    }
  }

  /** Appends the event in memory, saves it to the store if one was given, and emits `event`. */
  private async recordEvent(event: MCPEvent): Promise<void> {
    this.events.push(event);
    await this.eventStore?.save(event);
    this.emit('event', event);
  }

  /**
   * Re-issues recorded successful tool calls, in recording order.
   * Replayed calls go straight to the base client and are not recorded again.
   *
   * @param filter - When given, only events it accepts are considered.
   */
  async replay(filter?: (event: MCPEvent) => boolean): Promise<void> {
    const eventsToReplay = filter ? this.events.filter(filter) : this.events;

    for (const event of eventsToReplay) {
      if (event.type !== 'tool_call' || !event.data.success) {
        continue;
      }
      // Strip the bookkeeping fields added when the call was recorded so only
      // the original request is sent again.
      const { result, success, error, ...request } = event.data;
      await super.tools.call(request);
    }
  }
}
Resource Pooling and Caching
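Efficiently manage resources by caching read results and sharing in-flight requests for the same URI (the example below covers caching and request de-duplication):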
/**
 * MCP client that memoizes resource reads for one minute and shares a single
 * in-flight request between concurrent readers of the same URI.
 *
 * NOTE(review): `async resources.read(...)` follows this guide's notation; it
 * is not valid TypeScript method syntax as written — confirm against MCPClient.
 */
class CachedMCPClient extends MCPClient {
  private resourceCache = new Map<string, { data: any; expires: number }>();
  private pendingRequests = new Map<string, Promise<any>>();

  /**
   * Reads a resource, preferring (1) an unexpired cache entry, then (2) a
   * request already in flight for the same URI, and only then (3) a new read.
   */
  async resources.read(uri: string): Promise<any> {
    // 1. Unexpired cache entry.
    const entry = this.resourceCache.get(uri);
    if (entry !== undefined && entry.expires > Date.now()) {
      return entry.data;
    }

    // 2. Join a read that is already running.
    const inFlight = this.pendingRequests.get(uri);
    if (inFlight !== undefined) {
      return inFlight;
    }

    // 3. Start a new read and publish it so concurrent callers can join.
    const fetching = super.resources.read(uri);
    this.pendingRequests.set(uri, fetching);

    try {
      const payload = await fetching;
      const expires = Date.now() + 60000; // 1 minute
      this.resourceCache.set(uri, { data: payload, expires });
      return payload;
    } finally {
      // Success or failure, the request is no longer pending.
      this.pendingRequests.delete(uri);
    }
  }

  /**
   * Drops cached entries.
   *
   * @param pattern - Substring to match against cached URIs; when omitted
   *   (or empty) the whole cache is cleared.
   */
  invalidateCache(pattern?: string): void {
    if (!pattern) {
      this.resourceCache.clear();
      return;
    }

    for (const cachedUri of this.resourceCache.keys()) {
      if (cachedUri.includes(pattern)) {
        this.resourceCache.delete(cachedUri);
      }
    }
  }
}
Distributed Tracing
Implement distributed tracing for debugging complex flows:
import { Tracer, Span, SpanStatusCode } from '@opentelemetry/api';
import { NodeTracerProvider, BatchSpanProcessor } from '@opentelemetry/sdk-trace-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

/**
 * MCP client that wraps every tool call in an OpenTelemetry span named
 * `mcp.tool.call`.
 *
 * NOTE(review): `async tools.call(...)` follows this guide's notation; it is
 * not valid TypeScript method syntax as written — confirm against MCPClient.
 */
class TracedMCPClient extends MCPClient {
  constructor(
    name: string,
    version: string,
    private tracer: Tracer
  ) {
    super(name, version);
  }

  /**
   * Calls the tool inside a span carrying the tool name and its JSON-encoded
   * arguments. The span status is OK on success and ERROR (with the exception
   * recorded) on failure; the span is always ended.
   *
   * @throws Re-throws whatever the underlying call threw.
   */
  async tools.call(request: CallToolRequest): Promise<any> {
    const span = this.tracer.startSpan('mcp.tool.call', {
      attributes: {
        'tool.name': request.name,
        'tool.arguments': JSON.stringify(request.arguments)
      }
    });

    try {
      const result = await super.tools.call(request);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      // A catch variable is `unknown`; narrow it before reading `.message`
      // or handing it to recordException.
      const exception = error instanceof Error ? error : new Error(String(error));
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: exception.message
      });
      span.recordException(exception);
      // Propagate the original value so callers see exactly what was thrown.
      throw error;
    } finally {
      span.end();
    }
  }
}

// Usage with Jaeger
const provider = new NodeTracerProvider();
const exporter = new JaegerExporter({
  endpoint: 'http://localhost:14268/api/traces',
});

provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register();

const tracer = provider.getTracer('mcp-client');
const client = new TracedMCPClient('app', '1.0.0', tracer);
Load Balancing
Distribute requests across multiple MCP servers:
/**
 * Spreads tool calls over several MCP servers using round-robin selection,
 * retrying once on another selection when a call fails.
 */
class LoadBalancedMCPClient {
  // `protected` rather than `private`: HealthCheckedLoadBalancer below reads
  // the pool and overrides the selection strategy.
  protected clients: MCPClient[] = [];
  private currentIndex = 0;

  /** Connects a new client using `config` and adds it to the pool. */
  async addServer(config: TransportConfig): Promise<void> {
    const client = new MCPClient(`lb-client-${this.clients.length}`, '1.0.0');
    await client.connect(config);
    this.clients.push(client);
  }

  /**
   * Returns the next client in round-robin order.
   *
   * @throws Error('No servers available') when the pool is empty.
   */
  protected getNextClient(): MCPClient {
    // Without this guard an empty pool computes `% 0` (NaN) and hands back
    // `undefined`, which only fails later with an unrelated TypeError.
    if (this.clients.length === 0) {
      throw new Error('No servers available');
    }
    const client = this.clients[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.clients.length;
    return client;
  }

  /**
   * Calls the tool on the selected server. If that call fails and more than
   * one server is registered, retries once on the next selection.
   *
   * @throws The original error when there is no other server to try, or the
   *   fallback's error when the retry fails as well.
   */
  async callTool(request: CallToolRequest): Promise<any> {
    const client = this.getNextClient();
    try {
      return await client.tools.call(request);
    } catch (error) {
      // Try another server on failure
      if (this.clients.length > 1) {
        const fallbackClient = this.getNextClient();
        return await fallbackClient.tools.call(request);
      }
      throw error;
    }
  }
}

// Health checking
/**
 * Load balancer that pings every server periodically and only routes to
 * servers that have not failed their most recent ping.
 */
class HealthCheckedLoadBalancer extends LoadBalancedMCPClient {
  private healthStatus = new Map<MCPClient, boolean>();
  // Handle of the running check loop, kept so that it can be stopped.
  private healthCheckTimer?: ReturnType<typeof setInterval>;

  /**
   * Starts pinging all servers every 30 seconds. Calling it again while the
   * checks are already running has no effect.
   */
  async startHealthChecks(): Promise<void> {
    if (this.healthCheckTimer !== undefined) {
      return;
    }
    this.healthCheckTimer = setInterval(async () => {
      for (const client of this.clients) {
        try {
          await client.ping();
          this.healthStatus.set(client, true);
        } catch {
          this.healthStatus.set(client, false);
        }
      }
    }, 30000); // Check every 30 seconds
  }

  /** Stops the periodic health checks, if they are running. */
  stopHealthChecks(): void {
    if (this.healthCheckTimer !== undefined) {
      clearInterval(this.healthCheckTimer);
      this.healthCheckTimer = undefined;
    }
  }

  /**
   * Picks a random client among those not currently marked unhealthy.
   * Clients that have never been checked count as healthy.
   *
   * @throws Error('No healthy servers available') when every client is marked unhealthy
   *   (or the pool is empty).
   */
  protected getNextClient(): MCPClient {
    const healthyClients = this.clients.filter(
      client => this.healthStatus.get(client) !== false
    );

    if (healthyClients.length === 0) {
      throw new Error('No healthy servers available');
    }

    return healthyClients[Math.floor(Math.random() * healthyClients.length)];
  }
}
Testing Strategies
Mock MCP Server
import { MCPServer } from '@omnimcp/test-utils';

// Integration suite: each test gets a fresh mock server exposing a single
// `get_user` tool, plus a client connected to it.
describe('MCP Integration Tests', () => {
  let mockServer: MCPServer;
  let client: MCPClient;

  /** Registers the `get_user` tool, which echoes the id with fixed user details. */
  const registerGetUserTool = (server: MCPServer): void => {
    server.addTool({
      name: 'get_user',
      description: 'Get user by ID',
      inputSchema: {
        type: 'object',
        properties: {
          id: { type: 'string' }
        },
        required: ['id']
      },
      handler: async ({ id }) => {
        return {
          id,
          name: 'Test User',
          email: 'test@example.com'
        };
      }
    });
  };

  beforeEach(async () => {
    // Mock server with its tool, started before the client connects.
    mockServer = new MCPServer();
    registerGetUserTool(mockServer);
    await mockServer.start();

    // Client wired to the mock server's own connection config.
    client = new MCPClient('test', '1.0.0');
    await client.connect(mockServer.getConfig());
  });

  afterEach(async () => {
    // Tear down in reverse order: client first, then the server.
    await client.disconnect();
    await mockServer.stop();
  });

  test('should call mock tool', async () => {
    const expectedUser = {
      id: '123',
      name: 'Test User',
      email: 'test@example.com'
    };

    const result = await client.tools.call({
      name: 'get_user',
      arguments: { id: '123' }
    });

    expect(result).toEqual(expectedUser);
  });
});
Integration Testing
import { TestHarness } from '@omnimcp/test-utils';

// End-to-end suite: one harness runs two servers for the whole suite, and the
// test passes a token obtained from the first server to the second.
describe('E2E MCP Tests', () => {
  const harness = new TestHarness();

  beforeAll(async () => {
    await harness.setup({
      servers: [
        { name: 'auth-server', command: 'auth-mcp-server' },
        { name: 'data-server', command: 'data-mcp-server' }
      ]
    });
  });

  afterAll(async () => {
    await harness.teardown();
  });

  test('cross-server communication', async () => {
    const auth = harness.getClient('auth-server');
    const store = harness.getClient('data-server');

    // Step 1: log in against the auth server and keep the token.
    const loginResult = await auth.tools.call({
      name: 'login',
      arguments: { username: 'test', password: 'test' }
    });
    const token = loginResult.token;

    // Step 2: present the token to the data server.
    const protectedData = await store.tools.call({
      name: 'get_protected_data',
      arguments: { token }
    });

    expect(protectedData).toBeDefined();
  });
});
Production Monitoring
import { StatsD } from 'node-statsd';
import { register, Counter, Histogram } from 'prom-client';

/**
 * MCP client that reports StatsD metrics (prefixed `mcp.`) for every tool call:
 * `tool.calls`, `tool.success`, `tool.errors` and the `tool.duration` timing.
 *
 * NOTE(review): `async tools.call(...)` follows this guide's notation; it is
 * not valid TypeScript method syntax as written — confirm against MCPClient.
 */
class MonitoredMCPClient extends MCPClient {
  private metrics: StatsD;

  constructor(name: string, version: string) {
    super(name, version);
    this.metrics = new StatsD({
      host: 'localhost',
      port: 8125,
      prefix: 'mcp.'
    });
  }

  /**
   * Calls the tool and counts the attempt, then either records duration and
   * success, or counts an error tagged with the failure's `errorCode`.
   *
   * @throws Re-throws whatever the underlying call threw.
   */
  async tools.call(request: CallToolRequest): Promise<any> {
    const startTime = Date.now();
    const tags = [`tool:${request.name}`];

    this.metrics.increment('tool.calls', 1, tags);

    try {
      const result = await super.tools.call(request);

      // Duration is reported for successful calls only.
      this.metrics.timing('tool.duration', Date.now() - startTime, tags);
      this.metrics.increment('tool.success', 1, tags);

      return result;
    } catch (error) {
      // A catch variable is `unknown`: read `errorCode` only when it exists,
      // and fall back to a stable tag instead of emitting "error:undefined".
      const rawCode =
        typeof error === 'object' && error !== null && 'errorCode' in error
          ? (error as { errorCode: unknown }).errorCode
          : undefined;
      const errorCode = rawCode == null ? 'unknown' : String(rawCode);

      this.metrics.increment('tool.errors', 1, [...tags, `error:${errorCode}`]);
      throw error;
    }
  }
}

// Prometheus metrics
/** Counter of tool calls, labelled by tool name and outcome status. */
const toolCallsTotal = new Counter({
  name: 'mcp_tool_calls_total',
  help: 'Total number of tool calls',
  labelNames: ['tool', 'status']
});

/** Histogram of tool call durations in seconds, labelled by tool name. */
const toolDuration = new Histogram({
  name: 'mcp_tool_duration_seconds',
  help: 'Tool call duration in seconds',
  labelNames: ['tool']
});

register.registerMetric(toolCallsTotal);
register.registerMetric(toolDuration);
Next Steps
- Review the complete API reference
- Explore real-world examples
- Join our community discussions