Not getting all Instruments details from Websocket while fetching live data

For the below code is working fine while the market is offline. However, when the market is online, I’m encountering an issue. Despite sending requests for 41 instruments with mode:full, I’m only receiving details for half of the instruments in each loop.

import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
from mkt_up_func import *
from threading import Thread
import pandas as pd
import time, os
import MarketDataFeed_pb2 as pb

start_time = time.time()

pd.set_option('display.max_rows', 10)  # Set the maximum number of rows to display
pd.set_option('display.max_columns', 10)

op_str1_file_entry = "op_strategy_entry.csv"
op_str1_file = "op_strategy1.csv"
op_str1_file_bkp = "op_strategy1_bkp.csv"
op_str2_entry = False

global instrumentKeys

# instrumentKeys = ["NSE_INDEX|Nifty Bank", "NSE_INDEX|Nifty 50"]
instrumentKeys = ["NSE_INDEX|Nifty Bank"]

token = token_file('Test')
if token == "Get Token":
    token = get_new_token()

def get_opt_bank_instrument_list(token, instrumentKeys, expiry):
    try:
        response = get_instrument_price(token, instrumentKeys)
    except Exception as e:
        print("Exception get_instrument_price: ", e)
    price = response['data']["NSE_INDEX:Nifty Bank"]['last_price']
    price = round(price / 100) * 100
    symbol_list = []
    strike_price_list = []
    ce = pe = price
    for i in range(20):
        ce = ce + 100
        pe = pe - 100
        symbol_list.append(expiry + str(ce) + "CE")
        symbol_list.append(expiry + str(pe) + "PE")
        strike_price_list.append(ce)
        strike_price_list.append(pe)
    # op_instrumentKeys = get_instrument_key_option(symbol_list)
    op_df = pd.DataFrame({'tr_symbol': symbol_list, 'strike_price': strike_price_list})
    op_df = get_instrument_key_option(op_df)
    instrumentKeys.append("Nifty Bank")
    instrumentKeys.append(0)
    new_df = pd.DataFrame([instrumentKeys], columns=op_df.columns)
    instrument_df = pd.concat([op_df, new_df], ignore_index=True)
    return instrument_df

expiry = "BANKNIFTY24APR"
# instrumentKeys updated with options Keys
instrument_df = get_opt_bank_instrument_list(token, instrumentKeys, expiry)
instrumentKeys = instrument_df['instrument_key'].to_list()

def get_datafram_from_websocket(json_data):
    # Parse JSON data
    data = json_data

    # Initialize an empty list to store data for all symbols
    rows = []
    try:
        # Iterate through each symbol in the feeds dictionary
        for code, symbol_data in data['feeds'].items():
            # if code == 'NSE_INDEX|Nifty Bank':
            #     print("Here")
            # Extract relevant data for the symbol
            if 'marketFF' in symbol_data.get('ff', {}):
                ltpc_data = symbol_data['ff']['marketFF'].get('ltpc', {})
                option_greeks_data = symbol_data['ff']['marketFF'].get('optionGreeks', {})
                values = {
                    'code': code,
                    'ltp': ltpc_data.get('ltp'),
                    'ltt': ltpc_data.get('ltt'),
                    'ltq': int(ltpc_data.get('ltq', 0)),
                    'cp': ltpc_data.get('cp'),
                    'delta': option_greeks_data.get('delta'),
                    'theta': option_greeks_data.get('theta'),
                    'gamma': option_greeks_data.get('gamma'),
                    'vega': option_greeks_data.get('vega')
                }
                # Append the data for the symbol to the list
                rows.append(values)
            elif 'indexFF' in symbol_data.get('ff', {}):
                ltpc_data = symbol_data['ff']['indexFF'].get('ltpc', {})
                values = {
                    'code': code,
                    'ltp': ltpc_data.get('ltp'),
                    'ltt': ltpc_data.get('ltt'),
                    'ltq': int(ltpc_data.get('ltq', 0)),
                    'cp': ltpc_data.get('cp'),
                    'delta': 0,
                    'theta': 0,
                    'gamma': 0,
                    'vega': 0
                }
                rows.append(values)
    except Exception as e:
        print("Exception in 'get_datafram_from_websocket' : ", e)
    # Create DataFrame from the list of dictionaries
    df = pd.DataFrame(rows)
    # print(df)
    return df

def get_market_data_feed_authorize(api_version, configuration):
    """Get authorization for market data feed."""
    api_instance = upstox_client.WebsocketApi(
        upstox_client.ApiClient(configuration))
    api_response = api_instance.get_market_data_feed_authorize(api_version)
    return api_response

def decode_protobuf(buffer):
    """Decode protobuf message."""
    feed_response = pb.FeedResponse()
    feed_response.ParseFromString(buffer)
    return feed_response

async def fetch_market_data():
    """Fetch market data using WebSocket and print it."""
    global data_dict
    # Create default SSL context
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Configure OAuth2 access token for authorization
    configuration = upstox_client.Configuration()

    api_version = '2.0'

    configuration.access_token = token

    # Get market data feed authorization
    response = get_market_data_feed_authorize(
        api_version, configuration)

    # Connect to the WebSocket with SSL context
    async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
        print('Connection established')
        await asyncio.sleep(1)  # Wait for 1 second
        # Data to be sent over the WebSocket
        data = {
            "guid": "someguid",
            "method": "sub",
            "data": {
                "mode": "full",
                "instrumentKeys": instrumentKeys
            }
        }

        # Convert data to binary and send over WebSocket
        binary_data = json.dumps(data).encode('utf-8')
        await websocket.send(binary_data)

        # Continuously receive and decode data from WebSocket
        while True:
            message = await websocket.recv()
            decoded_data = decode_protobuf(message)

            # Convert the decoded data to a dictionary
            data_dict = MessageToDict(decoded_data)

            # Print the dictionary representation
            # print(json.dumps(data_dict))
            # exit()

def run_websocket():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(fetch_market_data())

websocket_thread = Thread(target=run_websocket)
websocket_thread.start()

while True:
    # print(websocket_thread)
    time.sleep(2)
    df = get_datafram_from_websocket(data_dict)
    print(df.shape[0])

@Srihari,

Thank you for reaching out.

To assist you further, we require additional details. Please provide the instrument_keys that have not been updated and their relevant timeline information.

Additionally, we suggest upgrading your SDK and utilizing the Websocket Streamer Interfaces introduced in our most recent Python SDK release. This will facilitate a smoother connection to the websocket, simplifying the technical aspects.

Thank you!