MarketDataStreamer functionality

Hello there

I am trying to get familiar with the MarketDataStreamer functionality and I need a conceptual clarity around the flow of code execution… Please have a look at the code pulled from the official documentation below:

‘’’
import upstox_client
import time

def main():
** configuration = upstox_client.Configuration()**
** access_token = <ACCESS_TOKEN>**
** configuration.access_token = access_token**

** streamer = upstox_client.MarketDataStreamer(**
** upstox_client.ApiClient(configuration))**

** def on_open():**
** print(“Connected. Subscribing to instrument keys.”)**
** streamer.subscribe(**
** [“NSE_EQ|INE020B01018”, “NSE_EQ|INE467B01029”], “full”)**

** # Handle incoming market data messages**
** def on_message(message):**
** print(message)**

** streamer.on(“open”, on_open)**
** streamer.on(“message”, on_message)**

** streamer.connect()**

** time.sleep(5)**
** print(“Unsubscribing from instrument keys.”)**
** 3. 3. streamer.unsubscribe([“NSE_EQ|INE020B01018”, “NSE_EQ|INE467B01029”])**

if name == “main”:
** main()**
‘’’

My understanding about the order of code execution is as follows:

  1. Streamer makes a websocket connection using the set configuration
  2. The connection is established
  3. On_open function is executed and the streamer subscribes to [“NSE_EQ|INE020B01018”, “NSE_EQ|INE467B01029”] in ‘full’ mode
  4. Message Iteration 1 is received from the connection
  5. On_message function is executed
  6. Message Iteration 2 is received from the connection
  7. On_message function is executed
  8. …. And so on….

My question is: where do the following codes fit in the program in the above execution paradigm:

  1. time.sleep(5)
  2. print(“Unsubscribing from instrument keys.”)
  3. streamer.unsubscribe([“NSE_EQ|INE020B01018”, “NSE_EQ|INE467B01029”])

Thanks
Ankush

Hello @Ankush_Agarwal,

You have understood the flow correctly. Regarding your questions, all the WebSocket functions provided by our SDK are asynchronous and event-based. This means the on_message function listens for message events asynchronously. When it receives a message, it will execute the code you have written inside the function.

Therefore, after 5 seconds of opening the WebSocket connection, the unsubscribe function will be called.

I hope this answers your questions. If you have any doubts, please feel free to ask.

Thanks.

@Ketan thanks for this…

One question: I assume that the listening frequency of the streamer would be continuous… Is there a way I discretize the listening frequency of the streamer? For instance, the streamer should receive messages every 5 seconds instead of continuous…

The reason why I am asking is that the database where I am hardcoding the messages will have some latency…

Thanks
Ankush