The market data feed response requires decoding using protobuf, utilizing the provided proto file. You'll need to translate the Market Data Proto File into the appropriate format compatible with your specific programming language

Hi,
Assume I want to subscribe 500 Instrument.

I have initiated websocket to rec tick data of the 500 subscribed instrument

As per documentation “The market data feed response requires decoding using protobuf, utilizing the provided proto file. You’ll need to translate the Market Data Proto File into the appropriate format compatible with your specific programming language.”
here is the return

Instrument list key: [
  'NSE_FO|56785', 'NSE_FO|44897', 'NSE_FO|44896',
  'NSE_FO|44887', 'NSE_FO|44886', 'NSE_FO|44885',
  'NSE_FO|44884', 'NSE_FO|44889', 'NSE_FO|44888',
  'NSE_FO|44893', 'NSE_FO|44892', 'NSE_FO|44854',
  'NSE_FO|44853', 'NSE_FO|44852', 'NSE_FO|44858',
  'NSE_FO|44857', 'NSE_FO|44856', 'NSE_FO|44855',
  'NSE_FO|44859', 'NSE_FO|44861', 'NSE_FO|44860',
  'NSE_FO|44843', 'NSE_FO|44842', 'NSE_FO|44845',
  'NSE_FO|44844', 'NSE_FO|44875', 'NSE_FO|44874',
  'NSE_FO|44873', 'NSE_FO|44879', 'NSE_FO|44878',
  'NSE_FO|44883', 'NSE_FO|44882', 'NSE_FO|44881',
  'NSE_FO|44880', 'NSE_FO|44863', 'NSE_FO|44862',
  'NSE_FO|44869', 'NSE_FO|44868', 'NSE_FO|44867',
  'NSE_FO|44866', 'NSE_FO|44872', 'NSE_FO|44871',
  'NSE_FO|44870'
]
Received message: FeedResponse {
  feeds: {
    'NSE_FO|44854': Feed { ltpc: [LTPC] },
    'NSE_FO|44897': Feed { ltpc: [LTPC] },
    'NSE_FO|44853': Feed { ltpc: [LTPC] },
    'NSE_FO|44875': Feed { ltpc: [LTPC] },
    'NSE_FO|44896': Feed { ltpc: [LTPC] },
    'NSE_FO|44852': Feed { ltpc: [LTPC] },
    'NSE_FO|44874': Feed { ltpc: [LTPC] },
    'NSE_FO|44873': Feed { ltpc: [LTPC] },
    'NSE_FO|44858': Feed { ltpc: [LTPC] },
    'NSE_FO|44857': Feed { ltpc: [LTPC] },
    'NSE_FO|44879': Feed { ltpc: [LTPC] },
    'NSE_FO|44856': Feed { ltpc: [LTPC] },
    'NSE_FO|44878': Feed { ltpc: [LTPC] },
    'NSE_FO|44855': Feed { ltpc: [LTPC] },
    'NSE_FO|44859': Feed { ltpc: [LTPC] },
    'NSE_FO|56785': Feed { ltpc: [LTPC] },
    'NSE_FO|44861': Feed { ltpc: [LTPC] },
    'NSE_FO|44883': Feed { ltpc: [LTPC] },
    'NSE_FO|44860': Feed { ltpc: [LTPC] },
    'NSE_FO|44882': Feed { ltpc: [LTPC] },
    'NSE_FO|44881': Feed { ltpc: [LTPC] },
    'NSE_FO|44880': Feed { ltpc: [LTPC] },
    'NSE_FO|44887': Feed { ltpc: [LTPC] },
    'NSE_FO|44843': Feed { ltpc: [LTPC] },
    'NSE_FO|44886': Feed { ltpc: [LTPC] },
    'NSE_FO|44842': Feed { ltpc: [LTPC] },
    'NSE_FO|44885': Feed { ltpc: [LTPC] },
    'NSE_FO|44863': Feed { ltpc: [LTPC] },
    'NSE_FO|44884': Feed { ltpc: [LTPC] },
    'NSE_FO|44862': Feed { ltpc: [LTPC] },
    'NSE_FO|44869': Feed { ltpc: [LTPC] },
    'NSE_FO|44868': Feed { ltpc: [LTPC] },
    'NSE_FO|44889': Feed { ltpc: [LTPC] },
    'NSE_FO|44845': Feed { ltpc: [LTPC] },
    'NSE_FO|44867': Feed { ltpc: [LTPC] },
    'NSE_FO|44888': Feed { ltpc: [LTPC] },
    'NSE_FO|44844': Feed { ltpc: [LTPC] },
    'NSE_FO|44866': Feed { ltpc: [LTPC] },
    'NSE_FO|44872': Feed { ltpc: [LTPC] },
    'NSE_FO|44893': Feed { ltpc: [LTPC] },
    'NSE_FO|44871': Feed { ltpc: [LTPC] },
    'NSE_FO|44892': Feed { ltpc: [LTPC] },
    'NSE_FO|44870': Feed { ltpc: [LTPC] }
  },
  currentTs: Long { low: 1399965911, high: 407, unsigned: false }
}

