Can't use two instrument at once in websocket

import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
import MarketDataFeed_pb2 as pb
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import os
import sys
from timing_holiday import Market_open_close
import pytz
import time

# External files
from get_fund import get_fund, get_fund_data
from buy_sell import buy, sell, get_order_details, modify_order, cancel_order

def access_token():
    """this is the access token which is used for getting the tokens.
        str: provides the access_token
    # Load the access token from the JSON file
    with open('jsons/access_token.json', 'r') as file:
        data = json.load(file)
        access_token = data['access_token']
    return access_token

def get_market_data_feed_authorize(api_version, configuration):
    """this will get the Market data feed by authorizing.

        api_version (str): provides the version of the api which is v2
        configuration (str): here it provides the access token

        dict: provides the json response.
    api_instance = upstox_client.WebsocketApi(upstox_client.ApiClient(configuration))
    api_response = api_instance.get_market_data_feed_authorize(api_version)
    return api_response

def decode_protobuf(buffer):
    """decodes the protobuf from the upstox"""
    feed_response = pb.FeedResponse()
    return feed_response

def convert_timestamp(ts):
    """converts the timestamp seconds to strftime.

        ts (int): provide the timestamp

        str: get str version of timestamp and hours
    timestamp_seconds = int(ts) / 1000
    dt_object = datetime.fromtimestamp(timestamp_seconds)
    return dt_object.strftime("%Y-%m-%d %H:%M:%S")

def extract_ohlc_data(data_dict):
        feeds = data_dict['feeds']
        instrument_key = next(iter(feeds))
        ohlc_data = feeds[instrument_key]['ff']['marketFF']['marketOHLC']['ohlc']
        one_minute_data = next((item for item in ohlc_data if item['interval'] == 'I1'), None)
        if one_minute_data:
            one_minute_data['datetime'] = convert_timestamp(one_minute_data['ts'])
            return {
                'instrument_key': instrument_key,
                'one_minute_ohlc': one_minute_data
    except (KeyError, StopIteration):
    return None

class OHLCDataCollector:
    def __init__(self): = []

    def add_data(self, ohlc_data):['one_minute_ohlc'])

    def get_dataframe(self):
        df = pd.DataFrame(
        df['datetime'] = pd.to_datetime(df['datetime'])
        df.set_index('datetime', inplace=True)
        for col in ['open', 'high', 'low', 'close']:
            df[col] = pd.to_numeric(df[col])
        return df
def convert_to_timeframe(df, timeframe):
    if timeframe == '1m':
        return df  # No conversion needed for 1-minute data
    elif timeframe == '5m':
        rule = '5min'
    elif timeframe == '30m':
        rule = '30min'
    elif timeframe == '15m':
        rule = '15min'
    elif timeframe == '1h':
        rule = 'h'
        raise ValueError(f"Unsupported timeframe: {timeframe}")

    resampled = df.resample(rule).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last'
    return resampled.dropna()

def round_time(dt, timeframe):
    if timeframe == '1m':
        return dt.replace(second=0, microsecond=0)
    elif timeframe == '5m':
        return dt.replace(minute=dt.minute // 5 * 5, second=0, microsecond=0)
    elif timeframe == '15m':
        return dt.replace(minute=dt.minute // 15 * 15, second=0, microsecond=0)
    elif timeframe == '30m':
        return dt.replace(minute=(dt.minute // 30) * 30, second=0, microsecond=0)
    elif timeframe == '1h':
        return dt.replace(minute=0, second=0, microsecond=0)
        raise ValueError(f"Unsupported timeframe: {timeframe}")

def calculate_heikin_ashi(df):
    ha_close = (df['open'] + df['high'] + df['low'] + df['close']) / 4
    ha_open = np.zeros(len(df))
    ha_open[0] = df['open'].iloc[0]
    for i in range(1, len(df)):
        ha_open[i] = (ha_open[i-1] + ha_close.iloc[i-1]) / 2
    ha_high = np.maximum(df['high'], np.maximum(ha_open, ha_close))
    ha_low = np.minimum(df['low'], np.minimum(ha_open, ha_close))

    ha_open_rounded = np.round(ha_open, 2)
    ha_close_rounded = np.round(ha_close, 2)

    ha_df = pd.DataFrame({
        'HA_Open': ha_open_rounded,
        'HA_High': ha_high,
        'HA_Low': ha_low,
        'HA_Close': ha_close_rounded
    }, index=df.index)
    return ha_df

def format_output_row(datetime_str, ha_open, ha_high, ha_low, ha_close):
    return f"{datetime_str:<25} {ha_open:>8.2f} {ha_high:>8.2f} {ha_low:>8.2f} {ha_close:>8.2f}"

class Colors:
    RESET = "\033[0m"
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"  # Add this line to define YELLOW color

def supports_color():
    Returns True if the running system's terminal supports color,
    and False otherwise.
    plat = sys.platform
    supported_platform = plat != 'Pocket PC' and (plat != 'win32' or 'ANSICON' in os.environ)
    is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
    return supported_platform and is_a_tty

def get_saved_order_id(stock_number, order_type):
    filename = f'orders/order_id_{stock_number}_{order_type}.json'
    if os.path.exists(filename):
        with open(filename, 'r') as file:
            order_data = json.load(file)
            return order_data.get('order_id')
    return None

class TradingStrategy:
    def __init__(self, balance, buy_offset, stop_loss_offset, instrument_key, quantity):
        self.balance = balance
        self.position = None
        self.buy_price = 0
        self.stop_loss = 0
        self.buy_offset = buy_offset
        self.stop_loss_offset = stop_loss_offset
        self.use_color = supports_color()
        self.instrument_key = instrument_key
        self.access_token = access_token()
        self.buy_order_id = None
        self.stop_loss_order_id = None
        self.quantity = quantity

    def get_stock_number(self):
        return self.instrument_key.split('|')[-1]

    def check_order_status(self, order_id):
        order_details = get_order_details(self.access_token, order_id)
        if order_details['status'] == 'success':
            return order_details['data']['status']
        return 'unknown'

    def place_stop_loss_order(self, trigger_price):
        stop_loss_price = round(max(trigger_price - self.stop_loss_offset, 0.1), 1)
        trigger_price = round(trigger_price, 1)
        self.stop_loss_order_id = sell(self.access_token, self.instrument_key, self.quantity, trigger_price, self.stop_loss_offset, self.get_stock_number())

        if self.stop_loss_order_id:
            self.stop_loss = stop_loss_price
            self.log_action(f"Stop-loss set at trigger: {trigger_price}, price: {stop_loss_price}", None, Colors.YELLOW if self.use_color else None)
            self.log_action("Failed to place stop-loss order", None, Colors.RED if self.use_color else None)
    async def process_candle(self, current_candle, previous_candle):
        # Check stop-loss order status if a position is open
        if self.position == "bought" and self.stop_loss_order_id:
            stop_loss_status = self.check_order_status(self.stop_loss_order_id)
            self.log_action(f"Stop-loss order status: {stop_loss_status}", None, Colors.YELLOW if self.use_color else None)

            if stop_loss_status == 'complete':
                sell_price = self.stop_loss
                profit = sell_price - self.buy_price
                self.balance += profit
                self.log_action("Stop-loss triggered", sell_price, Colors.RED if self.use_color else None)
                self.log_action(f"Trade closed. Profit: {profit:.2f}", None, Colors.RED if self.use_color else None)
                self.position = None
                self.buy_order_id = None
                self.stop_loss_order_id = None
            elif stop_loss_status == 'rejected':
                self.log_action("Stop-loss order rejected. Placing a new stop-loss order.", None, Colors.YELLOW if self.use_color else None)

        # Modify stop-loss order if a position is open, the stop loss order is not complete, and conditions are met
        if self.position == "bought" and self.stop_loss_order_id and stop_loss_status not in ['complete', 'rejected']:
            new_stop_loss_trigger = round(current_candle['HA_Low'], 1)
            new_stop_loss_price = round(new_stop_loss_trigger - self.stop_loss_offset, 1)
            # Use SL order type
            modified_order_id = modify_order(self.access_token, self.stop_loss_order_id, self.quantity, price=new_stop_loss_price, trigger_price=new_stop_loss_trigger)
            if modified_order_id:
                self.stop_loss = new_stop_loss_price
                self.log_action(f"Stop-loss modified to trigger: {new_stop_loss_trigger}, price: {new_stop_loss_price}", None, Colors.YELLOW if self.use_color else None)
                self.log_action("Failed to modify stop-loss order", None, Colors.RED if self.use_color else None)

        if self.position is None and self.buy_order_id:
            max_retries = 3
            retry_delay = 120  # seconds
            for attempt in range(max_retries):
                    buy_status = self.check_order_status(self.buy_order_id)
                    if buy_status == 'complete':
                        self.position = "bought"
                        self.buy_price = round(previous_candle['HA_High'] + self.buy_offset, 1)
                        self.log_action("Buy order completed", self.buy_price, Colors.GREEN if self.use_color else None)

                        # Place stop-loss order
                        break  # Exit the retry loop if buy order is complete
                    elif buy_status in ['cancelled', 'rejected']:
                        self.log_action(f"Buy order {buy_status}", None, Colors.RED if self.use_color else None)
                        self.buy_order_id = None
                        break  # Exit the retry loop if the order is cancelled or rejected
                    elif buy_status == 'trigger pending':
                        self.log_action(f"Buy order still trigger pending", None, Colors.YELLOW if self.use_color else None)
                        if attempt == max_retries - 1:  # If it's the last attempt
                            self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW if self.use_color else None)
                            cancel_result = cancel_order(self.access_token, self.buy_order_id)
                            if cancel_result.get('status') == 'success':
                                self.log_action("Buy order cancelled successfully", None, Colors.YELLOW if self.use_color else None)
                                self.buy_order_id = None
                                self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED if self.use_color else None)
                            self.log_action(f"Retrying buy order check in {retry_delay} seconds...", None, Colors.YELLOW if self.use_color else None)
                            await asyncio.sleep(retry_delay)
                        self.log_action(f"Buy order status: {buy_status}", None, Colors.YELLOW if self.use_color else None)
                        if attempt < max_retries - 1:  # If not the last attempt
                            self.log_action(f"Retrying buy order check in {retry_delay} seconds...", None, Colors.YELLOW if self.use_color else None)
                            await asyncio.sleep(retry_delay)
                            self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW if self.use_color else None)
                            cancel_result = cancel_order(self.access_token, self.buy_order_id)
                            if cancel_result.get('status') == 'success':
                                self.log_action("Buy order cancelled successfully", None, Colors.YELLOW if self.use_color else None)
                                self.buy_order_id = None
                                self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED if self.use_color else None)
                except Exception as e:
                    self.log_action(f"Error checking buy order status: {str(e)}", None, Colors.RED if self.use_color else None)
                    if attempt < max_retries - 1:
                        self.log_action(f"Retrying in {retry_delay} seconds...", None, Colors.YELLOW if self.use_color else None)
                        await asyncio.sleep(retry_delay)
                        self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW if self.use_color else None)
                        cancel_result = cancel_order(self.access_token, self.buy_order_id)
                        if cancel_result.get('status') == 'success':
                            self.log_action("Buy order cancelled successfully", None, Colors.YELLOW if self.use_color else None)
                            self.buy_order_id = None
                            self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED if self.use_color else None)

        elif self.position is None and current_candle['HA_High'] > previous_candle['HA_High'] and not self.buy_order_id:
            # Place a new buy order only if there's no existing buy order
            buy_price = round(previous_candle['HA_High'] + self.buy_offset, 1)
            self.buy_order_id = buy(self.access_token, self.instrument_key, 1, previous_candle['HA_High'], self.buy_offset, self.get_stock_number())
            if self.buy_order_id:
                self.log_action(f"New buy order placed with ID: {self.buy_order_id}", buy_price, Colors.GREEN if self.use_color else None)
                self.log_action("Failed to place new buy order", None, Colors.RED if self.use_color else None)

    def log_action(self, message, price=None, color=None):
        timestamp ="%Y-%m-%d %H:%M:%S")
        if price is not None:
            log_message = f"[{timestamp}] {self.instrument_key}: {message} at {price:.2f}, Balance: {self.balance:.2f}"
            log_message = f"[{timestamp}] {self.instrument_key}: {message}"
        if color and self.use_color:

async def run_trading_strategy_for_instrument(start_time, end_time, instrument_key, strategy, buy_offset, stop_loss_offset, timeframe='15m'):
    while True:
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            configuration = upstox_client.Configuration()
            api_version = '2.0'
            configuration.access_token = access_token()

            response = get_market_data_feed_authorize(api_version, configuration)
            ohlc_collector = OHLCDataCollector()
            first_print = True
            previous_candle = None

            async with websockets.connect(, ssl=ssl_context) as websocket:
                print(f'Connection established for {instrument_key} with timeframe {timeframe}')

                await asyncio.sleep(1)

                data = {
                    "guid": "someguid",
                    "method": "sub",
                    "data": {
                        "mode": "full",
                        "instrumentKeys": [instrument_key]

                binary_data = json.dumps(data).encode('utf-8')
                await websocket.send(binary_data)

                current_time =
                next_process_time = get_next_interval(current_time, timeframe)
                while current_time <= end_time:
                    if current_time < start_time:
                        await asyncio.sleep((start_time - current_time).total_seconds())
                        current_time =

                    if current_time >= next_process_time:
                        df = ohlc_collector.get_dataframe()
                        if not df.empty:
                            df = convert_to_timeframe(df, timeframe)
                            ha_df = calculate_heikin_ashi(df)
                            current_candle = ha_df.iloc[-1]

                            if first_print:
                                print(f"{instrument_key} ({timeframe}): {'datetime':<25} {'HA_Open':>8} {'HA_High':>8} {'HA_Low':>8} {'HA_Close':>8}")
                                first_print = False

                            print(f"{instrument_key} ({timeframe}): {format_output_row(next_process_time.strftime('%Y-%m-%d %H:%M:%S'), current_candle['HA_Open'], current_candle['HA_High'], current_candle['HA_Low'], current_candle['HA_Close'])}")

                            if previous_candle is not None:
                                await strategy.process_candle(current_candle, previous_candle)

                            previous_candle = current_candle

                        next_process_time = get_next_interval(next_process_time, timeframe)
                        ohlc_collector = OHLCDataCollector()  # Reset collector for the next interval
                    # Process incoming data
                        message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
                        decoded_data = decode_protobuf(message)
                        data_dict = MessageToDict(decoded_data)
                        ohlc_data = extract_ohlc_data(data_dict)
                        if ohlc_data:
                            for key in ['open', 'high', 'low', 'close']:
                                ohlc_data['one_minute_ohlc'][key] = round(float(ohlc_data['one_minute_ohlc'][key]), 2)
                    except asyncio.TimeoutError:
                        pass  # No data received, continue to next iteration

                    await asyncio.sleep(0.1)  # Small delay to prevent CPU overuse
                    current_time =

                print(f"End time reached for {instrument_key}. Stopping the script.")
                break  # Exit the while True loop if we've reached the end time
        except websockets.ConnectionClosedError:
            print(f"WebSocket connection closed for {instrument_key}. Attempting to reconnect...")
            await asyncio.sleep(5)  # Wait for 5 seconds before attempting to reconnect
        except Exception as e:
            print(f"An error occurred for {instrument_key}: {str(e)}. Attempting to reconnect...")
            await asyncio.sleep(5)  # Wait for 5 seconds before attempting to reconnect

def get_next_interval(current_time, interval):
    if interval == '1m':
        return current_time.replace(second=0, microsecond=0) + timedelta(minutes=1)
    elif interval == '5m':
        return current_time.replace(minute=current_time.minute - (current_time.minute % 5), second=0, microsecond=0) + timedelta(minutes=5)
    elif interval == '15m':
        return current_time.replace(minute=current_time.minute - (current_time.minute % 15), second=0, microsecond=0) + timedelta(minutes=15)
    elif interval == '30m':
        return current_time.replace(minute=current_time.minute - (current_time.minute % 30), second=0, microsecond=0) + timedelta(minutes=30)
    elif interval == '1h':
        return current_time.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        raise ValueError(f"Unsupported interval: {interval}")

def get_json_value(key, value):
        with open('jsons/key.json', 'r') as file:
            data = json.load(file)
        # Get the content of key_1 or key_2
        content = data.get(key, {})
        # Get the specific value from the content
        result = content.get(value)
        return result
    except FileNotFoundError:
        print("Error: File 'key.json' not found.")
        return None
    except json.JSONDecodeError:
        print("Error: Invalid JSON format in 'key.json'.")
        return None

# Load Market_status.json
with open('jsons/Market_status.json', 'r') as f:
    market_status = json.load(f)

# load setting.json
setting = {}
if os.path.exists('jsons/setting.json'):
    with open('jsons/setting.json', 'r') as f:
        setting = json.load(f)

# Load key.json
with open('jsons/key.json', 'r') as f:
    key_data = json.load(f)

# Function to get market times for a given exchange name
def get_market_times(exchange_name):
    for data in market_status["data"]:
        if data["exchange"] == exchange_name:
            return data["start_time"], data["end_time"]
    return None, None

# Function to get values from key.json
def get_json_value(key, field):
    return key_data.get(key, {}).get(field, None)

async def live_data():
    token = access_token()
    available_margin = get_fund_data("commodity", "available_margin")
    ist = pytz.timezone('Asia/Kolkata')
    current_date ="%Y-%m-%d")
    market_status_response = Market_open_close(current_date)
    shared_balance = available_margin

    if market_status_response["status"] == "success" and "Market is open" in market_status_response["details"]:
        print("Market is open. Running the script...")
        # Load settings
        with open('jsons/setting.json', 'r') as f:
            setting = json.load(f)

        # Check if stock_name_2 exists in settings
        has_second_stock = 'stock_name_2' in setting and setting['stock_name_2']

        # Get start and end times for key_1
        start_time_1, end_time_1 = get_market_times(get_json_value('key_1', 'exchange'))

        # Define the instrument configs
        instrument_configs = [
                "key": get_json_value('key_1', 'instrument_key'),
                "start_time": datetime.strptime(start_time_1, "%Y-%m-%d %H:%M:%S"),
                "end_time": datetime.strptime(end_time_1, "%Y-%m-%d %H:%M:%S"),
                "buy_offset": setting.get('buy_offset_1', 0),
                "stop_loss_offset": setting.get('stop_sell_offset_1', 0),
                "timeframe": setting.get('timeframe', '15m'),
                "quantity": setting.get('quantity_stock_1', 1)

        if has_second_stock:
            start_time_2, end_time_2 = get_market_times(get_json_value('key_2', 'exchange'))
                "key": get_json_value('key_2', 'instrument_key'),
                "start_time": datetime.strptime(start_time_2, "%Y-%m-%d %H:%M:%S"),
                "end_time": datetime.strptime(end_time_2, "%Y-%m-%d %H:%M:%S"),
                "buy_offset": setting.get('buy_offset_2', 0),
                "stop_loss_offset": setting.get('stop_sell_offset_2', 0),
                "timeframe": setting.get('timeframe', '15m'),
                "quantity": setting.get('quantity_stock_2', 1)

        # Create tasks for each instrument
        tasks = []
        for config in instrument_configs:
            strategy = TradingStrategy(shared_balance, config["buy_offset"], config["stop_loss_offset"], config["key"], config["quantity"])

        # Run all tasks concurrently
        await asyncio.gather(*tasks)
        return "Market is closed. Run it tomorrow again!"

if __name__ == "__main__":


why is it happening for me

and too how to get in a call from the developer of upstox

Still I didn’t get the reply

Hi @Harry_Donni ,

Our team is internally checking this and will get back to you shortly. Thank you for your patience.

Kindly refer to the code below, which is identical to the one provided on our GitHub page for establishing a WebSocket connection: GitHub Market Data Feeder.

# Import necessary modules
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict

import MarketDataFeed_pb2 as pb

def get_market_data_feed_authorize(api_version, configuration):
    """Get authorization for market data feed."""
    api_instance = upstox_client.WebsocketApi(
    api_response = api_instance.get_market_data_feed_authorize(api_version)
    return api_response

def decode_protobuf(buffer):
    """Decode protobuf message."""
    feed_response = pb.FeedResponse()
    return feed_response

async def fetch_market_data():
    """Fetch market data using WebSocket and print it."""

    # Create default SSL context
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Configure OAuth2 access token for authorization
    configuration = upstox_client.Configuration()

    api_version = '2.0'
    configuration.access_token = "access_token"

    # Get market data feed authorization
    response = get_market_data_feed_authorize(
        api_version, configuration)

    # Connect to the WebSocket with SSL context
    async with websockets.connect(, ssl=ssl_context) as websocket:
        print('Connection established')

        await asyncio.sleep(1)  # Wait for 1 second

        # Data to be sent over the WebSocket
        data = {
            "guid": "someguid",
            "method": "sub",
            "data": {
                "mode": "full",
                "instrumentKeys": ["MCX_FO|435356","MCX_FO|435901"]

        # Convert data to binary and send over WebSocket
        binary_data = json.dumps(data).encode('utf-8')
        await websocket.send(binary_data)

        # Continuously receive and decode data from WebSocket
        while True:
            message = await websocket.recv()
            decoded_data = decode_protobuf(message)

            # Convert the decoded data to a dictionary
            data_dict = MessageToDict(decoded_data)

            # Print the dictionary representation

# Execute the function to fetch market data

Please note that you can subscribe to up to 100 instruments per WebSocket connection, and with a single access token, you can establish up to 3 WebSocket connections.

For easier implementation of websockets do refer to streamer functionalities provided in various languages.

Thank you.

But it works with my other upstox account but s
Does not work with my upstox account only?

I feel that I am blocked for the websocket connection so I would request to unblock because it works for the other upstox account

hello please help me to fix this issue it’s 403 which means I am blocked