Skip to main content

WebSocket overview

Opinix Trade uses WebSockets for real-time bidirectional communication between the server and clients. The WebSocket server broadcasts order book updates, trade events, and ticker data.
The WebSocket server runs as a separate service on a different port from the main API, allowing it to scale independently.

Connection flow

1

Client connects

Client establishes WebSocket connection to ws://localhost:{PORT}
2

User registered

UserManager assigns a unique ID and tracks the connection
3

Subscribe to channels

Client sends subscribe messages for specific event channels
4

Receive updates

Server broadcasts updates to subscribed channels

Server initialization

The WebSocket server uses the ws library:
// From services/wss/src/index.ts:1
import { WebSocketServer } from "ws";
import { config } from "dotenv";
import { UserManager } from "./classes/UserManager";

config();

const port = process.env.PORT as unknown as number;
const wss = new WebSocketServer({ port: port });

wss.on("listening", () => {
  console.log(`WebSocket server is running on port ws://localhost:${wss.options.port}`);
});

wss.on("connection", (ws) => {
  UserManager.getInstance().addUser(ws);
});

Message types

All WebSocket messages follow a consistent structure with a stream and data field.

Depth update message

Sent when the order book changes:
// From packages/types/src/index.ts:184
export type DepthUpdateMessage = {
  stream: string;          // e.g., "depth@bitcoin-event"
  data: {
    b?: [string, string][];  // Bids [[price, quantity], ...]
    a?: [string, string][];  // Asks [[price, quantity], ...]
    e: "depth";              // Event type
  };
};
Example:
{
  "stream": "depth@bitcoin-event",
  "data": {
    "e": "depth",
    "b": [["5.5", "100"], ["5.0", "250"]],
    "a": [["6.0", "150"], ["6.5", "200"]]
  }
}

Trade added message

Sent when a trade executes:
// From packages/types/src/index.ts:193
export type TradeAddedMessage = {
  stream: string;       // e.g., "trade@bitcoin-event"
  data: {
    e: "trade";        // Event type
    t: string;         // Trade ID
    m: boolean;        // Is buyer maker?
    p: number;         // Price
    q: string;         // Quantity
    s: string;         // Symbol/market
  };
};
Example:
{
  "stream": "trade@bitcoin-event",
  "data": {
    "e": "trade",
    "t": "550e8400-e29b-41d4-a716-446655440000",
    "m": true,
    "p": 5.5,
    "q": "50",
    "s": "bitcoin-event"
  }
}

Ticker update message

Sent periodically with market statistics:
// From packages/types/src/index.ts:170
export type TickerUpdateMessage = {
  stream: string;
  data: {
    c?: string;      // Close price
    h?: string;      // High price
    l?: string;      // Low price
    v?: string;      // Volume
    V?: string;      // Quote volume
    s?: string;      // Symbol
    id: number;      // Event ID
    e: "ticker";     // Event type
  };
};
All message types are defined in @opinix/types and shared between the engine, WebSocket server, and clients.

Subscription management

The SubscriptionManager handles efficient subscription tracking using two maps.

Data structures

// From services/wss/src/classes/SubscriptionManager.ts:20
export class SubscriptionManager {
  private subscriptions: Map<string, string[]> = new Map();
  private reverseSubscriptions: Map<string, string[]> = new Map();
  private redisClient: RedisClientType;
}
subscriptions: Maps user ID → list of channels
reverseSubscriptions: Maps channel → list of user IDs
When a message arrives on a channel, we need to find all users subscribed to that channel. Without reverse subscriptions, we’d have to iterate through all users checking if they’re subscribed.With reverse subscriptions, it’s O(1) lookup:
this.reverseSubscriptions.get(channel)?.forEach(userId => {
  UserManager.getInstance().getUser(userId)?.emitMessage(message);
});

Subscribe flow

// From services/wss/src/classes/SubscriptionManager.ts:37
subscribe(userId: string, subscription: string) {
  // Check if already subscribed
  if (this.subscriptions.get(userId)?.includes(subscription)) {
    return;
  }
  
  // Add to user's subscription list
  const newSubscription = (this.subscriptions.get(userId) || []).concat(subscription);
  this.subscriptions.set(userId, newSubscription);

  // Add to reverse subscription map
  const newRevSubscription = (this.reverseSubscriptions.get(subscription) || []).concat(userId)
  this.reverseSubscriptions.set(subscription, newRevSubscription)

  // Subscribe to Redis channel if this is the first subscriber
  if (this.reverseSubscriptions.get(subscription)?.length === 1) {
    this.redisClient.subscribe(subscription, this.redisCallbackHandler);
  }
}
The WebSocket server only subscribes to a Redis channel when the first user subscribes. This prevents unnecessary Redis subscriptions for inactive markets.

Unsubscribe flow

