I want to dynamically update the subscription to websocket. Review the code give your solution

# Import necessary modules
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
from pprint import pprint
import MarketDataFeed_pb2 as pb
from cs50 import SQL


# Configure CS50 Library to use SQLite database
db = SQL("sqlite:///finance.db")

def get_market_data_feed_authorize(api_version, configuration):
    """Get authorization for market data feed."""
    api_instance = upstox_client.WebsocketApi(
        upstox_client.ApiClient(configuration))
    api_response = api_instance.get_market_data_feed_authorize(api_version)
    return api_response


def decode_protobuf(buffer):
    """Decode protobuf message."""
    feed_response = pb.FeedResponse()
    feed_response.ParseFromString(buffer)
    return feed_response


async def fetch_market_data(subscribe_instruments):
    """Fetch market data using WebSocket and print it."""

    # Create default SSL context
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Configure OAuth2 access token for authorization

    API_VERSION = "2.0"
    # Get market data feed authorization
    configuration = upstox_client.Configuration()
    ACCESS_TOKEN = db.execute("SELECT ACCESS_TOKEN FROM credentials WHERE CLIENT_ID = 1")[0]["ACCESS_TOKEN"]
    configuration.access_token = ACCESS_TOKEN

    response = get_market_data_feed_authorize(API_VERSION, configuration)

    # Connect to the WebSocket with SSL context
    async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
        print('Connection established')

        await asyncio.sleep(1)  # Wait for 1 second

        await subscribe_to_instruments(websocket, subscribe_instruments)

        # Continuously receive and decode data from WebSocket
        try:
            while True:
                message = await websocket.recv()
                decoded_data = decode_protobuf(message)

                # Convert the decoded data to a dictionary
                data_dict = MessageToDict(decoded_data)

                # Print the dictionary representation
                print(json.dumps(data_dict))
                #ltp_value = data_dict['feeds']['NSE_INDEX|Nifty 50']['ltpc']['ltp']
                #print(ltp_value)
        except asyncio.CancelledError:
            # Perform cleanup or termination tasks here
            print("WebSocket connection is being gracefully shutdown.")
            await websocket.close()

async def subscribe_to_instruments(websocket, instruments):
    """Subscribe to a list of instruments."""
    data = {
        "guid": "someguid",
        "method": "sub",
        "data": {
            "mode": 'ltpc',
            "instrumentKeys": instruments
        }
    }

    # Convert data to binary and send over WebSocket
    binary_data = json.dumps(data).encode('utf-8')
    await websocket.send(binary_data)

async def update_subscription(websocket, new_instruments):
    """Update the subscription to a new list of instruments."""
    await websocket.send(json.dumps({"method": "unsub"}).encode('utf-8'))
    await asyncio.sleep(1)  # Wait for the unsubscription to take effect
    await subscribe_to_instruments(websocket, new_instruments)

async def main():
    subscribe_instruments = ['NSE_INDEX|Nifty 50']
    await fetch_market_data(subscribe_instruments)

    # # # Dynamically update subscription to new instruments
    new_subscribe_instruments = ['NSE_INDEX|Nifty Bank']
    await update_subscription(websocket, new_subscribe_instruments)

if __name__ == "__main__":
    # Execute the function to fetch market data
    asyncio.run(main())


@Vasu_Devan

Thank you for reaching out to us.

Currently, our repository lacks an example of dynamic or on-demand subscription, where a user can initially subscribe to one instrument key and subsequently add more as needed. I’ll take this opportunity to develop and share a sample code illustrating this functionality.

Thanks!

1 Like

Thanks @shanmu . I’ll be waiting for your code :grinning:

@Vasu_Devan

Here you go: Upstox API - Market Feeder - Sample for Dynamic Subscription · GitHub.

Hope this example clearly showcases how dynamic subscriptions can be handled. This is just one way to do it, so feel free to adjust it to your needs or create a fresh approach as you see fit.

Happy coding, and let me know if it works out for you!

This code work very fine. Thanks @shanmu . I got better understanding how async-await, websocket works. Thanks a lot for fast reply.

Thanks for the code snippet @shanmu . Its working smooth . I have one quick question , How can we subscribe or un subscribe the tokens from the our main program. This file itself run forever unless we close since its all runs in async await.

I am trying to sub or unsub on flight but could not access the methods at runtime. Can you please help me here.

@Vasu_Devan - since you have understood and implemented something . Can you please let me know how you have handled the sub or unsub the tokens while running the algos.

@sajith_raj I dont know why you want change token. maybe you have two upstox account. assuming you have two account, you request token and save it a table or variable. then use it as you need