I want to dynamically update the subscription on the WebSocket. Please review the code and suggest a solution.

# Import necessary modules
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
from pprint import pprint
import MarketDataFeed_pb2 as pb
from cs50 import SQL


# Configure CS50 Library to use SQLite database.
# This module-level handle points at finance.db; fetch_market_data() queries
# the credentials table through it to read the stored access token.
db = SQL("sqlite:///finance.db")

def get_market_data_feed_authorize(api_version, configuration):
    """Request authorization for the market data feed.

    Builds an ``ApiClient`` from ``configuration``, wraps it in a
    ``WebsocketApi`` and returns, unchanged, whatever that API's
    ``get_market_data_feed_authorize(api_version)`` call responds with.
    """
    client = upstox_client.ApiClient(configuration)
    websocket_api = upstox_client.WebsocketApi(client)
    return websocket_api.get_market_data_feed_authorize(api_version)


def decode_protobuf(buffer):
    """Parse the raw bytes in ``buffer`` into a ``FeedResponse`` message.

    Returns the populated ``pb.FeedResponse`` instance.
    """
    parsed = pb.FeedResponse()
    parsed.ParseFromString(buffer)
    return parsed


async def fetch_market_data(subscribe_instruments, subscription_updates=None):
    """Stream market data over the WebSocket feed and print every update.

    Args:
        subscribe_instruments: Instrument keys to subscribe to right after
            the connection is established.
        subscription_updates: Optional ``asyncio.Queue``. Each item put on
            the queue is a list of instrument keys that replaces the current
            subscription while the feed keeps running. When omitted, the
            subscription stays fixed for the lifetime of the connection.

    Raises:
        asyncio.CancelledError: Re-raised after the socket has been closed,
            so that cancellation and timeouts keep working for the caller.
    """
    # NOTE(review): hostname and certificate verification are switched off
    # here, which leaves the connection open to interception. Left as-is to
    # preserve behaviour; consider enabling verification.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Configure the OAuth2 access token (stored in the credentials table)
    # and request authorization for the market data feed.
    api_version = "2.0"
    configuration = upstox_client.Configuration()
    access_token = db.execute("SELECT ACCESS_TOKEN FROM credentials WHERE CLIENT_ID = 1")[0]["ACCESS_TOKEN"]
    configuration.access_token = access_token

    response = get_market_data_feed_authorize(api_version, configuration)

    # Connect to the WebSocket with the SSL context
    async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
        print('Connection established')

        await asyncio.sleep(1)  # Wait for 1 second

        await subscribe_to_instruments(websocket, subscribe_instruments)

        # The receive loop below never returns on its own, so subscription
        # changes are applied by a separate task sharing the same socket.
        updater = None
        if subscription_updates is not None:
            updater = asyncio.create_task(
                _apply_subscription_updates(
                    websocket, subscription_updates, subscribe_instruments))

        # Continuously receive and decode data from WebSocket
        try:
            while True:
                message = await websocket.recv()
                decoded_data = decode_protobuf(message)

                # Convert the decoded data to a dictionary and print it.
                # A single value can be picked out of it, e.g.
                # data_dict['feeds']['NSE_INDEX|Nifty 50']['ltpc']['ltp']
                data_dict = MessageToDict(decoded_data)
                print(json.dumps(data_dict))
        except asyncio.CancelledError:
            print("WebSocket connection is being gracefully shutdown.")
            await websocket.close()
            # Swallowing the cancellation would make the task look like it
            # finished normally; propagate it to the caller instead.
            raise
        finally:
            if updater is not None:
                updater.cancel()
                # Wait for the updater to finish so it is not left pending.
                await asyncio.gather(updater, return_exceptions=True)


async def _apply_subscription_updates(websocket, subscription_updates, current_instruments):
    """Apply each instrument list taken from the queue to the open socket."""
    current = _as_key_list(current_instruments)
    while True:
        requested = _as_key_list(await subscription_updates.get())
        try:
            await update_subscription(websocket, requested, current)
        except asyncio.CancelledError:
            raise
        except Exception as error:
            # Best effort: report the failed update and keep serving later
            # ones instead of silently killing the updater task.
            print(f"Subscription update failed: {error!r}")
        else:
            current = requested


def _as_key_list(instruments):
    """Return instrument keys as a list, accepting a single key string too."""
    if isinstance(instruments, str):
        return [instruments]
    return list(instruments)


def _build_feed_request(method, instruments, mode=None):
    """Encode a feed request (``sub``/``unsub``) as UTF-8 JSON bytes."""
    payload = {}
    if mode is not None:
        payload["mode"] = mode
    payload["instrumentKeys"] = _as_key_list(instruments)
    request = {
        "guid": "someguid",
        "method": method,
        "data": payload,
    }
    return json.dumps(request).encode('utf-8')


