Opinix Trade uses WebSockets for real-time bidirectional communication between the server and clients. The WebSocket server broadcasts order book updates, trade events, and ticker data.
The WebSocket server runs as a separate service on a different port from the main API, allowing it to scale independently.
// From services/wss/src/index.ts:1import { WebSocketServer } from "ws";import { config } from "dotenv";import { UserManager } from "./classes/UserManager";config();const port = process.env.PORT as unknown as number;const wss = new WebSocketServer({ port: port });wss.on("listening", () => { console.log(`WebSocket server is running on port ws://localhost:${wss.options.port}`);});wss.on("connection", (ws) => { UserManager.getInstance().addUser(ws);});
// From services/wss/src/classes/SubscriptionManager.ts:20export class SubscriptionManager { private subscriptions: Map<string, string[]> = new Map(); private reverseSubscriptions: Map<string, string[]> = new Map(); private redisClient: RedisClientType;}
subscriptions: Maps user ID → list of channels reverseSubscriptions: Maps channel → list of user IDs
Why use reverse subscriptions?
When a message arrives on a channel, we need to find all users subscribed to that channel. Without reverse subscriptions, we’d have to iterate through all users checking if they’re subscribed.With reverse subscriptions, it’s O(1) lookup:
// From services/wss/src/classes/SubscriptionManager.ts:37subscribe(userId: string, subscription: string) { // Check if already subscribed if (this.subscriptions.get(userId)?.includes(subscription)) { return; } // Add to user's subscription list const newSubscription = (this.subscriptions.get(userId) || []).concat(subscription); this.subscriptions.set(userId, newSubscription); // Add to reverse subscription map const newRevSubscription = (this.reverseSubscriptions.get(subscription) || []).concat(userId) this.reverseSubscriptions.set(subscription, newRevSubscription) // Subscribe to Redis channel if this is the first subscriber if (this.reverseSubscriptions.get(subscription)?.length === 1) { this.redisClient.subscribe(subscription, this.redisCallbackHandler); }}
The WebSocket server only subscribes to a Redis channel when the first user subscribes. This prevents unnecessary Redis subscriptions for inactive markets.
// From services/wss/src/classes/SubscriptionManager.ts:59unsubscribe(userId: string, subscription: string) { const subscriptions = this.subscriptions.get(userId); if (subscriptions) { // Remove from user's subscription list this.subscriptions.set(userId, subscriptions.filter(s => s !== subscription) ); } const reverseSubscriptions = this.reverseSubscriptions.get(subscription); if (reverseSubscriptions) { this.reverseSubscriptions.set( subscription, reverseSubscriptions.filter(s => s !== userId) ); // Unsubscribe from Redis if no more users are subscribed if (this.reverseSubscriptions.get(subscription)?.length === 0) { this.reverseSubscriptions.delete(subscription); this.redisClient.unsubscribe(subscription); } }}