Advanced Usage Patterns
Master advanced OmniMCP patterns for building robust, scalable, and production-ready MCP integrations.
Custom Transport Implementation
Create custom transports for specialized communication needs:
import { Transport, TransportEvents } from '@omnimcp/core';
import WebSocket from 'ws';
/**
 * Transport that exchanges JSON-encoded messages over a WebSocket.
 *
 * Events delivered to handlers registered through `on`:
 * - `connected`    — the socket opened
 * - `message`      — a frame was received and parsed as JSON (payload: the parsed value)
 * - `error`        — the socket reported an error, or a frame was not valid JSON (payload: the error)
 * - `disconnected` — the socket closed
 */
class WebSocketTransport implements Transport {
  private ws?: WebSocket;
  // One list per event, so registering a second handler adds a listener
  // instead of silently replacing the first one.
  private handlers = new Map<string, Function[]>();

  /**
   * Opens the socket and wires its events to this transport's handlers.
   *
   * @param options.url WebSocket URL to connect to.
   * @returns Resolves once the socket is open; rejects if the socket errors
   *          or closes before it opened.
   */
  async connect(options: { url: string }): Promise<void> {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(options.url);
      this.ws = ws;

      ws.on('open', () => {
        this.emit('connected');
        resolve();
      });

      ws.on('message', (data) => {
        let message: unknown;
        try {
          message = JSON.parse(data.toString());
        } catch (error) {
          // A malformed frame must not throw out of the socket's listener;
          // surface it through the transport's own error channel instead.
          this.emit('error', error);
          return;
        }
        this.emit('message', message);
      });

      ws.on('error', (error) => {
        this.emit('error', error);
        // No-op once the promise has settled (errors after a successful open).
        reject(error);
      });

      ws.on('close', () => {
        this.emit('disconnected');
        // Guarantees connect() settles even if the socket closes without
        // having opened; ignored when the promise is already settled.
        reject(new Error('WebSocket closed before the connection was established'));
      });
    });
  }

  /**
   * Serializes `message` as JSON and writes it to the socket.
   *
   * @throws Error('WebSocket not connected') when the socket is absent or not open.
   * @returns Resolves when the socket reports the write as completed; rejects
   *          with the socket's error otherwise.
   */
  async send(message: any): Promise<void> {
    const ws = this.ws;
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket not connected');
    }
    const payload = JSON.stringify(message);
    await new Promise<void>((resolve, reject) => {
      ws.send(payload, (error) => (error ? reject(error) : resolve()));
    });
  }

  /** Starts closing the socket, if there is one; does not wait for the close to finish. */
  async disconnect(): Promise<void> {
    this.ws?.close();
  }

  /** Registers `handler` for `event`; several handlers per event are allowed. */
  on(event: string, handler: Function): void {
    const registered = this.handlers.get(event);
    if (registered) {
      registered.push(handler);
    } else {
      this.handlers.set(event, [handler]);
    }
  }

  /** Invokes every handler registered for `event`, in registration order. */
  private emit(event: string, ...args: any[]): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler(...args);
    }
  }
}
Middleware System
Implement middleware for request/response processing:
/**
 * One step of the middleware chain. It receives the shared context and a
 * `next` callback; awaiting `next()` runs the remaining middlewares and,
 * after the last one, the wrapped operation.
 */
type Middleware = (context: MiddlewareContext, next: () => Promise<void>) => Promise<void>;
/** Mutable state shared by all middlewares during a single run of the chain. */
interface MiddlewareContext {
// Request being processed, when one has been attached to the context.
request?: any;
// Value the wrapped operation resolved with; set after the operation succeeds.
response?: any;
// Failure raised by the wrapped operation; set instead of `response` when it throws.
error?: Error;
// Free-form bag middlewares use to pass data to each other (e.g. an auth token).
metadata: Record<string, any>;
}
/**
 * MCP client that runs operations through a chain of middlewares.
 *
 * Middlewares execute in registration order. Each one may do work before and
 * after awaiting `next()`; the wrapped operation runs after the last middleware.
 */
class MCPClientWithMiddleware extends MCPClient {
  private middlewares: Middleware[] = [];

  /** Appends a middleware to the end of the chain. */
  use(middleware: Middleware): void {
    this.middlewares.push(middleware);
  }

