I’m creating a WebSocket connection with Upstox to fetch real-time market data using the following code:
// Import required modules
import { ApiClient, WebsocketApi } from "upstox-js-sdk";
import { WebSocket } from "ws";
import pkg from "protobufjs";
const {load} = pkg;
// Module-level state shared by the functions below
let protobufRoot = null; // stays null until the protobuf schema has been loaded
const defaultClient = ApiClient.instance;
const apiVersion = "2.0";
const OAUTH2 = defaultClient.authentications.OAUTH2;
/**
 * Request an authorized market-data feed URL.
 *
 * Adapts the callback-style `WebsocketApi#getMarketDataFeedAuthorize` call
 * (invoked with the module-level `apiVersion`) to a Promise.
 *
 * @returns {Promise<string>} Resolves with `data.data.authorizedRedirectUri`.
 *   Rejects with the error reported by the API call, or with an `Error` when
 *   the response does not contain an authorized URI.
 */
const getMarketFeedUrl = () =>
  new Promise((resolve, reject) => {
    const apiInstance = new WebsocketApi();
    apiInstance.getMarketDataFeedAuthorize(apiVersion, (error, data) => {
      if (error) {
        reject(error);
        return;
      }
      // Guard the nested lookup: a throw inside this callback would escape the
      // Promise and leave the caller waiting forever (or crash the process).
      const authorizedUrl = data?.data?.authorizedRedirectUri;
      if (!authorizedUrl) {
        reject(
          new Error(
            "Market data feed authorization response did not include authorizedRedirectUri"
          )
        );
        return;
      }
      resolve(authorizedUrl);
    });
  });
/**
 * Open the market-data WebSocket and forward summarized ticks to `callback`.
 *
 * One second after the socket opens, a "sub" request in "full" mode is sent
 * for `instrumentKeys`. Every incoming frame is decoded with
 * `decodeProfobuf`, reduced to `{ instrumentKey, ltp, open1D, close1D }` per
 * feed entry, and handed to `callback` as a JSON string.
 *
 * @param {string} wsUrl - URL to connect to.
 * @param {string[]} instrumentKeys - Instrument keys placed in the subscription request.
 * @param {(payload: string) => void} callback - Receives the JSON-encoded array of summaries.
 * @returns {Promise<WebSocket>} Resolves with the socket once it is open;
 *   rejects if an "error" event fires before that.
 */
const connectWebSocket = async (wsUrl, instrumentKeys, callback) => {
  return new Promise((resolve, reject) => {
    const ws = new WebSocket(wsUrl, {
      headers: {
        "Api-Version": apiVersion,
        Authorization: `Bearer ${OAUTH2.accessToken}`,
      },
      followRedirects: true,
    });

    ws.on("open", () => {
      console.log("connected");
      resolve(ws);
      // The subscription request is deliberately delayed by one second.
      setTimeout(() => {
        // The socket may have closed during the delay; send() would throw then.
        if (ws.readyState !== WebSocket.OPEN) {
          return;
        }
        const subscription = {
          guid: "someguid",
          method: "sub",
          data: {
            mode: "full",
            instrumentKeys,
          },
        };
        ws.send(Buffer.from(JSON.stringify(subscription)));
      }, 1000);
    });

    ws.on("close", () => {
      console.log("disconnected");
    });

    ws.on("message", (data) => {
      // An exception thrown from an event listener is uncaught and would take
      // the whole process down, so a bad frame is logged and skipped instead.
      let decodedData;
      try {
        decodedData = decodeProfobuf(data);
      } catch (error) {
        console.error("failed to decode market data message:", error);
        return;
      }
      // decodeProfobuf returns null before the schema is loaded, and a decoded
      // frame is not guaranteed to carry a `feeds` map.
      const feeds = decodedData?.feeds;
      if (!feeds) {
        return;
      }
      const information = Object.keys(feeds).map((key) => {
        const indexFeed = feeds[key]?.ff?.indexFF;
        // `ohlc` can be missing or empty, so index it defensively.
        const firstCandle = indexFeed?.marketOHLC?.ohlc?.[0];
        return {
          instrumentKey: key,
          ltp: indexFeed?.ltpc?.ltp,
          open1D: firstCandle?.open,
          close1D: firstCandle?.close,
        };
      });
      callback(JSON.stringify(information));
    });

    ws.on("error", (error) => {
      console.log("error:", error);
      reject(error); // No effect if the promise has already been resolved
    });
  });
};
/**
 * Load the market-data protobuf schema into the module-level `protobufRoot`.
 *
 * The schema is read from "./utils/MarketDataFeed.proto" the first time this
 * runs; later calls return immediately because the parsed root is reused.
 *
 * @returns {Promise<void>} Settles once `protobufRoot` is available; rejects
 *   if loading the schema fails.
 */