async def subscribe_to_instruments(websocket, instruments, mode='ltpc'):
    """Subscribe to a list of instruments.

    Args:
        websocket: Open connection; only its ``send`` coroutine is used.
        instruments: Instrument keys (a list, or a single key string).
        mode: Feed mode placed in the request; defaults to ``'ltpc'``.
    """
    # Convert data to binary and send over WebSocket
    await websocket.send(_build_feed_request("sub", instruments, mode))


async def update_subscription(websocket, new_instruments, old_instruments=None, settle_delay=1):
    """Update the subscription to a new list of instruments.

    Args:
        websocket: Open connection; only its ``send`` coroutine is used.
        new_instruments: Instrument keys that should be subscribed afterwards.
        old_instruments: Instrument keys currently subscribed. Keys absent
            from ``new_instruments`` are unsubscribed. When omitted, nothing
            is unsubscribed, because an ``unsub`` request has to name the
            keys it drops.
        settle_delay: Seconds to wait after an ``unsub`` request before
            subscribing to the new keys.
    """
    new_keys = _as_key_list(new_instruments)
    old_keys = [] if old_instruments is None else _as_key_list(old_instruments)

    # Only send the difference so instruments present in both lists keep
    # streaming without interruption.
    stale_keys = [key for key in old_keys if key not in new_keys]
    added_keys = [key for key in new_keys if key not in old_keys]

    if stale_keys:
        # NOTE(review): the unsub request is sent without a "mode" field;
        # confirm against the feed documentation.
        await websocket.send(_build_feed_request("unsub", stale_keys))
        await asyncio.sleep(settle_delay)  # Wait for the unsubscription to take effect
    if added_keys:
        await subscribe_to_instruments(websocket, added_keys)


async def main():
    """Stream one instrument, then switch the subscription while streaming.

    The feed runs as a background task; the new instrument list is handed to
    it through a queue, because the receive loop inside the feed never
    returns while the connection is open.
    """
    subscribe_instruments = ['NSE_INDEX|Nifty 50']
    new_subscribe_instruments = ['NSE_INDEX|Nifty Bank']
    switch_after_seconds = 10  # demo delay before changing the subscription

    subscription_updates = asyncio.Queue()
    feed_task = asyncio.create_task(
        fetch_market_data(subscribe_instruments, subscription_updates))

    # Dynamically update subscription to new instruments, unless the feed
    # already stopped (e.g. authorization failed) during the delay.
    finished, _ = await asyncio.wait({feed_task}, timeout=switch_after_seconds)
    if not finished:
        await subscription_updates.put(new_subscribe_instruments)

    # Keep running until the feed ends; this also surfaces its exception.
    await feed_task

if __name__ == "__main__":
    # Only when run as a script: start an event loop and run main(), which
    # fetches the market data.
    asyncio.run(main())


@Vasu_Devan

Thank you for reaching out to us.

Currently, our repository lacks an example of dynamic or on-demand subscription, where a user can initially subscribe to one instrument key and subsequently add more as needed. I’ll take this opportunity to develop and share a sample code illustrating this functionality.

Thanks!

1 Like

Thanks @shanmu . I’ll be waiting for your code :grinning:

@Vasu_Devan

Here you go: Upstox API - Market Feeder - Sample for Dynamic Subscription · GitHub.

Hope this example clearly showcases how dynamic subscriptions can be handled. This is just one way to do it, so feel free to adjust it to your needs or create a fresh approach as you see fit.

Happy coding, and let me know if it works out for you!

This code works very well. Thanks @shanmu. I now have a better understanding of how async/await and WebSockets work. Thanks a lot for the fast reply.

1 Like

Thanks for the code snippet, @shanmu. It's working smoothly. I have one quick question: how can we subscribe or unsubscribe tokens from our main program? This file itself runs forever unless we close it, since everything runs inside async/await.

I am trying to subscribe or unsubscribe on the fly but cannot access the methods at runtime. Can you please help me here?

@Vasu_Devan - since you have understood and implemented something, can you please let me know how you handled subscribing or unsubscribing tokens while running the algos?

@sajith_raj I don't know why you want to change the token. Maybe you have two Upstox accounts. Assuming you have two accounts, you request a token and save it in a table or variable, then use it as you need.

Thank you @shanmu & @Vasu_Devan for sharing the code here.
It's working fine; I have just one doubt: how can we subscribe or unsubscribe instruments from the main program, as it's talking only with the first subscribed instruments?
Could you please help me with this? I tried multiple ways, but none worked for me.