  /**
   * Runs `operation` inside the middleware chain.
   *
   * A failure of the operation is stored on `context.error` rather than
   * thrown through the chain, so every middleware still gets to run its
   * post-`next()` code; the error is rethrown after the chain has unwound.
   *
   * @param operation The call to perform once all middlewares have passed control on.
   * @param request   Optional request exposed to middlewares as `context.request`.
   * @returns The value `operation` resolved with.
   * @throws The operation's error, or any error thrown by a middleware.
   */
  protected async executeWithMiddleware(
    operation: () => Promise<any>,
    request?: any
  ): Promise<any> {
    const context: MiddlewareContext = { request, metadata: {} };
    // Tracked separately from `context.error` so that a falsy thrown value
    // (e.g. `throw undefined`) is still reported as a failure.
    let failed = false;

    const dispatch = async (index: number): Promise<void> => {
      const middleware = this.middlewares[index];
      if (!middleware) {
        // End of the chain: perform the actual operation.
        try {
          context.response = await operation();
        } catch (error) {
          failed = true;
          context.error = error instanceof Error ? error : new Error(String(error));
        }
        return;
      }

      let nextCalled = false;
      await middleware(context, () => {
        // A second next() would re-run the rest of the chain and the operation.
        if (nextCalled) {
          return Promise.reject(new Error('next() called multiple times'));
        }
        nextCalled = true;
        return dispatch(index + 1);
      });
    };

    await dispatch(0);

    if (failed || context.error) {
      throw context.error;
    }
    return context.response;
  }
}
// Usage example: one client with two middlewares, run in registration order
const client = new MCPClientWithMiddleware('app', '1.0.0');

// Logging middleware: prints the request, then how long the rest of the chain took
const logRequestTiming: Middleware = async (ctx, next) => {
  console.log('Request:', ctx.request);
  const startedAt = Date.now();
  await next();
  const elapsedMs = Date.now() - startedAt;
  console.log(`Response time: ${elapsedMs}ms`);
};
client.use(logRequestTiming);

// Authentication middleware: stores a token in the shared metadata before continuing
const attachAuthToken: Middleware = async (ctx, next) => {
  ctx.metadata.auth = await getAuthToken();
  await next();
};
client.use(attachAuthToken);
Event Sourcing Pattern
Track all MCP interactions for audit and replay:
/** A recorded MCP interaction, kept for audit and replay. */
interface MCPEvent {
// Unique identifier of this event.
id: string;
// When the event object was created.
timestamp: Date;
// Kind of interaction the event describes.
type: 'tool_call' | 'resource_access' | 'error';
// Event payload; for `tool_call` events it carries the tool request fields
// plus the outcome (`result` or `error`) and a `success` flag.
data: any;
// Additional context about the event, such as the acting user.
metadata: Record<string, any>;
}
/**
 * MCP client that records every tool call as an `MCPEvent`.
 *
 * Events are kept in memory, saved to the optional `EventStore`, and emitted
 * as an `event` notification. Successful tool calls can be replayed later.
 */
class EventSourcedMCPClient extends MCPClient {
  private events: MCPEvent[] = [];
  private eventStore?: EventStore;

  /**
   * @param name       Client name, forwarded to `MCPClient`.
   * @param version    Client version, forwarded to `MCPClient`.
   * @param eventStore Optional store that every recorded event is saved to.
   */
  constructor(name: string, version: string, eventStore?: EventStore) {
    super(name, version);
    this.eventStore = eventStore;
  }

  /**
   * Calls a tool and records the outcome as a `tool_call` event.
   *
   * @returns The tool's result.
   * @throws The tool's error, after the failure has been recorded. An error
   *         raised while recording the event replaces the call's own outcome.
   */
  async tools.call(request: CallToolRequest): Promise<any> {
    const event: MCPEvent = {
      id: crypto.randomUUID(),
      timestamp: new Date(),
      type: 'tool_call',
      // Shallow copy: the outcome fields added below must not leak onto the
      // caller's request object.
      data: { ...request },
      metadata: { user: getCurrentUser() }
    };

    try {
      const result = await super.tools.call(request);
      event.data.result = result;
      event.data.success = true;
      // Hand the result back to the caller.
      return result;
    } catch (error) {
      event.data.error = error;
      event.data.success = false;
      throw error;
    } finally {
      await this.recordEvent(event);
    }
  }

  /** Stores the event in memory and in the event store, then emits it as `event`. */
  private async recordEvent(event: MCPEvent): Promise<void> {
    this.events.push(event);
    await this.eventStore?.save(event);
    this.emit('event', event);
  }

