Websockets Issue in RUST, not able to get real time feed

when i try to subscribe to the data i am getting the below error connection is succeeded but not able to get the data
{“data”:{“instrumentKeys”:[“NSE_EQ|INE002A01018”],“mode”:“full”},“guid”:“samyoga_sub_1”,“method”:“sub”}

2026-01-05T05:04:04.437751Z INFO samyoga_kernel::adapters::upstox::adapter: [UpstoxAdapter] :satellite_antenna: Subscribed to 1 instruments

2026-01-05T05:04:04.437798Z INFO samyoga_engine: [Ingest] :satellite_antenna: Stream Active (Epoch 1)

2026-01-05T05:04:04.437873Z INFO samyoga_kernel::adapters::upstox::adapter: [UpstoxAdapter] :package: Binary message (165 bytes)

2026-01-05T05:04:04.437934Z INFO samyoga_kernel::adapters::upstox::normalizer: [Normalizer] Raw bytes (first 32): 080218daf6a9e4b8332299010a0b0a074e53455f434f4d10020a0a0a064e4344

2026-01-05T05:04:04.437992Z INFO samyoga_kernel::adapters::upstox::normalizer: [Normalizer] Decoded: type=2, feeds=0, currentTs=1767589444442

can any one please help here.??

Hi @Nandaluri_23644267,

Looking at your logs, I see the WebSocket connection succeeds and you receive the initial subscription confirmation, but the data stream isn’t flowing. Here are a few things to check:

  1. Message Pump Loop: Ensure your Rust WebSocket client has a proper message pump/receive loop. The logs show you’re receiving a binary message (165 bytes), but your code might not be actively polling/waiting for subsequent messages.

  2. Blocking vs Async: Are you using blocking WebSockets or async (tokio)? With blocking sockets, you need to continuously call read() in a loop. With async, ensure you’re properly awaiting the message stream.

  3. Buffer/Stream Handling: The 165-byte message you’re receiving is likely market data. You need to:

    • Keep the receive loop running
    • Handle binary frames (not just text frames)
    • Implement proper error handling if the connection drops
  4. Subscription Persistence: After subscription, the stream should continuously push data. If it stops after the first message, check:

    • If there’s an exception being silently caught
    • If the receive thread is blocking on something else
    • Connection timeout settings
  5. Debug Suggestion: Add detailed logging to the receive loop to see if messages are arriving but not being processed, or if they’re not arriving at all.

Can you share your message receive loop code? That would help identify where the data is being lost.

Also, check the Upstox WebSocket documentation for proper keep-alive/ping-pong frame handling in Rust.

Thanks for the response.

We verified the receive loop thoroughly:

  • Continuous async receive loop is active

  • Binary + text frames are logged

  • Ping/pong handled correctly

We observe identical behavior in:

  • Rust (tokio-tungstenite)

  • Python websocket-client

In all cases:

  • market_info (type=2) is received

  • feeds=0

  • no live_feed (type=1) frames ever arrive

  • connection remains alive with pings

REST market-quote APIs return live data for the same instruments and token.

This strongly suggests an account/app-level permission issue for Market Data Feed v3 streaming.

Could you please confirm:

  1. Whether WebSocket live streaming requires separate enablement

  2. Whether certain app types or plans are restricted from receiving live_feed data

  3. Whether feeds=0 indicates streaming is disabled for the app/account

pub async fn next_raw(&mut self) -> Option<RawUpstoxMessage> {
    let socket = self.socket.as_mut()?;
    
    loop {
        let msg = match socket.next().await {
            Some(Ok(m)) => m,
            Some(Err(e)) => {
                error!("Read Error: {}", e);
                return None;
            }
            None => {
                warn!("Socket closed");
                return None;
            }
        };
        
        match msg {
            Message::Binary(bytes) => {
                info!("Binary ({} bytes)", bytes.len());
                return Some(RawUpstoxMessage { bytes, received_at_ns: now });
            }
            Message::Ping(payload) => {
                info!("Ping -> Pong");
                socket.send(Message::Pong(payload)).await.ok();
            }
            Message::Text(text) => info!("Text: {}", text),
            Message::Close(_) => return None,
            _ => {}
        }
    }
}

can you please help here VENKATA_RA_1347175, i have been working on this for multiple days

I hope you are using .proto !

Based on the logs you shared, your WebSocket connection is successful and the subscription request is acknowledged by Upstox. However, the issue appears when decoding the binary feed. The raw bytes indicate that the payload being received is protobuf-encoded market-data, but you are not decoding it using the Upstox market-feed protobuf schema.

.proto

Hi @Nandaluri_23644267,

Great investigation! @RAJA_NAGA_5275531 nailed it - the issue is protobuf decoding. Your WebSocket connection and receive loop are working perfectly, but the binary frames need to be deserialized using Upstox’s market-feed protobuf schema.

The Root Cause:
Your logs show you’re receiving binary data correctly (165 bytes), but you’re just logging raw bytes. That payload IS the market data - it’s just encoded in protobuf format, not plain text.

Complete Rust Solution:

  1. Add to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.20"
prost = "0.12"
log = "0.4"

[build-dependencies]
prost-build = "0.12"
  1. Download the .proto file:
    Download from: https://assets.upstox.com/feed/market-data-feed/v3/MarketDataFeed.proto
    Place it in: src/proto/MarketDataFeed.proto

  2. Create build.rs:

use prost_build::Config;

fn main() {
    Config::new()
        .compile_protos(
            &["src/proto/MarketDataFeed.proto"],
            &["src/proto"],
        )
        .unwrap();
}
  1. Update your next_raw function:
pub async fn next_raw(&mut self) -> Option<RawUpstoxMessage> {
    let socket = self.socket.as_mut()?;
    
    loop {
        let msg = match socket.next().await {
            Some(Ok(m)) => m,
            Some(Err(e)) => {
                error!("Read Error: {}", e);
                return None;
            }
            None => {
                warn!("Socket closed");
                return None;
            }
        };

        match msg {
            Message::Binary(bytes) => {
                // CRITICAL: Decode protobuf data
                match prost::Message::decode(&*bytes) {
                    Ok(feed_data) => {
                        info!("✓ Decoded MarketFeed: {:?}", feed_data);
                        // NOW you'll see live_feed (type=1) flowing
                        return Some(RawUpstoxMessage {
                            bytes,
                            received_at_ns: now,
                        });
                    }
                    Err(e) => {
                        error!("Protobuf decode failed: {}", e);
                        continue; // Skip malformed frames
                    }
                }
            }
            Message::Ping(payload) => {
                let _ = socket.send(Message::Pong(payload)).await;
            }
            Message::Text(text) => info!("Text: {}", text),
            Message::Close(_) => return None,
            _ => {}
        }
    }
}

Why This Works:

  • Your code structure is correct (async loop, ping-pong handling, binary frame handling)
  • The issue is only the missing deserialization step
  • Once decoded, you’ll see actual market data (LTP, volume, bid/ask, Greeks, etc.)
  • Same data the REST APIs return, now in real-time

Verification:
After implementing:

  • feeds will show actual feed count (not 0)
  • live_feed (type=1) messages will arrive continuously
  • Each message will contain market ticks with decoded data
  • No more raw bytes - you’ll get structured market data

Let me know if you hit any issues with the protobuf setup. Also good practice to add proper error handling for malformed frames in production.

-VENKATA