here to the ltp of those should I need to loop to all one by one??

It means in all tick I need to loop 500 times here in my ex, is it correct??

If yes then its a huge process in each ticker just to extract the ltp.

if no please suggest the way to do in node js

Also Stuck in another issue while trying to use node sdk

import { UpstoxClient } from 'upstox-js-sdk';
let defaultClient = UpstoxClient.ApiClient.instance;

// Configure OAuth2 access token for authorization: OAUTH2
let OAUTH2 = defaultClient.authentications['OAUTH2'];
OAUTH2.accessToken = 'eyJ0eXAiOiJKV1QiLCJrZXlfaWQiOiJza192MS4wIiwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiIyQkNDRlUiLCJqdGkiOiI2ODQ2ODBlOGZmNjkzZTA0NGQyMzBhMTQiLCJpc011bHRpQ2xpZW50IjpmYWxzZSwiaXNQbHVzUGxhbiI6dHJ1ZSwiaWF0IjoxNzQ5NDUwOTg0LCJpc3MiOiJ1ZGFwaS1nYXRld2F5LXNlcnZpY2UiLCJleHAiOjE3NDk1MDY0MDB9.tdhE-83DKtkyktchwh56Wb9_CentYqIFyU4iWc78cKc';

let apiInstance = new UpstoxClient.ExpiredInstrumentApi();
let instrumentKey = "NSE_INDEX|Nifty 50"; // String | Instrument Key of asset

apiInstance.getExpiries(instrumentKey, (error, data, response) => {
  if (error) {
    console.error(error);
  } else {
    console.log('API called successfully. Returned data: ' + data);
  }
});

here i am getting issue. I am using esmodule because of indicator is only support es module.

dhiraj@Dhirajs-MacBook-Pro multi-user-algo % node getExpiries.js
file:///opt/homebrew/var/www/TGA-Algo/multi-user-algo/getExpiries.js:1
import { UpstoxClient } from 'upstox-js-sdk';
         ^^^^^^^^^^^^
SyntaxError: Named export 'UpstoxClient' not found. The requested module 'upstox-js-sdk' is a CommonJS module, which may not support all module.exports as named exports.
CommonJS modules can always be imported via the default export, for example using:

import pkg from 'upstox-js-sdk';
const { UpstoxClient } = pkg;

    at ModuleJob._instantiate (node:internal/modules/esm/module_job:128:21)
    at async ModuleJob.run (node:internal/modules/esm/module_job:194:5)
    at async Promise.all (index 0)
    at async ESMLoader.import (node:internal/modules/esm/loader:385:24)
    at async loadESM (node:internal/process/esm_loader:88:5)
    at async handleMainPromise (node:internal/modules/run_main:61:12)
dhiraj@Dhirajs-MacBook-Pro multi-user-algo % 

Here is my package.json

"dependencies": {
    "@debut/indicators": "^1.3.22",
    "axios": "^1.9.0",
    "crypto-js": "^4.2.0",
    "dotenv": "^16.5.0",
    "express": "^5.1.0",
    "fs-extra": "^11.3.0",
    "moment": "^2.30.1",
    "node-fetch": "^3.3.2",
    "otplib": "^12.0.1",
    "path": "^0.12.7",
    "protobufjs": "^7.5.1",
    "rxjs": "^7.8.2",
    "serve-index": "^1.9.1",
    "speakeasy": "^2.0.0",
    "upstox-js-sdk": "^2.17.0",
    "ws": "^8.18.2"
  },

@MAHESWARI_31155756 If you’re subscribing to the LTPC mode, you can refer to the response structure provided here.

Yes, you should use a HashMap to manage data for each instrument, rather than iterating over them one by one.

To resolve your syntax error, you can refer to the CommonJS syntax example below.

Thank you for your response and I resolve the error.

I have another doubt in doc.

for websocket - v3

which step to follow as suggestion.

or


HI

I am facing this below error everyday when ever I am starting the streamer

after this error i must need to restart my application, once its restart it resolved.

here is my code

import UpstoxClient from 'upstox-js-sdk';
import { accessToken } from './dataToken.js'; // accessToken should be a Promise
import { EventEmitter } from 'events';
import { logUniversal } from './utils/logger.js';
import { wss } from './app.js'; // Import the WebSocket server instance

export const feedEmitter = new EventEmitter();
// export let wss 