const initProtobuf = async () => {
  if (protobufRoot) {
    return; // Already loaded; avoid re-reading and re-parsing the schema
  }
  protobufRoot = await load("./utils/MarketDataFeed.proto");
  console.log("Protobuf part initialization complete");
};
/**
 * Decode a binary frame as a `FeedResponse` message.
 *
 * @param {Buffer} buffer - Raw frame to decode.
 * @returns {object|null} The decoded message, or `null` (after logging a
 *   warning) when the schema in `protobufRoot` has not been loaded yet.
 */
const decodeProfobuf = (buffer) => {
  if (protobufRoot) {
    const feedResponseType = protobufRoot.lookupType(
      "com.upstox.marketdatafeeder.rpc.proto.FeedResponse"
    );
    return feedResponseType.decode(buffer);
  }
  console.warn("Protobuf part not initialized yet!");
  return null;
};
// Socket opened by the most recent successful getMarketDataFeed call, kept so
// that the next call can shut it down instead of leaking it.
let activeFeedSocket = null;

/**
 * Close `socket` and wait for its "close" event.
 *
 * @param {WebSocket|null} socket - Socket to close; `null` or an already
 *   closed socket resolves immediately.
 * @returns {Promise<void>}
 */
const closeFeedSocket = (socket) =>
  new Promise((resolve) => {
    if (!socket || socket.readyState === WebSocket.CLOSED) {
      resolve();
      return;
    }
    socket.once("close", () => resolve());
    socket.close();
  });

/**
 * Initialize the protobuf schema, authorize the feed and open the market-data
 * WebSocket.
 *
 * Previously every call opened a new upstream connection and discarded its
 * handle, so earlier connections could never be closed. The socket from the
 * previous call is now closed before a new one is opened, which means only
 * one upstream feed is kept alive at a time.
 * NOTE(review): the "Unexpected server response: 403" seen on repeated calls
 * is presumably the upstream refusing additional concurrent connections —
 * confirm against the provider's connection limits.
 *
 * @param {string} accessToken - Stored on `OAUTH2.accessToken` before any request is made.
 * @param {string[]} instrumentKeys - Instrument keys to subscribe to.
 * @param {(payload: string) => void} callback - Receives each JSON-encoded update.
 * @returns {Promise<WebSocket|undefined>} The open socket, or `undefined`
 *   when any step failed (the error is logged, not rethrown).
 */
export const getMarketDataFeed = async (accessToken, instrumentKeys, callback) => {
  OAUTH2.accessToken = accessToken;
  try {
    // Release the previous upstream connection before asking for a new one.
    await closeFeedSocket(activeFeedSocket);
    activeFeedSocket = null;

    await initProtobuf();
    const wsUrl = await getMarketFeedUrl();
    activeFeedSocket = await connectWebSocket(wsUrl, instrumentKeys, callback);
    return activeFeedSocket;
  } catch (error) {
    console.error("An error occurred:", error);
    return undefined;
  }
};
I’m calling the method getMarketDataFeed to initialize the protobuf and establish a WebSocket connection to fetch real-time market data. The callback passed to the method is the following:
// Callback handed to getMarketDataFeed: re-emits each payload it receives on
// the 'market data' event of `socket`.
// NOTE(review): `socket` is not defined in this snippet — presumably a
// per-client Socket.IO connection; confirm against the calling code.
(data) => {
socket.emit('market data', data);
}
The first time I call this method (when the application is brought up), everything seems fine. The second time onwards, I start getting the following error -
error: Error: Unexpected server response: 403
at ClientRequest.<anonymous>
at ClientRequest.emit (node:events:514:28)
at HTTPParser.parserOnIncomingClient [as onIncoming] (node:_http_client:700:27)
at HTTPParser.parserOnHeadersComplete (node:_http_common:119:17)
at TLSSocket.socketOnData (node:_http_client:541:22)
at TLSSocket.emit (node:events:514:28)
at addChunk (node:internal/streams/readable:324:12)
at readableAddChunk (node:internal/streams/readable:297:9)
at Readable.push (node:internal/streams/readable:234:10)
at TLSWrap.onStreamRead (node:internal/stream_base_commons:190:23)
I don’t see anything wrong in the code. Can you please help me with this?