Here, I am issue related to web socket. I am successfully connecting with web socket and also receiving market feed as well but not getting further feeds like live feed. Can you please suggest me proper solution for it?
Here is the my code snip which I have implemented for web socket.
def run(cron_job = true, run_by = "scheduler")
cron_data = CronData.create(start_at: Time.now, cron_title: "Sockets::UpstoxClient.start_socket_connection", run_by: run_by)
begin
EM.run {
if EM.connection_count < 1
WebsocketActivity.where(status: "open").update_all(stopped_at: DateTime.now, status: "stopped")
web_socket_activity = WebsocketActivity.create!(title: "Sockets::UpstoxClient.run", status: "open", start_at: DateTime.now)
token = "Bearer #{ENV['UPSTOX_ACCESS_TOKEN']}"
redirect_url = get_redirect_url(token)
ws = Faye::WebSocket::Client.new(redirect_url,
['json'],
headers: {
'Authorization' => token,
'Accept' => '*/*'
}
)
symbol_keys = ['NSE_INDEX|Nifty 50'] #CacheModule.get_all_upstox_instrument_keys
puts "symbol_keys: #{symbol_keys}"
ws.on :open do |event|
puts "✅ Upstox WebSocket Connected"
subscription_payload = {
guid: SecureRandom.uuid,
method: 'sub',
data: {
mode: 'ltpc',
instrumentKeys: symbol_keys #['MCX_FO|220230']
}
}
ws.send(subscription_payload.to_json)
# EM.add_periodic_timer(30) {
# puts "🫀 Heartbeat at #{Time.now}"
# # Upstox doesn’t require ping unless explicitly stated
# }
puts "📩 Sent subscription"
end
ws.on :message do |event|
begin
puts "📡 Received binary data"
# Decode using protobuf here
raw = event.data
binary_data = raw.is_a?(Array) ? raw.pack('C*') : raw
decoded = Com::Upstox::Marketdatafeederv3udapi::Rpc::Proto::FeedResponse.decode(binary_data)
puts "Decoded data #{decoded}"
puts "📥 Raw binary size: #{binary_data.bytesize} bytes"
puts "📦 Decoded type: #{decoded.type}"
puts "📊 Instruments in feed: #{decoded.feeds.keys}"
rescue JSON::ParserError => e
puts "JSON Parse Error: #{e.message}"
ActionCable.server.broadcast "messages", { error: "Error parsing message" }
end
end
ws.on :close do |event|
web_socket_activity.update!(status: "closed", stopped_at: DateTime.now)
puts "❌ Upstox WebSocket Closed (#{event})"
ws = nil
EM.stop
StockCompanies::UpstoxWebsocketClientStartWorker.perform_at(30.seconds.from_now)
end
ws.on :error do |e|
puts "❌ WebSocket Error: #{e.message}"
end
end
}
cron_data.update(status: "success", end_at: Time.now)
rescue => e
cron_data.update(status: "error", error: e.message, end_at: Time.now)
puts "⚠️ WebSocket Exception: #{e.message}"
end
end
private
def get_redirect_url(token)
url = URI("https://api.upstox.com/v3/feed/market-data-feed/authorize")
https = Net::HTTP.new(url.host, url.port)
https.use_ssl = true
request = Net::HTTP::Get.new(url)
request["Authorization"] = token
request["Accept"] = "application/json"
response = https.request(request)
JSON.parse(response.read_body)["data"]["authorized_redirect_uri"]
end