// Add this helper here 👇
function broadcastStreamStatus(statusObj) {
  const msg = JSON.stringify({ streamStatus: statusObj });
  wss.clients.forEach(client => {
    if (client.readyState === client.OPEN) {
      client.send(msg);
    }
  });
}
// Helper to broadcast any message to all UI clients
export function broadcastToUI(data) {
  const msg = JSON.stringify(data);
  wss.clients.forEach(client => {
    if (client.readyState === client.OPEN) {
      client.send(msg);
    }
  });
}

class MarketStreamService {
  constructor() {
    this.streamer = null;
    this.isConnected = false;
    this.isConnecting = false;
    this.retryCount = 0;
    this.maxRetries = 5;
    this.retryDelay = 10000; // ms
    this.currentToken = null;
    this.pendingInstrumentKeys = [];
    this.pendingMode = 'full';
    // this.broadcastToUI = null;
  }

  async init() {
    if (this.streamer) {
      return { success: false, message: "Already initialized." };
    }
    // this.currentToken = await accessToken;
    // let defaultClient = UpstoxClient.ApiClient.instance;
    // let OAUTH2 = defaultClient.authentications["OAUTH2"];
    // OAUTH2.accessToken = this.currentToken;
    console.log("Token : ", this.currentToken);
    this.streamer = new UpstoxClient.MarketDataStreamerV3();
    this.streamer.autoReconnect(true, 10, 5);
    // this.streamer.autoReconnect(false);


    // Set up event listeners
    this.streamer.on("open", () => {
      this.isConnected = true;
      this.isConnecting = false;
      this.retryCount = 0;
      if (this.pendingInstrumentKeys.length > 0) {
        this.streamer.subscribe(this.pendingInstrumentKeys, this.pendingMode);
        const logMsg = `Connected to Market Data Streamer. Subscribed to ${this.pendingInstrumentKeys.length} instruments in '${this.pendingMode}' mode.`;
        console.log(logMsg);
        logUniversal(logMsg);
        broadcastStreamStatus({ status: "on", message: "Streaming started" });
      } else {
        console.log("Connected to Market Data Streamer. No instruments to subscribe.");
      }
    });

    this.streamer.on("message", (data) => {
      const feed = data.toString("utf-8");
      try {
        const parsedFeed = JSON.parse(feed);
        // Broadcast to strategies
        feedEmitter.emit("tick", parsedFeed.feeds, parsedFeed.currentTs);
        // console.log("Received feed:", parsedFeed.feeds); //51003
        // Broadcast to UI
        broadcastToUI({
          type: "market_feed",
          data: parsedFeed.feeds
        });
        // --- Timer logic: Stop stream at 15:35 IST ---
        const now = new Date();
        // Add 5.5 hours to UTC to get IST
        const istTime = new Date(now.getTime() + (5.5 * 60 * 60 * 1000));
        const istHour = istTime.getUTCHours();
        const istMinute = istTime.getUTCMinutes();
        if (istHour === 15 && istMinute === 30) {
          this.stopStream();
          logUniversal("Market stream stopped automatically at 15:35 IST.");
        }
        // --- End timer logic ---
      } catch (e) {
        console.error("Feed parse error:", e, feed);
        logUniversal(`Feed parse error: ${e.message} - ${feed}`);
        broadcastStreamStatus({ status: "error", message: `Feed parse error: ${e.message}` });
      }
    });

    this.streamer.on("error", (error) => {
      this.isConnected = false;
      this.isConnecting = false;
      console.error("Streamer error:", error);
      logUniversal(`Streamer error: ${error.message}`);
      const result = { event: "error", success: false, message: `Streamer error: ${error.message}` };
      console.log(result);
      broadcastStreamStatus({ status: "error", message: error.message });
      // this._retryConnection();
    });

    this.streamer.on("close", () => {
      this.isConnected = false;
      this.isConnecting = false;
      const result = { event: "close", success: true, message: "Streamer connection closed." };
      console.log(result);
      broadcastStreamStatus({ status: "off", message: "Streaming stopped" });
      logUniversal("Streamer connection closed.");
    });

    this.streamer.on("reconnecting", () => {
      this.isConnecting = true;
      const result = { event: "reconnecting", success: true, message: "Streamer is reconnecting..." };
      console.log(result);
      broadcastStreamStatus({ status: "retrying", message: "Reconnecting..." });
      logUniversal("Streamer is reconnecting...");
    });

    this.streamer.on("autoReconnectStopped", (data) => {
      this.isConnected = false;
      this.isConnecting = false;
      const result = { event: "autoReconnectStopped", success: false, message: "Auto-reconnect stopped." };
      broadcastStreamStatus({ status: "off", message: "Auto-reconnect stopped" });
      console.log(result);
      console.log(data);
      logUniversal(`Auto-reconnect stopped: ${data.message}`);
    });


    return { success: true, message: "Initialized." };

  }