  /**
   * Re-issues recorded tool calls that succeeded, one after another.
   * Replayed calls go straight to the base client and are not recorded again.
   *
   * @param filter Optional predicate selecting which recorded events to consider.
   */
  async replay(filter?: (event: MCPEvent) => boolean): Promise<void> {
    const eventsToReplay = filter ? this.events.filter(filter) : this.events;
    for (const event of eventsToReplay) {
      if (event.type === 'tool_call' && event.data.success) {
        // Strip the recorded outcome so only the original request fields are sent.
        const { result: _result, success: _success, error: _error, ...request } = event.data;
        await super.tools.call(request);
      }
    }
  }
}
Resource Pooling and Caching
Cache resource reads for a short time and share a single in-flight request among concurrent readers of the same URI:
/**
 * MCP client that caches resource reads for one minute and lets concurrent
 * readers of the same URI share a single in-flight request.
 */
class CachedMCPClient extends MCPClient {
  private resourceCache = new Map<string, { data: any; expires: number }>();
  private pendingRequests = new Map<string, Promise<any>>();

  /**
   * Reads a resource, preferring a fresh cache entry, then a request that is
   * already running for the same URI, and only then issuing a new request.
   */
  async resources.read(uri: string): Promise<any> {
    // 1. Fresh cache entry?
    const hit = this.resourceCache.get(uri);
    if (hit !== undefined && Date.now() < hit.expires) {
      return hit.data;
    }

    // 2. Same URI already being fetched? Share that request.
    const inFlight = this.pendingRequests.get(uri);
    if (inFlight) {
      return inFlight;
    }

    // 3. Fetch, remembering the request until it settles.
    const fetching = super.resources.read(uri);
    this.pendingRequests.set(uri, fetching);
    try {
      const payload = await fetching;
      const entry = {
        data: payload,
        expires: Date.now() + 60000 // 1 minute
      };
      this.resourceCache.set(uri, entry);
      return payload;
    } finally {
      this.pendingRequests.delete(uri);
    }
  }

  /**
   * Drops cached entries. With a non-empty `pattern`, only URIs containing it
   * as a substring are removed; otherwise the whole cache is cleared.
   */
  invalidateCache(pattern?: string): void {
    if (!pattern) {
      this.resourceCache.clear();
      return;
    }
    this.resourceCache.forEach((_entry, uri) => {
      if (uri.includes(pattern)) {
        this.resourceCache.delete(uri);
      }
    });
  }
}
Distributed Tracing
Implement distributed tracing for debugging complex flows:
import { Span, SpanStatusCode, Tracer } from '@opentelemetry/api';
/**
 * MCP client that wraps every tool call in a tracing span named
 * `mcp.tool.call`, tagged with the tool name and its JSON-encoded arguments.
 */
class TracedMCPClient extends MCPClient {
  /**
   * @param name    Client name, forwarded to `MCPClient`.
   * @param version Client version, forwarded to `MCPClient`.
   * @param tracer  Tracer used to start one span per tool call.
   */
  constructor(
    name: string,
    version: string,
    private tracer: Tracer
  ) {
    super(name, version);
  }

