
Queue overview

Opinix Trade uses Redis as a message queue to decouple order submission from order execution. This enables:
  • Fast API responses: Orders are queued immediately and processed asynchronously
  • Horizontal scaling: Multiple workers can process orders in parallel
  • Reliability: Orders are persisted in Redis and won’t be lost on crashes
  • Load balancing: Distribute work across multiple engine instances
The queue system is implemented in the @repo/order-queue package and used by both the backend API and the matching engine worker.

Queue architecture

  1. API receives order: User submits an order via POST /order/initiate
  2. Order enqueued: The backend pushes the order onto ORDER_QUEUE in Redis using lPush
  3. API responds: The backend immediately returns a success response to the client
  4. Worker polls queue: The engine worker continuously polls the queue using lPop
  5. Order processed: The worker executes matching logic and updates state
  6. Results published: The worker publishes results to Redis Pub/Sub for WebSocket distribution
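The enqueue/dequeue core of these steps can be modeled with a plain array standing in for the Redis list (an illustrative sketch; the names are invented). Note that the sketch pairs lPush with rPop to get FIFO order, whereas lPush paired with lPop would behave as a stack:

```typescript
// In-memory stand-in for the Redis list: lPush adds at the head,
// rPop removes from the tail, giving first-in, first-out order.
type QueuedOrder = { market: string; price: number; userId: string };

const orderQueue: string[] = [];

// Step 2: the API enqueues (models lPush).
function enqueue(order: QueuedOrder): void {
  orderQueue.unshift(JSON.stringify(order));
}

// Step 4: the worker polls (models rPop).
function dequeue(): QueuedOrder | null {
  const raw = orderQueue.pop();
  return raw ? (JSON.parse(raw) as QueuedOrder) : null;
}

// Two orders in, processed in submission order.
enqueue({ market: "bitcoin-event", price: 5.5, userId: "u1" });
enqueue({ market: "bitcoin-event", price: 6.0, userId: "u2" });

const first = dequeue();
const second = dequeue();
```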

Redis manager

The RedisManager provides a singleton interface for all Redis operations:
// From packages/order-queue/src/classes/RedisManager.ts:4
export class RedisManager {
  private client: RedisClientType;
  private static instance: RedisManager;

  constructor() {
    this.client = createClient();
    console.log("Redis connected log inside Redis Manager");
    this.client.connect(); // note: not awaited, so the connection may still be opening when the first commands run
  }

  public getClient(): RedisClientType {
    return this.client;
  }

  public static getInstance() {
    if (!this.instance) {
      this.instance = new RedisManager();
    }
    return this.instance;
  }

  public pushMessage(message: DbMessage) {
    this.client.lPush("db_processor", JSON.stringify(message));
  }

  public publishMessage(channel: string, message: WsMessage) {
    this.client.publish(channel, JSON.stringify(message));
  }

  public sendToApi(clientId: string, message: MessageToApi) {
    this.client.publish(clientId, JSON.stringify(message));
  }
}
Redis clients maintain persistent connections. Creating multiple instances would waste resources and create connection overhead. The singleton pattern ensures only one Redis connection per process.
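The singleton guarantee can be demonstrated without a live Redis connection (a generic sketch of the pattern, not the repo class):

```typescript
// Minimal singleton with the same shape as RedisManager.getInstance().
class ConnectionManager {
  private static instance: ConnectionManager;

  // A private constructor prevents `new ConnectionManager()` elsewhere.
  private constructor() {}

  public static getInstance(): ConnectionManager {
    if (!ConnectionManager.instance) {
      ConnectionManager.instance = new ConnectionManager();
    }
    return ConnectionManager.instance;
  }
}

// Every call returns the same object, so only one "connection" is ever made.
const a = ConnectionManager.getInstance();
const b = ConnectionManager.getInstance();
```

Unlike this sketch, the repo's RedisManager leaves its constructor public, so nothing stops code from calling `new RedisManager()` and opening a second connection; marking the constructor private would enforce the pattern.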

Adding orders to queue

The backend API enqueues orders using the addToOrderQueue function:
// From packages/order-queue/src/queues/orderQueue.ts:9
const QUEUE_NAME = "ORDER_QUEUE";

export const addToOrderQueue = async (order: object) => {
  try {
    await redisClient.lPush(QUEUE_NAME, JSON.stringify(order));
    logger.info(`Order added to queue: ${JSON.stringify(order)}`);
  } catch (err) {
    if (err instanceof Error)
      logger.error(`Error adding order to queue: ${err.message}`);
  }
};

Usage in backend

// From apps/server/src/controllers/order/index.ts:16
export const placeHandler = AsyncWrapper(
  async (req: Request<{}, {}, TPlaceOrderReq>, res) => {
    const { event_id, l1_expected_price, l1_order_quantity, offer_type, userid } = req.body;

    const data = {
      type: "CREATE_ORDER", // top-level type drives MessageFromApi narrowing in the worker
      data: {
        market: event_id,
        price: l1_expected_price,
        quantity: l1_order_quantity,
        side: offer_type,
        userId: userid.toString(),
      },
    };

    await addToOrderQueue(data);
    const response = new SuccessResponse("Order placed successfully", 201);
    return res.status(201).json(response);
  }
);
The API returns immediately after enqueuing. The actual order execution happens asynchronously in the worker.

Processing orders from queue

The worker continuously polls the queue and processes orders:
// From packages/order-queue/src/queues/orderQueue.ts:19
export const processOrderQueue = async () => {
  while (true) {
    try {
      const order = await redisClient.lPop(QUEUE_NAME);
      if (order) {
        const orderObj: MessageFromApi = JSON.parse(order);
        const userid =
          orderObj.type === CREATE_ORDER ? orderObj.data.userId : null;
        if (!userid) {
          logger.error(`Error processing order: userId not found`);
          continue;
        }
        // Process order with engine
        // engine.processOrders({ message: orderObj, clientId: userid });
      }
    } catch (err) {
      if (err instanceof Error) {
        logger.error(`Error processing order: ${err.message}`);
      }
    }
  }
};
The infinite while (true) loop busy-polls Redis, burning CPU even when the queue is empty. In production, use blPop (blocking pop) or add a small delay between empty polls. Note also that pairing lPush with lPop yields LIFO (stack) ordering; for FIFO, pair lPush with rPop, or enqueue with rPush and keep lPop/blPop on the consumer.
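A blocking variant of the loop could look like the following (a sketch against a minimal client interface, not the repo's code; blPop with a timeout parks the connection server-side instead of spinning):

```typescript
// Minimal slice of the Redis client surface the loop needs.
interface QueueClient {
  blPop(key: string, timeoutSec: number): Promise<{ key: string; element: string } | null>;
}

async function processOrderQueueBlocking(
  client: QueueClient,
  handle: (raw: string) => void,
  maxIterations = Infinity // bounded below only so the sketch terminates
): Promise<void> {
  for (let i = 0; i < maxIterations; i++) {
    // Blocks for up to 5 seconds waiting for an element; null on timeout.
    const res = await client.blPop("ORDER_QUEUE", 5);
    if (res) handle(res.element);
  }
}

// In-memory stub standing in for Redis, to exercise the loop.
const backing = ['{"type":"CREATE_ORDER"}', '{"type":"CANCEL_ORDER"}'];
const stub: QueueClient = {
  async blPop() {
    const element = backing.shift();
    return element ? { key: "ORDER_QUEUE", element } : null;
  },
};

const seen: string[] = [];
await processOrderQueueBlocking(stub, (raw) => seen.push(raw), 3);
```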

Message types

All messages going through the queue conform to the MessageFromApi type:
// From packages/types/src/index.ts:57
export type MessageFromApi =
  | {
      type: typeof CREATE_ORDER;
      data: {
        market: string;
        price: number;
        quantity: number;
        side: "yes" | "no";
        userId: string;
      };
    }
  | {
      type: typeof CANCEL_ORDER;
      data: {
        orderId: string;
        market: string;
      };
    }
  | {
      type: typeof ON_RAMP;
      data: {
        amount: number;
        userId: string;
        txnId: string;
      };
    }
  | {
      type: typeof GET_DEPTH;
      data: {
        market: string;
      };
    }
  | {
      type: typeof GET_OPEN_ORDERS;
      data: {
        userId: string;
        market: string;
      };
    };
  • CREATE_ORDER: Place a new order
  • CANCEL_ORDER: Cancel an existing order
  • ON_RAMP: Add funds to user balance
  • GET_DEPTH: Request current order book depth
  • GET_OPEN_ORDERS: Request user’s open orders
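Because MessageFromApi is a discriminated union on type, a switch statement narrows the payload automatically. A self-contained sketch with two of the variants (simplified shapes, not the full repo types):

```typescript
// Two variants of the union, enough to show the narrowing.
type QueueMessage =
  | { type: "CREATE_ORDER"; data: { market: string; userId: string } }
  | { type: "CANCEL_ORDER"; data: { orderId: string; market: string } };

function describe(msg: QueueMessage): string {
  switch (msg.type) {
    case "CREATE_ORDER":
      // msg.data is narrowed here: userId is available, orderId is not.
      return `create for ${msg.data.userId} on ${msg.data.market}`;
    case "CANCEL_ORDER":
      return `cancel ${msg.data.orderId} on ${msg.data.market}`;
  }
}

const d1 = describe({ type: "CREATE_ORDER", data: { market: "m1", userId: "u1" } });
const d2 = describe({ type: "CANCEL_ORDER", data: { orderId: "o9", market: "m1" } });
```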

Response flow

After processing an order, the engine sends responses back to the API:
// From services/engine/src/trade/Engine.ts:78
RedisManager.getInstance().sendToApi(clientId, {
  type: "ORDER_PLACED",
  payload: {
    orderId,
    executedQty,
    fills,
  },
});
Response types:
// From packages/types/src/index.ts:135
export type MessageToApi =
  | {
      type: "DEPTH";
      payload: {
        bids: [string, string][];
        asks: [string, string][];
      };
    }
  | {
      type: "ORDER_PLACED";
      payload: {
        orderId: string;
        executedQty: number;
        fills: {
          price: number;
          qty: number;
          tradeId: string;
        }[];
      };
    }
  | {
      type: "ORDER_CANCELLED";
      payload: {
        orderId: string;
        executedQty: number;
        remainingQty: number;
      };
    }
  | {
      type: "OPEN_ORDERS";
      payload: Order[];
    };
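On the API side, a response published to the clientId channel has to be correlated back to a waiting request. One way to do that, sketched with Node's EventEmitter standing in for a Redis subscriber (the helper name and timeout are illustrative; the repo may wire this differently):

```typescript
import { EventEmitter } from "node:events";

// Stand-in for a Redis subscriber: channel name -> message payload.
const bus = new EventEmitter();

// Resolve with the first message on `channel`, or reject after `ms`.
function waitForResponse(channel: string, ms: number): Promise<string> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error("engine timeout")), ms);
    bus.once(channel, (message: string) => {
      clearTimeout(timer);
      resolve(message);
    });
  });
}

// Simulate the engine publishing an ORDER_PLACED response for client "u1".
setTimeout(() => {
  bus.emit("u1", JSON.stringify({ type: "ORDER_PLACED", payload: { orderId: "o1" } }));
}, 10);

const raw = await waitForResponse("u1", 1000);
const response = JSON.parse(raw);
```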

Database persistence queue

Separate from the order queue, there’s a database processing queue:
// From packages/order-queue/src/classes/RedisManager.ts:25
public pushMessage(message: DbMessage) {
  this.client.lPush("db_processor", JSON.stringify(message));
}
This queue stores database operations:
// From packages/types/src/index.ts:99
export type DbMessage =
  | {
      type: typeof TRADE_ADDED;
      data: {
        id: string;
        isBuyerMaker: boolean;
        price: number;
        quantity: number;
        timestamp: number;
        market: string;
      };
    }
  | {
      type: typeof ORDER_UPDATE;
      data: {
        orderId: string;
        executedQty: number;
        market?: string;
        price?: string;
        quantity?: string;
        side?: "yes" | "no";
      };
    };
Database writes are also asynchronous to prevent slow disk I/O from blocking the matching engine.
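A consumer for this queue would drain messages and dispatch on type. A sketch over an in-memory list (the persistence worker itself is not shown in this section, so names and routing here are illustrative):

```typescript
// Simplified DbMessage variants.
type DbMsg =
  | { type: "TRADE_ADDED"; data: { id: string; price: number } }
  | { type: "ORDER_UPDATE"; data: { orderId: string; executedQty: number } };

// In-memory stand-in for the "db_processor" Redis list.
const dbQueue: string[] = [
  JSON.stringify({ type: "TRADE_ADDED", data: { id: "t1", price: 5.5 } }),
  JSON.stringify({ type: "ORDER_UPDATE", data: { orderId: "o1", executedQty: 10 } }),
];

const trades: string[] = [];
const orderUpdates: string[] = [];

// Drain the queue, routing each message to the right "table".
function drain(): void {
  let raw: string | undefined;
  while ((raw = dbQueue.shift()) !== undefined) {
    const msg = JSON.parse(raw) as DbMsg;
    if (msg.type === "TRADE_ADDED") trades.push(msg.data.id);
    else orderUpdates.push(msg.data.orderId);
  }
}

drain();
```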

Publishing to WebSocket

After order execution, the engine publishes updates to Redis Pub/Sub:
// From packages/order-queue/src/classes/RedisManager.ts:29
public publishMessage(channel: string, message: WsMessage) {
  this.client.publish(channel, JSON.stringify(message));
}
In the engine, each fill is also recorded for persistence at the same point; note that this call uses pushMessage (the db_processor list), not Pub/Sub:
// From services/engine/src/trade/Engine.ts:308
RedisManager.getInstance().pushMessage({
  type: TRADE_ADDED,
  data: {
    market: market,
    id: fill.tradeId.toString(),
    isBuyerMaker: fill.otherUserId === userId,
    price: fill.price,
    quantity: fill.qty,
    timestamp: Date.now(),
  },
});

Queue patterns

Producer-consumer pattern

import { addToOrderQueue } from "@repo/order-queue";

// Produce: Add order to queue
await addToOrderQueue({
  type: "CREATE_ORDER",
  data: {
    market: "bitcoin-event",
    price: 5.5,
    quantity: 100,
    side: "yes",
    userId: "user123"
  }
});

Pub/Sub pattern

import { RedisManager } from "@repo/order-queue";

// Publish depth update
RedisManager.getInstance().publishMessage("depth@bitcoin-event", {
  stream: "depth@bitcoin-event",
  data: {
    e: "depth",
    b: [["5.5", "100"]],
    a: [["6.0", "150"]]
  }
});

Error handling

Queue failures

export const addToOrderQueue = async (order: object) => {
  try {
    await redisClient.lPush(QUEUE_NAME, JSON.stringify(order));
    logger.info(`Order added to queue: ${JSON.stringify(order)}`);
  } catch (err) {
    if (err instanceof Error) {
      logger.error(`Error adding order to queue: ${err.message}`);
      // In production, implement retry logic or dead letter queue
      throw err;
    }
  }
};
If Redis is unavailable, orders will fail to enqueue. Implement circuit breakers and retry logic in production.
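A retry wrapper with exponential backoff is one way to harden the enqueue path (a generic sketch; the delays, attempt count, and dead-letter step are design choices, not repo behavior):

```typescript
// Retry `fn` up to `attempts` times, doubling the delay after each failure.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 100
): Promise<T> {
  let lastErr: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      // 100ms, 200ms, 400ms, ... between attempts.
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** i));
    }
  }
  throw lastErr; // exhausted: caller can route the order to a dead-letter queue
}

// Simulated flaky enqueue: fails twice, then succeeds.
let calls = 0;
const result = await withRetry(async () => {
  calls++;
  if (calls < 3) throw new Error("ECONNREFUSED");
  return "queued";
});
```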

Processing failures

export const processOrderQueue = async () => {
  while (true) {
    try {
      const order = await redisClient.lPop(QUEUE_NAME);
      if (order) {
        const orderObj: MessageFromApi = JSON.parse(order);
        const userid =
          orderObj.type === CREATE_ORDER ? orderObj.data.userId : null;
        if (!userid) continue; // nothing to reply to without a client id
        // Process order
        engine.processOrders({ message: orderObj, clientId: userid });
      }
    } catch (err) {
      if (err instanceof Error) {
        logger.error(`Error processing order: ${err.message}`);
        // Log error but continue processing
        // Consider adding failed order to dead letter queue
      }
    }
  }
};

Performance optimization

Batch processing

For high-throughput scenarios, orders can be popped in batches. Note that executing matching logic in parallel is only safe when the orders are independent (for example, in different markets):
const BATCH_SIZE = 10;

async function processBatch() {
  const orders = [];

  for (let i = 0; i < BATCH_SIZE; i++) {
    const order = await redisClient.lPop("ORDER_QUEUE");
    if (!order) break; // queue drained early
    orders.push(JSON.parse(order));
  }

  // Process all orders in parallel
  await Promise.all(orders.map((order) => engine.processOrders(order)));
}

Multiple workers

Scale horizontally by running multiple engine workers:
# Terminal 1
npm run worker

# Terminal 2
npm run worker

# Terminal 3
npm run worker
Each worker polls the same queue; because lPop is atomic, Redis hands each order to only one worker. This is at-most-once delivery, not exactly-once processing: a worker that crashes after popping an order but before finishing loses it unless you use a reliable-queue pattern (e.g. lMove into a per-worker processing list).
With multiple workers, ensure your engine logic is stateless or use distributed locks for shared state.
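lPop hands each order to only one worker, but a crash between the pop and completion loses that order. Redis's reliable-queue pattern moves each item to a per-worker processing list before handling it, then deletes it on success. A sketch with arrays standing in for the two lists (lMove is atomic in real Redis; this stub is not):

```typescript
// Two in-memory lists modeling ORDER_QUEUE and a per-worker processing list.
const mainQueue: string[] = ["order-1", "order-2"];
const processing: string[] = [];

// Models LMOVE ORDER_QUEUE processing RIGHT LEFT: claim the next item.
function claimNext(): string | null {
  const item = mainQueue.pop() ?? null; // take from the tail of the main queue
  if (item !== null) processing.unshift(item); // park at the head of processing
  return item;
}

// Models LREM processing 1 <item>: acknowledge after successful handling.
function ack(item: string): void {
  const idx = processing.indexOf(item);
  if (idx !== -1) processing.splice(idx, 1);
}

const claimed = claimNext();
// ...handle the order here; if the worker crashed at this point, the order
// would still sit in `processing` for a recovery job to requeue...
if (claimed !== null) ack(claimed);
```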

Monitoring

Queue length

Monitor queue depth to detect backlogs:
const queueLength = await redisClient.lLen("ORDER_QUEUE");

if (queueLength > 1000) {
  logger.warn(`Queue backlog detected: ${queueLength} orders pending`);
}

Processing rate

Track orders processed per second:
let ordersProcessed = 0;
let lastCheck = Date.now();

setInterval(() => {
  const now = Date.now();
  const elapsed = (now - lastCheck) / 1000;
  const rate = ordersProcessed / elapsed;
  
  logger.info(`Processing rate: ${rate.toFixed(2)} orders/sec`);
  
  ordersProcessed = 0;
  lastCheck = now;
}, 10000); // Check every 10 seconds
In production, use monitoring tools like Prometheus, Grafana, or Redis monitoring solutions to track queue metrics.