# Import necessary modules
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
from pprint import pprint
import MarketDataFeed_pb2 as pb
from cs50 import SQL
# Configure the CS50 library to use the SQLite database stored in finance.db.
# fetch_market_data() reads the Upstox access token from its `credentials` table.
db = SQL("sqlite:///finance.db")
def get_market_data_feed_authorize(api_version, configuration):
    """Request authorization for the market data feed.

    Args:
        api_version: API version string forwarded to the Upstox client call.
        configuration: ``upstox_client.Configuration`` used to build the API client.

    Returns:
        The response object returned by
        ``WebsocketApi.get_market_data_feed_authorize``.
    """
    client = upstox_client.ApiClient(configuration)
    websocket_api = upstox_client.WebsocketApi(client)
    return websocket_api.get_market_data_feed_authorize(api_version)
def decode_protobuf(buffer):
    """Parse a serialized protobuf payload into a ``pb.FeedResponse``.

    Args:
        buffer: Serialized message bytes, as received from the WebSocket.

    Returns:
        The populated ``pb.FeedResponse`` message.
    """
    parsed = pb.FeedResponse()
    parsed.ParseFromString(buffer)
    return parsed
def _report_callback_failure(task):
    """Print the exception raised by a finished ``on_connect`` task, if any."""
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        print(f"on_connect callback failed: {error!r}")


async def fetch_market_data(subscribe_instruments, on_connect=None, verify_tls=True):
    """Stream market data over the Upstox WebSocket feed and print each update.

    Reads the access token from the ``credentials`` table, obtains the
    authorized feed URL, connects, subscribes to ``subscribe_instruments`` and
    then prints every decoded message as JSON until the connection ends or the
    task is cancelled.

    Args:
        subscribe_instruments: Instrument keys to subscribe to after connecting.
        on_connect: Optional coroutine function called as
            ``on_connect(websocket)``. It runs as a background task next to the
            receive loop, which is the only place the live connection is
            available for changing subscriptions at runtime. The task is
            cancelled when the receive loop stops.
        verify_tls: When true (default) the server certificate and hostname
            are verified. Pass ``False`` only to reproduce the previous
            behaviour of skipping verification.

    Raises:
        asyncio.CancelledError: Re-raised after logging so the caller's
            cancellation is not silently swallowed.
    """
    ssl_context = ssl.create_default_context()
    if not verify_tls:
        # Insecure opt-out: disables protection against man-in-the-middle attacks.
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    API_VERSION = "2.0"

    # Configure the OAuth2 access token and request the authorized feed URL.
    configuration = upstox_client.Configuration()
    ACCESS_TOKEN = db.execute("SELECT ACCESS_TOKEN FROM credentials WHERE CLIENT_ID = 1")[0]["ACCESS_TOKEN"]
    configuration.access_token = ACCESS_TOKEN
    response = get_market_data_feed_authorize(API_VERSION, configuration)

    async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
        print('Connection established')
        await asyncio.sleep(1)  # Wait for 1 second
        await subscribe_to_instruments(websocket, subscribe_instruments)

        controller = None
        if on_connect is not None:
            controller = asyncio.create_task(on_connect(websocket))
            controller.add_done_callback(_report_callback_failure)

        try:
            # Continuously receive, decode and print feed messages.
            while True:
                message = await websocket.recv()
                data_dict = MessageToDict(decode_protobuf(message))
                print(json.dumps(data_dict))
                # Example from the original snippet (payload shape not verified here):
                # data_dict['feeds']['NSE_INDEX|Nifty 50']['ltpc']['ltp']
        except asyncio.CancelledError:
            print("WebSocket connection is being gracefully shutdown.")
            # Leaving the `async with` block closes the socket; propagate the
            # cancellation so callers do not continue as if the feed had ended.
            raise
        finally:
            if controller is not None:
                controller.cancel()