  /**
   * Calls a tool inside a span. The span is marked OK on success, marked
   * ERROR with the exception recorded on failure, and always ended.
   *
   * @returns The tool's result.
   * @throws Whatever the underlying call threw, unchanged.
   */
  async tools.call(request: CallToolRequest): Promise<any> {
    const span = this.tracer.startSpan('mcp.tool.call', {
      attributes: {
        'tool.name': request.name,
        // NOTE(review): arguments are exported verbatim; confirm they cannot
        // contain secrets before enabling this in production.
        'tool.arguments': JSON.stringify(request.arguments)
      }
    });

    try {
      const result = await super.tools.call(request);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      // The catch variable is `unknown`: normalize it for the span, but
      // rethrow the original value so callers see exactly what was thrown.
      const exception = error instanceof Error ? error : new Error(String(error));
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: exception.message
      });
      span.recordException(exception);
      throw error;
    } finally {
      span.end();
    }
  }
}
// Usage with Jaeger
// BatchSpanProcessor is imported here as well; it is used below and is
// re-exported by the sdk-trace-node package.
import { BatchSpanProcessor, NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

const provider = new NodeTracerProvider();
const exporter = new JaegerExporter({
  endpoint: 'http://localhost:14268/api/traces',
});

// Route finished spans to the Jaeger exporter, then register the provider.
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register();

const tracer = provider.getTracer('mcp-client');
const client = new TracedMCPClient('app', '1.0.0', tracer);
Load Balancing
Distribute requests across multiple MCP servers:
/**
 * Distributes tool calls across several MCP servers in round-robin order,
 * retrying once on another server when a call fails.
 */
class LoadBalancedMCPClient {
  // `protected` so subclasses can inspect the pool and override selection.
  protected clients: MCPClient[] = [];
  private currentIndex = 0;

  /** Connects a new client using `config` and adds it to the pool. */
  async addServer(config: TransportConfig): Promise<void> {
    const client = new MCPClient(`lb-client-${this.clients.length}`, '1.0.0');
    await client.connect(config);
    this.clients.push(client);
  }

  /**
   * Picks the next client in round-robin order.
   *
   * @throws Error when no server has been added yet.
   */
  protected getNextClient(): MCPClient {
    const poolSize = this.clients.length;
    if (poolSize === 0) {
      // Without this guard `% 0` produces NaN and the caller fails with an
      // opaque TypeError on `undefined`.
      throw new Error('No MCP servers configured');
    }
    const client = this.clients[this.currentIndex % poolSize];
    this.currentIndex = (this.currentIndex + 1) % poolSize;
    return client;
  }

  /**
   * Calls a tool on the next server; if that fails and more than one server
   * is configured, retries once on the following server.
   *
   * @returns The tool's result from whichever server answered.
   * @throws The first error when there is no other server to fall back to,
   *         otherwise the fallback server's error.
   */
  async callTool(request: CallToolRequest): Promise<any> {
    const client = this.getNextClient();
    try {
      return await client.tools.call(request);
    } catch (error) {
      // Try another server on failure
      if (this.clients.length > 1) {
        const fallbackClient = this.getNextClient();
        return await fallbackClient.tools.call(request);
      }
      throw error;
    }
  }
}
// Health checking
/**
 * Load balancer that pings its servers periodically and only routes calls to
 * servers that have not failed their most recent ping.
 */
class HealthCheckedLoadBalancer extends LoadBalancedMCPClient {
  private healthStatus = new Map<MCPClient, boolean>();
  // Handle of the running health-check interval; undefined while stopped.
  private healthTimer?: ReturnType<typeof setInterval>;

  /**
   * Starts pinging every server every 30 seconds. Calling it again while
   * checks are already running does nothing, so timers never stack up.
   */
  async startHealthChecks(): Promise<void> {
    if (this.healthTimer !== undefined) {
      return;
    }
    this.healthTimer = setInterval(() => {
      void this.checkAllClients();
    }, 30000); // Check every 30 seconds
  }

  /** Stops the periodic health checks; safe to call when none are running. */
  stopHealthChecks(): void {
    if (this.healthTimer !== undefined) {
      clearInterval(this.healthTimer);
      this.healthTimer = undefined;
    }
  }

  /** Pings every server concurrently and records whether each ping succeeded. */
  private async checkAllClients(): Promise<void> {
    await Promise.all(
      this.clients.map(async (client) => {
        try {
          await client.ping();
          this.healthStatus.set(client, true);
        } catch {
          this.healthStatus.set(client, false);
        }
      })
    );
  }

  /**
   * Picks a random server among those not marked unhealthy. Servers that
   * have not been checked yet count as healthy.
   *
   * @throws Error('No healthy servers available') when every server is unhealthy
   *         or none has been added.
   */
  protected getNextClient(): MCPClient {
    // Only return healthy clients
    const healthyClients = this.clients.filter(
      client => this.healthStatus.get(client) !== false
    );
    if (healthyClients.length === 0) {
      throw new Error('No healthy servers available');
    }
    return healthyClients[Math.floor(Math.random() * healthyClients.length)];
  }
}
Testing Strategies
Mock MCP Server
import { MCPServer } from '@omnimcp/test-utils';
describe('MCP Integration Tests', () => {
  let mockServer: MCPServer;
  let client: MCPClient;

  // Definition of the mock `get_user` tool: echoes the requested id together
  // with a fixed name and email.
  const getUserTool = {
    name: 'get_user',
    description: 'Get user by ID',
    inputSchema: {
      type: 'object',
      properties: {
        id: { type: 'string' }
      },
      required: ['id']
    },
    handler: async ({ id }) => ({
      id,
      name: 'Test User',
      email: 'test@example.com'
    })
  };

  beforeEach(async () => {
    // Fresh mock server with the tool registered, started before the client connects
    mockServer = new MCPServer();
    mockServer.addTool(getUserTool);
    await mockServer.start();

    // Client connected to the mock server's transport configuration
    client = new MCPClient('test', '1.0.0');
    await client.connect(mockServer.getConfig());
  });

  afterEach(async () => {
    // Tear down in reverse order of setup
    await client.disconnect();
    await mockServer.stop();
  });

  test('should call mock tool', async () => {
    const expectedUser = {
      id: '123',
      name: 'Test User',
      email: 'test@example.com'
    };

    const result = await client.tools.call({
      name: 'get_user',
      arguments: { id: '123' }
    });

    expect(result).toEqual(expectedUser);
  });
});
Integration Testing
import { TestHarness } from '@omnimcp/test-utils';
describe('E2E MCP Tests', () => {
  const harness = new TestHarness();

  // Servers the harness launches for the whole suite
  const serverDefinitions = [
    { name: 'auth-server', command: 'auth-mcp-server' },
    { name: 'data-server', command: 'data-mcp-server' }
  ];

  beforeAll(async () => {
    await harness.setup({ servers: serverDefinitions });
  });

  afterAll(async () => {
    await harness.teardown();
  });

  test('cross-server communication', async () => {
    const authClient = harness.getClient('auth-server');
    const dataClient = harness.getClient('data-server');

    // Step 1: log in on the auth server and keep the issued token
    const loginResponse = await authClient.tools.call({
      name: 'login',
      arguments: { username: 'test', password: 'test' }
    });
    const token = loginResponse.token;

    // Step 2: present the token to the data server
    const protectedData = await dataClient.tools.call({
      name: 'get_protected_data',
      arguments: { token }
    });

    expect(protectedData).toBeDefined();
  });
});
Production Monitoring
import { StatsD } from 'node-statsd';
/**
 * MCP client that reports tool-call metrics to StatsD under the `mcp.`
 * prefix: `tool.calls`, `tool.duration`, `tool.success` and `tool.errors`.
 */
class MonitoredMCPClient extends MCPClient {
  private metrics: StatsD;

  /**
   * @param name    Client name, forwarded to `MCPClient`.
   * @param version Client version, forwarded to `MCPClient`.
   */
  constructor(name: string, version: string) {
    super(name, version);
    this.metrics = new StatsD({
      host: 'localhost',
      port: 8125,
      prefix: 'mcp.'
    });
  }

  /**
   * Calls a tool and records call count, duration and success, or an error
   * count tagged with the error code. Duration is recorded for successful
   * calls only.
   *
   * @returns The tool's result.
   * @throws Whatever the underlying call threw, unchanged.
   */
  async tools.call(request: CallToolRequest): Promise<any> {
    const startTime = Date.now();
    const tags = [`tool:${request.name}`];
    this.metrics.increment('tool.calls', 1, tags);

    try {
      const result = await super.tools.call(request);
      this.metrics.timing('tool.duration', Date.now() - startTime, tags);
      this.metrics.increment('tool.success', 1, tags);
      return result;
    } catch (error) {
      // The catch variable is `unknown` and may not carry an `errorCode`;
      // fall back to a stable tag value instead of emitting `error:undefined`.
      const errorCode =
        (error as { errorCode?: unknown } | null | undefined)?.errorCode ?? 'unknown';
      this.metrics.increment('tool.errors', 1, [...tags, `error:${errorCode}`]);
      throw error;
    }
  }
}
// Prometheus metrics
import { register, Counter, Histogram } from 'prom-client';
// Counter of tool calls, labelled by tool name and call status.
const toolCallsTotal = new Counter({
name: 'mcp_tool_calls_total',
help: 'Total number of tool calls',
labelNames: ['tool', 'status']
});
// Histogram of tool call durations, labelled by tool name. The metric name
// and help text declare seconds, so observed values should be in seconds.
const toolDuration = new Histogram({
name: 'mcp_tool_duration_seconds',
help: 'Tool call duration in seconds',
labelNames: ['tool']
});
// Add both metrics to the imported `register` registry.
// NOTE(review): prom-client metrics are normally added to the default
// registry by their constructor — confirm whether these calls are needed.
register.registerMetric(toolCallsTotal);
register.registerMetric(toolDuration);Next Steps
- Review complete API reference
- Explore real-world examples
- Join our community discussions