Hi, I need help with the code below. It runs the websocket while I execute my strategy in separate code. The problem I face is closing the websocket connection. I want to define a websocket close function, and I also want to make the `instr` variable, which holds the list of instruments to subscribe to, dynamic. That way, when my strategy takes a new position, the new instrument is added to `instr`, and the next iteration of the websocket loop pulls data for the new instrument ID as well.
def get_market_data_feed_authorize(api_version, configuration):
    """Call the Upstox WebSocket API's market-data-feed authorize endpoint.

    Args:
        api_version: API version value forwarded unchanged to the endpoint call.
        configuration: ``upstox_client.Configuration`` used to build the
            ``ApiClient`` that backs the request.

    Returns:
        Whatever ``WebsocketApi.get_market_data_feed_authorize`` returns.
    """
    client = upstox_client.ApiClient(configuration)
    websocket_api = upstox_client.WebsocketApi(client)
    return websocket_api.get_market_data_feed_authorize(api_version)
def decode_protobuf(buffer):
    """Parse *buffer* into a fresh ``pb.FeedResponse`` message and return it.

    Args:
        buffer: Serialized message data handed to ``ParseFromString``.

    Returns:
        The populated ``pb.FeedResponse`` instance.
    """
    parsed = pb.FeedResponse()
    parsed.ParseFromString(buffer)
    return parsed
# Back-fill a stop-loss on every entry of options_sold whose 'SL' is 0 or
# None: the new value is get_factor(selling_price) * selling_price.
# (The pasted version used typographic quotes, which Python rejects.)
for slop in options_sold:
    if slop['SL'] == 0 or slop['SL'] is None:
        slop['SL'] = get_factor(slop['selling_price']) * slop['selling_price']
def _build_subscription_payload(instrument_keys):
    """Return the UTF-8 encoded JSON "sub" request (mode "full") for *instrument_keys*."""
    request = {
        "guid": "someguid",
        "method": "sub",
        "data": {
            "mode": "full",
            "instrumentKeys": list(instrument_keys),
        },
    }
    return json.dumps(request).encode('utf-8')


async def fetch_market_data():
    """Stream the market data feed and keep ``data_dict`` updated.

    Authorizes the feed, opens the WebSocket, subscribes to every key
    currently in the module-level ``instr`` and then loops forever:

    * any key that has been added to ``instr`` since the last check is
      subscribed with an additional "sub" request, so other code can
      extend ``instr`` while the feed is running;
    * each received message is decoded with ``decode_protobuf`` and stored,
      converted via ``MessageToDict``, in the global ``data_dict``.

    The loop has no exit condition of its own; it ends only when an
    exception propagates out of it (for example when the task running this
    coroutine is cancelled), at which point the ``async with`` block exits.

    Side effects: assigns the globals ``current_time``, ``current_day``
    (once, right after connecting) and ``data_dict`` (on every message).
    """
    # Only current_time, current_day and data_dict are assigned in this
    # function; the remaining declarations are carried over unchanged.
    global livdata, upband, lowband, exp_date, bnf_spot, cce_s_instr, hdg_put
    global cpe_b_instr, adj_count, b_spot, data_dict, er, current_time
    global current_day, options_sold, bmark, shift, move, adj_strk, new_strk
    global eod_min, eod_adj_strk, pe_score, ce_score, ce_decay, pe_decay
    global trig_cond, new_instruments, profit, curr_profit, dfz, away_pt
    global ce_sl, pe_sl, new_strk_p, adj_pr, p_che, u_che, profit_tgt, prev_cp
    global latest_volume, latest_ts, filtered_data
    global sl_dist, upgap, dngap, sl_gap, sl_che, sl_time, ub_time, db_time
    global buy_trig, buy_pos, strt_ce_buy, strt_pe_buy, close_buy, other_profit
    global slo_time, new_strk_p_hdg, hdg_vue, tot_profit, carry, ov_sl
    global tot_qsold, max_profit, sl_pts
    global stop_websocket

    # NOTE(review): hostname checking and certificate verification are both
    # switched off, so the TLS peer is not authenticated. Confirm that this
    # is intentional before relying on it.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Seconds. The original declared an unused local of 100 but passed 60 to
    # connect(); 60 is the value that was actually in effect, so it is kept.
    ping_timeout = 60
    # Seconds to wait for a message before re-checking ``instr`` for new keys.
    poll_interval = 1.0

    # Configure OAuth2 access token for authorization: OAUTH2
    configuration = upstox_client.Configuration()
    api_version = '2.0'
    configuration.access_token = access_token

    # Get market data feed authorize
    market_data_feed_authorize = get_market_data_feed_authorize(
        api_version, configuration)
    feed_uri = market_data_feed_authorize.data.authorized_redirect_uri

    # Connect to the WebSocket with SSL context
    async with websockets.connect(
            feed_uri, ssl=ssl_context, ping_timeout=ping_timeout) as websocket:
        now = datetime.datetime.now()
        current_time = now.strftime("%H:%M")
        current_day = now.weekday()  # 0 for Monday, 1 for Tuesday, ..., 6 for Sunday

        # Initial subscription: a snapshot of whatever ``instr`` holds now.
        initial_keys = list(instr)
        await websocket.send(_build_subscription_payload(initial_keys))
        subscribed_keys = set(initial_keys)

        while True:
            # Snapshot first so a concurrent append from another thread
            # cannot interfere with the iteration; dict.fromkeys drops
            # duplicates while preserving order.
            pending = [key for key in dict.fromkeys(list(instr))
                       if key not in subscribed_keys]
            if pending:
                await websocket.send(_build_subscription_payload(pending))
                subscribed_keys.update(pending)

            try:
                message = await asyncio.wait_for(
                    websocket.recv(), timeout=poll_interval)
            except asyncio.TimeoutError:
                # Nothing arrived within poll_interval; go round again so
                # newly added instruments are picked up on a quiet feed.
                continue

            decoded_data = decode_protobuf(message)
            # Convert the decoded data to a dictionary
            data_dict = MessageToDict(decoded_data)
