Not getting proper data over the websocket

I established the web-socket connection using Market Data Feed | Upstox Developer API.

But, when I decoded message then not able to received proper stream.

FeedResponse {
feeds: { ‘NSE_EQ|INE040A01034’: Feed { ff: [FullFeed] } },
currentTs: Long { low: -602496299, high: 402, unsigned: false }
}
I received above information during live market.

Could you please share the approximate time when you received this data from the websocket? This will help us investigate further.

It’s possible that the issue may be related to decoding on your end when handling binary data. To rule out any local issues, please refer to the streamer functionalities available in various languages:

Thank you!

@Ketan Thank you for the reply. I did it around 11:00 AM. It was working fine but suddenly stop working provided me this kind of data. I also provided code which I am using. I took the reference from your upstox github repo.

const { v4: uuidv4 } = require(“uuid”);
const WebSocket = require(‘ws’);
const protobuf = require(“protobufjs”);
const PlatformService = require(‘./platformService’);
const config = require(‘…/config/websocketConfig’);
const feedHandler = require(‘…/handlers/feedHandler’);
const redisClient = require(‘…/utils/redisClient’); // Singleton Redis client

const logger = require(“…/…/…/paper-trade/src/utils/logger”);
class UpstoxService extends PlatformService {
constructor(config) {
super(‘Upstox’);
this.config = config;
this.protobufRoot = null;
this.userId = config.userId;
this.sId = config.sId;
this.stocksArr = ;
this.initProtobuf();
}

connect() {
    return new Promise((resolve, reject) => {
        const wssURL = this.config?.url ? this.config.url : config.upstox.url;
        const apiVersion = this.config.apiVersion || config.upstox.api_version;

        logger.info(JSON.stringify({ wssURL, apiVersion, token: this.config.accessToken }));

        this.ws = new WebSocket(wssURL, {
            headers: {
                "Api-Version": apiVersion,
                Authorization: "Bearer " + this.config.accessToken,
            },
            followRedirects: true,
        });

        this.ws.on('open', () => {
            console.log(`[Upstox Service] WebSocket connection established`);
            resolve();
        });

        this.ws.on('message', (data) => {
            this.onMessage(data);
        });

        this.ws.on('error', (error) => {
            reject(error);
        });

        this.ws.on('close', () => {
            console.log(`[Upstox Service] WebSocket connection closed`);
            this.reconnect();
        });

    });
}

// Function to initialize the protobuf part
async initProtobuf() {
    this.protobufRoot = await protobuf.load(__dirname + "/UpstoxMarketDataFeed.proto");
    console.log("Protobuf part initialization complete");
}

subscribeStocks(stocksArr) {
    if (!this.ws) throw new Error('Web socket connection is not established for upstox.');

    if (!stocksArr || stocksArr.length === 0) throw new Error('You must provide instrumental keys to get the data from upstox.');

    this.stocksArr = stocksArr;

    const data = {
        guid: uuidv4(),
        method: "sub",
        data: {
            mode: "full",
            instrumentKeys: stocksArr,
        },
    };
    this.ws.send(Buffer.from(JSON.stringify(data)));
}

unsubscribeStocks(stocksArr = []) {
    if (!this.ws) throw new Error('Web socket connection is not established for upstox.');
    const data = {
        guid: uuidv4(),
        method: "unsub",
        data: {
            mode: "full",
            instrumentKeys: stocksArr,
        },
    };
    this.ws.send(Buffer.from(JSON.stringify(data)));
}

async decodeProtobuf(buffer) {
    if (!this.protobufRoot) {
        console.warn("Protobuf part not initialized yet!");
        return null;
    }

    const FeedResponse = this.protobufRoot.lookupType(
        "com.upstox.marketdatafeeder.rpc.proto.FeedResponse"
    );

    const decoded = await FeedResponse.decode(buffer);
    console.info(decoded);
    return decoded;
};

async onMessage(message) {
    const decodedData =  await this.decodeProtobuf(message);
    const parsedMessage = JSON.parse(decodedData);
    // Process the feed and pass to the handler
    if (parsedMessage) {
        // Publish feed data to Redis using the common Redis client (Singleton)
        await redisClient.publish(`UPSTOX_DATA_FEED_${this.sId}`, JSON.stringify(parsedMessage));
        console.log("[Upstox Service] Published feed data to Redis" + JSON.stringify(parsedMessage));
    }
}

async reconnect() {
    try {
        await this.connect();
        if (this.stocksArr.length > 0) {
            this.subscribeStocks(this.stocksArr);
        }
    } catch (error) {
        console.error("Reconnection error:", error);
    }
}

}
module.exports = UpstoxService;