// From services/wss/src/classes/SubscriptionManager.ts:59
unsubscribe(userId: string, subscription: string) {
  const subscriptions = this.subscriptions.get(userId);

  if (subscriptions) {
    // Remove from user's subscription list
    this.subscriptions.set(userId,
      subscriptions.filter(s => s !== subscription)
    );
  }

  const reverseSubscriptions = this.reverseSubscriptions.get(subscription);
  if (reverseSubscriptions) {
    this.reverseSubscriptions.set(
      subscription, 
      reverseSubscriptions.filter(s => s !== userId)
    );
    
    // Unsubscribe from Redis if no more users are subscribed
    if (this.reverseSubscriptions.get(subscription)?.length === 0) {
      this.reverseSubscriptions.delete(subscription);
      this.redisClient.unsubscribe(subscription);
    }
  }
}

Broadcasting messages

When a message arrives from Redis:
// From services/wss/src/classes/SubscriptionManager.ts:54
private redisCallbackHandler = (message: string, channel: string) => {
  const parsedMessage = JSON.parse(message);
  this.reverseSubscriptions.get(channel)?.forEach(s => 
    UserManager.getInstance().getUser(s)?.emitMessage(parsedMessage)
  );
}

Publishing updates from engine

The engine publishes messages to Redis after order execution:

Publishing depth updates

// From services/engine/src/trade/Engine.ts:349
publisWsDepthUpdates(
  fills: Fill[],
  price: number,
  side: "yes" | "no",
  market: string
) {
  const orderbook = this.orderbooks.find((o) => o.market === market);
  if (!orderbook) {
    return;
  }
  
  const depth = orderbook.getMarketDepth();
  
  if (side === "yes") {
    const updatedAsks = depth?.asks.filter((x) => fills.map((f) => f.price));
    const updatedBid = depth?.bids.find((x) => x[0] === price.toString());
    
    RedisManager.getInstance().publishMessage(`depth@${market}`, {
      stream: `depth@${market}`,
      data: {
        a: updatedAsks,
        b: updatedBid ? [updatedBid] : [],
        e: "depth",
      },
    });
  }
  
  if (side === "no") {
    const updatedBids = depth?.bids.filter((x) => fills.map((f) => f.price));
    const updatedAsk = depth?.asks.find((x) => x[0] === price.toString());
    
    RedisManager.getInstance().publishMessage(`depth@${market}`, {
      stream: `depth@${market}`,
      data: {
        a: updatedAsk ? [updatedAsk] : [],
        b: updatedBids,
        e: "depth",
      },
    });
  }
}

Publishing trade updates

// From services/engine/src/trade/Engine.ts:388
publishWsTrades(fills: Fill[], userId: string, market: string) {
  fills.forEach((fill) => {
    RedisManager.getInstance().publishMessage(`trade@${market}`, {
      stream: `trade@${market}`,
      data: {
        e: "trade",
        t: fill.tradeId,
        m: fill.otherUserId === userId,
        p: fill.price,
        q: fill.qty.toString(),
        s: market,
      },
    });
  });
}

Channel naming conventions

depth@{market-slug}

Examples:
- depth@bitcoin-event
- depth@eth-price-prediction
The market slug comes from the event’s URL-friendly identifier stored in the database.

Client-side integration

Clients should:
  1. Connect to WebSocket server
    const ws = new WebSocket('ws://localhost:8080');
    
  2. Subscribe to channels
    ws.send(JSON.stringify({
      type: 'subscribe',
      channel: 'depth@bitcoin-event'
    }));
    
  3. Handle incoming messages
    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      
      if (message.data.e === 'depth') {
        updateOrderBook(message.data.b, message.data.a);
      } else if (message.data.e === 'trade') {
        addTradeToHistory(message.data);
      }
    };
    
  4. Unsubscribe when leaving
    ws.send(JSON.stringify({
      type: 'unsubscribe',
      channel: 'depth@bitcoin-event'
    }));
    

Error handling

Always handle WebSocket disconnections and implement reconnection logic:
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;

function connectWebSocket() {
  const ws = new WebSocket('ws://localhost:8080');
  
  ws.onclose = () => {
    if (reconnectAttempts < maxReconnectAttempts) {
      reconnectAttempts++;
      setTimeout(connectWebSocket, 1000 * reconnectAttempts);
    }
  };
  
  ws.onerror = (error) => {
    console.error('WebSocket error:', error);
  };
  
  return ws;
}

Performance optimization

Message batching

For high-frequency updates, consider batching messages:
const messageQueue: WsMessage[] = [];
const BATCH_INTERVAL = 100; // ms

setInterval(() => {
  if (messageQueue.length > 0) {
    broadcastBatch(messageQueue);
    messageQueue.length = 0;
  }
}, BATCH_INTERVAL);

Throttling updates

Limit how often depth updates are sent:
const lastDepthUpdate = new Map<string, number>();
const THROTTLE_MS = 50;

function shouldSendDepthUpdate(market: string): boolean {
  const last = lastDepthUpdate.get(market) || 0;
  const now = Date.now();
  
  if (now - last >= THROTTLE_MS) {
    lastDepthUpdate.set(market, now);
    return true;
  }
  
  return false;
}
Throttling depth updates prevents overwhelming clients with too many messages during high-volume trading.