# State shared between the worker thread (run_websocket) and stop_websocket().
_websocket_loop = None            # event loop owned by the worker thread
_websocket_task = None            # task wrapping fetch_market_data()
_websocket_stop_requested = False  # set when a stop arrives before the task exists


def run_websocket():
    """Run ``fetch_market_data()`` to completion on a private event loop.

    Intended as a thread target. The loop and the task are published in
    module globals so that ``stop_websocket`` can cancel the task from
    another thread. The loop is always closed when the function returns.
    """
    global _websocket_loop, _websocket_task
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        task = loop.create_task(fetch_market_data())
        _websocket_loop, _websocket_task = loop, task
        # A stop may have been requested before the handles above were
        # published; honour it here so that request is not lost.
        if _websocket_stop_requested:
            task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # Normal shutdown path: the task was cancelled by stop_websocket().
            pass
    finally:
        _websocket_loop = _websocket_task = None
        loop.close()


# Start the WebSocket connection in a separate thread
websocket_thread = Thread(target=run_websocket)
websocket_thread.start()


def stop_websocket(stop_event=None):
    """Stop the feed running in ``websocket_thread`` and wait for it to exit.

    Cancels the task that runs ``fetch_market_data()``. The cancellation is
    raised inside the coroutine, so its ``async with websockets.connect(...)``
    block exits before the thread finishes.

    Args:
        stop_event: Optional object with a ``set()`` method (for example a
            ``threading.Event``). It is set before the task is cancelled;
            kept so existing ``stop_websocket(event)`` callers still work.
    """
    global _websocket_stop_requested
    if stop_event is not None:
        stop_event.set()

    # Raise the flag before reading the handles: either the handles are
    # already visible here, or run_websocket() will see the flag right after
    # it publishes them.
    _websocket_stop_requested = True
    loop, task = _websocket_loop, _websocket_task
    if loop is not None and task is not None:
        try:
            # Task.cancel() must run on the loop's own thread.
            loop.call_soon_threadsafe(task.cancel)
        except RuntimeError:
            # The loop was closed between the check and the call, which
            # means the worker is already shutting down.
            pass

    websocket_thread.join()  # Wait for the thread to finish
    _websocket_stop_requested = False