  async startStream(instrumentKeys, mode = 'full') {
    // Always get a fresh token before starting/connecting
    this.currentToken = await accessToken;
    let defaultClient = UpstoxClient.ApiClient.instance;
    let OAUTH2 = defaultClient.authentications["OAUTH2"];
    OAUTH2.accessToken = this.currentToken;
    if (!this.streamer) {
      const initResult = await this.init();
      if (!initResult.success) return initResult;
    }
    if (this.isConnected) {
      return this.addInstruments(instrumentKeys, mode);
    }
    if (this.isConnecting) {
      logUniversal("Streamer is already connecting. Please wait.");
      return { success: false, message: "Streamer is already connecting. Please wait." };
    }
    console.log(`Starting stream with ${instrumentKeys.length} instruments in '${mode}' mode.`);
    this.pendingInstrumentKeys = instrumentKeys;

    this.pendingMode = mode;
    this.isConnecting = true;
    this.streamer.connect();
    logUniversal("Connecting and subscribing...");
    return { success: true, message: "Connecting and subscribing..." };
  }

  addInstruments(instrumentKeys, mode = 'full') {
    if (!this.isConnected) {
      logUniversal("Not connected to stream. Cannot add instruments.");
      return { success: false, message: "Not connected to stream. Cannot add instruments." };
    }
    this.streamer.subscribe(instrumentKeys, mode);
    logUniversal(`Subscribed to ${instrumentKeys.length} instruments in '${mode}' mode.`);
    return { success: true, message: `Added ${instrumentKeys.length} instruments in '${mode}' mode.` };
  }

  removeInstruments(instrumentKeys) {
    if (!this.isConnected) {
      logUniversal("Not connected to stream. Cannot remove instruments.");
      return { success: false, message: "Not connected to stream. Cannot remove instruments." };
    }
    this.streamer.unsubscribe(instrumentKeys);
    logUniversal(`Unsubscribed from ${instrumentKeys.length} instruments.`);
    return { success: true, message: `Unsubscribed from ${instrumentKeys.length} instruments.` };
  }

  changeMode(instrumentKeys, mode) {
    if (!this.isConnected) {
      logUniversal("Not connected to stream. Cannot change mode.");
      return { success: false, message: "Not connected to stream. Cannot change mode." };
    }
    this.streamer.changeMode(instrumentKeys, mode);
    logUniversal(`Changed mode for ${instrumentKeys.length} instruments to '${mode}'.`);
    return { success: true, message: `Changed mode for ${instrumentKeys.length} instruments to '${mode}'.` };
  }

  stopStream() {
    console.log("Stopping Market Data Streamer... HITTTTTT");
    if (this.streamer && (this.isConnected || this.isConnecting)) {
      this.streamer.disconnect();
      this.isConnected = false;
      this.isConnecting = false;
      logUniversal("Disconnected from Market Data Streamer.");
      return { success: true, message: "Disconnected from Market Data Streamer." };
    }
    logUniversal("Streamer is not active. No action taken.");
    return { success: false, message: "Streamer is not active. No action taken." };
  }

  getStatus() {
    let count = 0;
    if (
      this.streamer &&
      this.streamer.subscriptions &&
      this.pendingMode &&
      this.streamer.subscriptions[this.pendingMode]
    ) {
      count = this.streamer.subscriptions[this.pendingMode].size;
    }
    return {
      initialized: !!this.streamer,
      connected: this.isConnected,
      connecting: this.isConnecting,
      subscribedInstrumentsCount: count,
      autoReconnectEnabled: this.streamer ? this.streamer.autoReconnectEnabled : false,
    };
  }

  _retryConnection() {
    if (this.retryCount >= this.maxRetries) {
      return;
    }
    this.retryCount++;
    const delay = this.retryDelay * this.retryCount;
    setTimeout(() => {
      this.isConnecting = true;
      this.streamer.connect();
    }, delay);
  }

}

export const marketStreamService = new MarketStreamService();


Hi Anand Sajankar

any update please reply

@MAHESWARI_31155756 It seems you’re using an expired or invalid token when initiating the WebSocket connection. When you restart the app, it likely fetches or uses a fresh token, which is why the stream works again.

Please note: The access_token obtained through has a specific validity period that lasts until 3:30 AM the following day

Could you please confirm if the accessToken is being refreshed before establishing the WebSocket connection?

Thanks

Yes before trigger the stream i was generate the token 1st

100% sure about it.

for 401 I was also in the same page and do code change too

but the same issue

even before the token generation and straming I restarted my entire application is aws. Kind of reinitializing everything.

this also wont help full.

same token but 1st strean request fail but 2nd works, TOmorrow i will share the token details