For the below code is working fine while the market is offline. However, when the market is online, I’m encountering an issue. Despite sending requests for 41 instruments with mode:full, I’m only receiving details for half of the instruments in each loop.
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
from mkt_up_func import *
from threading import Thread
import pandas as pd
import time, os
import MarketDataFeed_pb2 as pb
start_time = time.time()
pd.set_option('display.max_rows', 10) # Set the maximum number of rows to display
pd.set_option('display.max_columns', 10)
op_str1_file_entry = "op_strategy_entry.csv"
op_str1_file = "op_strategy1.csv"
op_str1_file_bkp = "op_strategy1_bkp.csv"
op_str2_entry = False
global instrumentKeys
# instrumentKeys = ["NSE_INDEX|Nifty Bank", "NSE_INDEX|Nifty 50"]
instrumentKeys = ["NSE_INDEX|Nifty Bank"]
token = token_file('Test')
if token == "Get Token":
token = get_new_token()
def get_opt_bank_instrument_list(token, instrumentKeys, expiry):
try:
response = get_instrument_price(token, instrumentKeys)
except Exception as e:
print("Exception get_instrument_price: ", e)
price = response['data']["NSE_INDEX:Nifty Bank"]['last_price']
price = round(price / 100) * 100
symbol_list = []
strike_price_list = []
ce = pe = price
for i in range(20):
ce = ce + 100
pe = pe - 100
symbol_list.append(expiry + str(ce) + "CE")
symbol_list.append(expiry + str(pe) + "PE")
strike_price_list.append(ce)
strike_price_list.append(pe)
# op_instrumentKeys = get_instrument_key_option(symbol_list)
op_df = pd.DataFrame({'tr_symbol': symbol_list, 'strike_price': strike_price_list})
op_df = get_instrument_key_option(op_df)
instrumentKeys.append("Nifty Bank")
instrumentKeys.append(0)
new_df = pd.DataFrame([instrumentKeys], columns=op_df.columns)
instrument_df = pd.concat([op_df, new_df], ignore_index=True)
return instrument_df
expiry = "BANKNIFTY24APR"
# instrumentKeys updated with options Keys
instrument_df = get_opt_bank_instrument_list(token, instrumentKeys, expiry)
instrumentKeys = instrument_df['instrument_key'].to_list()
def get_datafram_from_websocket(json_data):
# Parse JSON data
data = json_data
# Initialize an empty list to store data for all symbols
rows = []
try:
# Iterate through each symbol in the feeds dictionary
for code, symbol_data in data['feeds'].items():
# if code == 'NSE_INDEX|Nifty Bank':
# print("Here")
# Extract relevant data for the symbol
if 'marketFF' in symbol_data.get('ff', {}):
ltpc_data = symbol_data['ff']['marketFF'].get('ltpc', {})
option_greeks_data = symbol_data['ff']['marketFF'].get('optionGreeks', {})
values = {
'code': code,
'ltp': ltpc_data.get('ltp'),
'ltt': ltpc_data.get('ltt'),
'ltq': int(ltpc_data.get('ltq', 0)),
'cp': ltpc_data.get('cp'),
'delta': option_greeks_data.get('delta'),
'theta': option_greeks_data.get('theta'),
'gamma': option_greeks_data.get('gamma'),
'vega': option_greeks_data.get('vega')
}
# Append the data for the symbol to the list
rows.append(values)
elif 'indexFF' in symbol_data.get('ff', {}):
ltpc_data = symbol_data['ff']['indexFF'].get('ltpc', {})
values = {
'code': code,
'ltp': ltpc_data.get('ltp'),
'ltt': ltpc_data.get('ltt'),
'ltq': int(ltpc_data.get('ltq', 0)),
'cp': ltpc_data.get('cp'),
'delta': 0,
'theta': 0,
'gamma': 0,
'vega': 0
}
rows.append(values)
except Exception as e:
print("Exception in 'get_datafram_from_websocket' : ", e)
# Create DataFrame from the list of dictionaries
df = pd.DataFrame(rows)
# print(df)
return df
def get_market_data_feed_authorize(api_version, configuration):
"""Get authorization for market data feed."""
api_instance = upstox_client.WebsocketApi(
upstox_client.ApiClient(configuration))
api_response = api_instance.get_market_data_feed_authorize(api_version)
return api_response
def decode_protobuf(buffer):
"""Decode protobuf message."""
feed_response = pb.FeedResponse()
feed_response.ParseFromString(buffer)
return feed_response
async def fetch_market_data():
"""Fetch market data using WebSocket and print it."""
global data_dict
# Create default SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Configure OAuth2 access token for authorization
configuration = upstox_client.Configuration()
api_version = '2.0'
configuration.access_token = token
# Get market data feed authorization
response = get_market_data_feed_authorize(
api_version, configuration)
# Connect to the WebSocket with SSL context
async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
print('Connection established')
await asyncio.sleep(1) # Wait for 1 second
# Data to be sent over the WebSocket
data = {
"guid": "someguid",
"method": "sub",
"data": {
"mode": "full",
"instrumentKeys": instrumentKeys
}
}
# Convert data to binary and send over WebSocket
binary_data = json.dumps(data).encode('utf-8')
await websocket.send(binary_data)
# Continuously receive and decode data from WebSocket
while True:
message = await websocket.recv()
decoded_data = decode_protobuf(message)
# Convert the decoded data to a dictionary
data_dict = MessageToDict(decoded_data)
# Print the dictionary representation
# print(json.dumps(data_dict))
# exit()
def run_websocket():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(fetch_market_data())
websocket_thread = Thread(target=run_websocket)
websocket_thread.start()
while True:
# print(websocket_thread)
time.sleep(2)
df = get_datafram_from_websocket(data_dict)
print(df.shape[0])