async def subscribe_to_instruments(websocket, instruments, mode='ltpc', guid="someguid"):
    """Send a subscribe request for a list of instruments.

    Args:
        websocket: Open connection exposing an awaitable ``send``.
        instruments: Instrument keys to subscribe to. A single string is
            treated as a one-element list.
        mode: Feed mode placed in the request (default ``'ltpc'``).
        guid: Request identifier placed in the request.
    """
    if isinstance(instruments, str):
        instruments = [instruments]

    request = {
        "guid": guid,
        "method": "sub",
        "data": {
            "mode": mode,
            "instrumentKeys": list(instruments),
        },
    }
    # The feed expects the JSON request as a binary frame.
    await websocket.send(json.dumps(request).encode('utf-8'))


async def update_subscription(websocket, new_instruments, old_instruments=None, settle_delay=1):
    """Replace the current subscription with a new list of instruments.

    Args:
        websocket: Open connection exposing an awaitable ``send``.
        new_instruments: Instrument keys to subscribe to.
        old_instruments: Instrument keys to unsubscribe from. When omitted, the
            original bare ``{"method": "unsub"}`` message is sent unchanged.
        settle_delay: Seconds to wait between the unsubscribe and subscribe
            requests.
    """
    if old_instruments is None:
        unsubscribe_request = {"method": "unsub"}
    else:
        if isinstance(old_instruments, str):
            old_instruments = [old_instruments]
        # Mirrors the guid/method/data layout of the subscribe request.
        # NOTE(review): confirm the exact unsub schema against the Upstox feed docs.
        unsubscribe_request = {
            "guid": "someguid",
            "method": "unsub",
            "data": {"instrumentKeys": list(old_instruments)},
        }

    await websocket.send(json.dumps(unsubscribe_request).encode('utf-8'))
    await asyncio.sleep(settle_delay)  # Wait for the unsubscription to take effect
    await subscribe_to_instruments(websocket, new_instruments)


async def main():
    """Subscribe to Nifty 50, then switch to Nifty Bank on the same connection."""
    subscribe_instruments = ['NSE_INDEX|Nifty 50']
    new_subscribe_instruments = ['NSE_INDEX|Nifty Bank']
    switch_delay_seconds = 10  # Demo value: how long to stream before switching.

    async def switch_instruments(websocket):
        # Runs concurrently with the receive loop inside fetch_market_data,
        # which owns the only live reference to the connection.
        await asyncio.sleep(switch_delay_seconds)
        await update_subscription(
            websocket,
            new_subscribe_instruments,
            old_instruments=subscribe_instruments,
        )

    await fetch_market_data(subscribe_instruments, on_connect=switch_instruments)
if __name__ == "__main__":
    # Script entry point: run the async main() coroutine on a fresh event loop.
    asyncio.run(main())
Thank you for reaching out to us.
Currently, our repository lacks an example of dynamic or on-demand subscription, where a user can initially subscribe to one instrument key and subsequently add more as needed. I'll take this opportunity to develop and share sample code illustrating this functionality.
Thanks!
Thanks @shanmu. I'll be waiting for your code.
Here you go: Upstox API - Market Feeder - Sample for Dynamic Subscription · GitHub.
Hope this example clearly showcases how dynamic subscriptions can be handled. This is just one way to do it, so feel free to adjust it to your needs or create a fresh approach as you see fit.
Happy coding, and let me know if it works out for you!
This code works very well. Thanks @shanmu. I now have a better understanding of how async/await and WebSockets work. Thanks a lot for the fast reply.
Thanks for the code snippet @shanmu. It's working smoothly. I have one quick question: how can we subscribe or unsubscribe tokens from our main program? This file runs forever unless we close it, since it all runs in async/await.
I am trying to subscribe or unsubscribe on the fly but could not access the methods at runtime. Can you please help me here?
@Vasu_Devan - since you have understood and implemented something, can you please let me know how you handled subscribing and unsubscribing tokens while running the algos?
@sajith_raj I don't know why you want to change the token. Maybe you have two Upstox accounts. Assuming you have two accounts, you can request a token for each and save it in a table or variable, then use it as you need.
Thank you @shanmu & @Vasu_Devan for sharing the code here.
It's working fine; just one doubt: how can we subscribe or unsubscribe instruments from the main program, as it's talking only with the first subscribed instruments?
Could you please help me with this? I tried multiple ways, but none worked for me.