Can't use two instrument at once in websocket

import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
import MarketDataFeed_pb2 as pb
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import os
import sys
from timing_holiday import Market_open_close
import pytz
import time

# External files
from get_fund import get_fund, get_fund_data
from buy_sell import buy, sell, get_order_details, modify_order, cancel_order

def access_token():
    """this is the access token which is used for getting the tokens.
    Returns:
        str: provides the access_token
    """
    # Load the access token from the JSON file
    with open('jsons/access_token.json', 'r') as file:
        data = json.load(file)
        access_token = data['access_token']
    return access_token

def get_market_data_feed_authorize(api_version, configuration):
    """this will get the Market data feed by authorizing.

    Args:
        api_version (str): provides the version of the api which is v2
        configuration (str): here it provides the access token

    Returns:
        dict: provides the json response.
    """
    api_instance = upstox_client.WebsocketApi(upstox_client.ApiClient(configuration))
    api_response = api_instance.get_market_data_feed_authorize(api_version)
    return api_response

def decode_protobuf(buffer):
    """decodes the protobuf from the upstox"""
    feed_response = pb.FeedResponse()
    feed_response.ParseFromString(buffer)
    return feed_response

def convert_timestamp(ts):
    """converts the timestamp seconds to strftime.

    Args:
        ts (int): provide the timestamp

    Returns:
        str: get str version of timestamp and hours
    """
    timestamp_seconds = int(ts) / 1000
    dt_object = datetime.fromtimestamp(timestamp_seconds)
    return dt_object.strftime("%Y-%m-%d %H:%M:%S")

def extract_ohlc_data(data_dict):
    try:
        feeds = data_dict['feeds']
        instrument_key = next(iter(feeds))
        ohlc_data = feeds[instrument_key]['ff']['marketFF']['marketOHLC']['ohlc']
        one_minute_data = next((item for item in ohlc_data if item['interval'] == 'I1'), None)
        
        if one_minute_data:
            one_minute_data['datetime'] = convert_timestamp(one_minute_data['ts'])
            return {
                'instrument_key': instrument_key,
                'one_minute_ohlc': one_minute_data
            }
    except (KeyError, StopIteration):
        pass
    return None

class OHLCDataCollector:
    def __init__(self):
        self.data = []

    def add_data(self, ohlc_data):
        self.data.append(ohlc_data['one_minute_ohlc'])

    def get_dataframe(self):
        df = pd.DataFrame(self.data)
        df['datetime'] = pd.to_datetime(df['datetime'])
        df.set_index('datetime', inplace=True)
        for col in ['open', 'high', 'low', 'close']:
            df[col] = pd.to_numeric(df[col])
        return df
    
def convert_to_timeframe(df, timeframe):
    if timeframe == '1m':
        return df  # No conversion needed for 1-minute data
    elif timeframe == '5m':
        rule = '5min'
    elif timeframe == '30m':
        rule = '30min'
    elif timeframe == '15m':
        rule = '15min'
    elif timeframe == '1h':
        rule = 'h'
    else:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

    resampled = df.resample(rule).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last'
    })
    return resampled.dropna()

def round_time(dt, timeframe):
    if timeframe == '1m':
        return dt.replace(second=0, microsecond=0)
    elif timeframe == '5m':
        return dt.replace(minute=dt.minute // 5 * 5, second=0, microsecond=0)
    elif timeframe == '15m':
        return dt.replace(minute=dt.minute // 15 * 15, second=0, microsecond=0)
    elif timeframe == '30m':
        return dt.replace(minute=(dt.minute // 30) * 30, second=0, microsecond=0)
    elif timeframe == '1h':
        return dt.replace(minute=0, second=0, microsecond=0)
    else:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

def calculate_heikin_ashi(df):
    ha_close = (df['open'] + df['high'] + df['low'] + df['close']) / 4
    ha_open = np.zeros(len(df))
    ha_open[0] = df['open'].iloc[0]
    for i in range(1, len(df)):
        ha_open[i] = (ha_open[i-1] + ha_close.iloc[i-1]) / 2
    ha_high = np.maximum(df['high'], np.maximum(ha_open, ha_close))
    ha_low = np.minimum(df['low'], np.minimum(ha_open, ha_close))

    ha_open_rounded = np.round(ha_open, 2)
    ha_close_rounded = np.round(ha_close, 2)

    ha_df = pd.DataFrame({
        'HA_Open': ha_open_rounded,
        'HA_High': ha_high,
        'HA_Low': ha_low,
        'HA_Close': ha_close_rounded
    }, index=df.index)
    
    return ha_df

def format_output_row(datetime_str, ha_open, ha_high, ha_low, ha_close):
    return f"{datetime_str:<25} {ha_open:>8.2f} {ha_high:>8.2f} {ha_low:>8.2f} {ha_close:>8.2f}"

class Colors:
    RESET = "\033[0m"
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"  # Add this line to define YELLOW color

def supports_color():
    """
    Returns True if the running system's terminal supports color,
    and False otherwise.
    """
    plat = sys.platform
    supported_platform = plat != 'Pocket PC' and (plat != 'win32' or 'ANSICON' in os.environ)
    is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
    return supported_platform and is_a_tty

def get_saved_order_id(stock_number, order_type):
    filename = f'orders/order_id_{stock_number}_{order_type}.json'
    if os.path.exists(filename):
        with open(filename, 'r') as file:
            order_data = json.load(file)
            return order_data.get('order_id')
    return None

class TradingStrategy:
    def __init__(self, balance, buy_offset, stop_loss_offset, instrument_key, quantity):
        self.balance = balance
        self.position = None
        self.buy_price = 0
        self.stop_loss = 0
        self.buy_offset = buy_offset
        self.stop_loss_offset = stop_loss_offset
        self.use_color = supports_color()
        self.instrument_key = instrument_key
        self.access_token = access_token()
        self.buy_order_id = None
        self.stop_loss_order_id = None
        self.quantity = quantity

    def get_stock_number(self):
        return self.instrument_key.split('|')[-1]

    def check_order_status(self, order_id):
        order_details = get_order_details(self.access_token, order_id)
        if order_details['status'] == 'success':
            return order_details['data']['status']
        return 'unknown'

    def place_stop_loss_order(self, trigger_price):
        stop_loss_price = round(max(trigger_price - self.stop_loss_offset, 0.1), 1)
        trigger_price = round(trigger_price, 1)
        self.stop_loss_order_id = sell(self.access_token, self.instrument_key, self.quantity, trigger_price, self.stop_loss_offset, self.get_stock_number())

        if self.stop_loss_order_id:
            self.stop_loss = stop_loss_price
            self.log_action(f"Stop-loss set at trigger: {trigger_price}, price: {stop_loss_price}", None, Colors.YELLOW if self.use_color else None)
        else:
            self.log_action("Failed to place stop-loss order", None, Colors.RED if self.use_color else None)
        
    async def process_candle(self, current_candle, previous_candle):
        # Check stop-loss order status if a position is open
        if self.position == "bought" and self.stop_loss_order_id:
            stop_loss_status = self.check_order_status(self.stop_loss_order_id)
            self.log_action(f"Stop-loss order status: {stop_loss_status}", None, Colors.YELLOW if self.use_color else None)

            if stop_loss_status == 'complete':
                sell_price = self.stop_loss
                profit = sell_price - self.buy_price
                self.balance += profit
                self.log_action("Stop-loss triggered", sell_price, Colors.RED if self.use_color else None)
                self.log_action(f"Trade closed. Profit: {profit:.2f}", None, Colors.RED if self.use_color else None)
                self.position = None
                self.buy_order_id = None
                self.stop_loss_order_id = None
                return
            elif stop_loss_status == 'rejected':
                self.log_action("Stop-loss order rejected. Placing a new stop-loss order.", None, Colors.YELLOW if self.use_color else None)
                self.place_stop_loss_order(current_candle['HA_Low'])

        # Modify stop-loss order if a position is open, the stop loss order is not complete, and conditions are met
        if self.position == "bought" and self.stop_loss_order_id and stop_loss_status not in ['complete', 'rejected']:
            new_stop_loss_trigger = round(current_candle['HA_Low'], 1)
            new_stop_loss_price = round(new_stop_loss_trigger - self.stop_loss_offset, 1)
            # Use SL order type
            modified_order_id = modify_order(self.access_token, self.stop_loss_order_id, self.quantity, price=new_stop_loss_price, trigger_price=new_stop_loss_trigger)
            if modified_order_id:
                self.stop_loss = new_stop_loss_price
                self.log_action(f"Stop-loss modified to trigger: {new_stop_loss_trigger}, price: {new_stop_loss_price}", None, Colors.YELLOW if self.use_color else None)
            else:
                self.log_action("Failed to modify stop-loss order", None, Colors.RED if self.use_color else None)

        if self.position is None and self.buy_order_id:
            max_retries = 3
            retry_delay = 120  # seconds
            for attempt in range(max_retries):
                try:
                    buy_status = self.check_order_status(self.buy_order_id)
                    if buy_status == 'complete':
                        self.position = "bought"
                        self.buy_price = round(previous_candle['HA_High'] + self.buy_offset, 1)
                        self.log_action("Buy order completed", self.buy_price, Colors.GREEN if self.use_color else None)

                        # Place stop-loss order
                        self.place_stop_loss_order(previous_candle['HA_Low'])
                        break  # Exit the retry loop if buy order is complete
                    elif buy_status in ['cancelled', 'rejected']:
                        self.log_action(f"Buy order {buy_status}", None, Colors.RED if self.use_color else None)
                        self.buy_order_id = None
                        break  # Exit the retry loop if the order is cancelled or rejected
                    elif buy_status == 'trigger pending':
                        self.log_action(f"Buy order still trigger pending", None, Colors.YELLOW if self.use_color else None)
                        if attempt == max_retries - 1:  # If it's the last attempt
                            self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW if self.use_color else None)
                            cancel_result = cancel_order(self.access_token, self.buy_order_id)
                            if cancel_result.get('status') == 'success':
                                self.log_action("Buy order cancelled successfully", None, Colors.YELLOW if self.use_color else None)
                                self.buy_order_id = None
                            else:
                                self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED if self.use_color else None)
                        else:
                            self.log_action(f"Retrying buy order check in {retry_delay} seconds...", None, Colors.YELLOW if self.use_color else None)
                            await asyncio.sleep(retry_delay)
                    else:
                        self.log_action(f"Buy order status: {buy_status}", None, Colors.YELLOW if self.use_color else None)
                        if attempt < max_retries - 1:  # If not the last attempt
                            self.log_action(f"Retrying buy order check in {retry_delay} seconds...", None, Colors.YELLOW if self.use_color else None)
                            await asyncio.sleep(retry_delay)
                        else:
                            self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW if self.use_color else None)
                            cancel_result = cancel_order(self.access_token, self.buy_order_id)
                            if cancel_result.get('status') == 'success':
                                self.log_action("Buy order cancelled successfully", None, Colors.YELLOW if self.use_color else None)
                                self.buy_order_id = None
                            else:
                                self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED if self.use_color else None)
                except Exception as e:
                    self.log_action(f"Error checking buy order status: {str(e)}", None, Colors.RED if self.use_color else None)
                    if attempt < max_retries - 1:
                        self.log_action(f"Retrying in {retry_delay} seconds...", None, Colors.YELLOW if self.use_color else None)
                        await asyncio.sleep(retry_delay)
                    else:
                        self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW if self.use_color else None)
                        cancel_result = cancel_order(self.access_token, self.buy_order_id)
                        if cancel_result.get('status') == 'success':
                            self.log_action("Buy order cancelled successfully", None, Colors.YELLOW if self.use_color else None)
                            self.buy_order_id = None
                        else:
                            self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED if self.use_color else None)

        elif self.position is None and current_candle['HA_High'] > previous_candle['HA_High'] and not self.buy_order_id:
            # Place a new buy order only if there's no existing buy order
            buy_price = round(previous_candle['HA_High'] + self.buy_offset, 1)
            self.buy_order_id = buy(self.access_token, self.instrument_key, 1, previous_candle['HA_High'], self.buy_offset, self.get_stock_number())
            if self.buy_order_id:
                self.log_action(f"New buy order placed with ID: {self.buy_order_id}", buy_price, Colors.GREEN if self.use_color else None)
            else:
                self.log_action("Failed to place new buy order", None, Colors.RED if self.use_color else None)

        
    def log_action(self, message, price=None, color=None):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if price is not None:
            log_message = f"[{timestamp}] {self.instrument_key}: {message} at {price:.2f}, Balance: {self.balance:.2f}"
        else:
            log_message = f"[{timestamp}] {self.instrument_key}: {message}"
        
        if color and self.use_color:
            print(f"{color}{log_message}{Colors.RESET}")
        else:
            print(log_message)

async def run_trading_strategy_for_instrument(start_time, end_time, instrument_key, strategy, buy_offset, stop_loss_offset, timeframe='15m'):
    while True:
        try:
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            configuration = upstox_client.Configuration()
            api_version = '2.0'
            configuration.access_token = access_token()

            response = get_market_data_feed_authorize(api_version, configuration)
            ohlc_collector = OHLCDataCollector()
            first_print = True
            previous_candle = None

            async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
                print(f'Connection established for {instrument_key} with timeframe {timeframe}')

                await asyncio.sleep(1)

                data = {
                    "guid": "someguid",
                    "method": "sub",
                    "data": {
                        "mode": "full",
                        "instrumentKeys": [instrument_key]
                    }
                }

                binary_data = json.dumps(data).encode('utf-8')
                await websocket.send(binary_data)

                current_time = datetime.now()
                next_process_time = get_next_interval(current_time, timeframe)
                
                while current_time <= end_time:
                    if current_time < start_time:
                        await asyncio.sleep((start_time - current_time).total_seconds())
                        current_time = datetime.now()
                        continue

                    if current_time >= next_process_time:
                        df = ohlc_collector.get_dataframe()
                        if not df.empty:
                            df = convert_to_timeframe(df, timeframe)
                            ha_df = calculate_heikin_ashi(df)
                            current_candle = ha_df.iloc[-1]

                            if first_print:
                                print(f"{instrument_key} ({timeframe}): {'datetime':<25} {'HA_Open':>8} {'HA_High':>8} {'HA_Low':>8} {'HA_Close':>8}")
                                first_print = False

                            print(f"{instrument_key} ({timeframe}): {format_output_row(next_process_time.strftime('%Y-%m-%d %H:%M:%S'), current_candle['HA_Open'], current_candle['HA_High'], current_candle['HA_Low'], current_candle['HA_Close'])}")

                            if previous_candle is not None:
                                await strategy.process_candle(current_candle, previous_candle)

                            previous_candle = current_candle

                        next_process_time = get_next_interval(next_process_time, timeframe)
                        ohlc_collector = OHLCDataCollector()  # Reset collector for the next interval
                        
                    # Process incoming data
                    try:
                        message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
                        decoded_data = decode_protobuf(message)
                        data_dict = MessageToDict(decoded_data)
                        
                        ohlc_data = extract_ohlc_data(data_dict)
                        if ohlc_data:
                            for key in ['open', 'high', 'low', 'close']:
                                ohlc_data['one_minute_ohlc'][key] = round(float(ohlc_data['one_minute_ohlc'][key]), 2)
                            ohlc_collector.add_data(ohlc_data)
                    except asyncio.TimeoutError:
                        pass  # No data received, continue to next iteration

                    await asyncio.sleep(0.1)  # Small delay to prevent CPU overuse
                    current_time = datetime.now()

                print(f"End time reached for {instrument_key}. Stopping the script.")
                break  # Exit the while True loop if we've reached the end time
        except websockets.ConnectionClosedError:
            print(f"WebSocket connection closed for {instrument_key}. Attempting to reconnect...")
            await asyncio.sleep(5)  # Wait for 5 seconds before attempting to reconnect
        except Exception as e:
            print(f"An error occurred for {instrument_key}: {str(e)}. Attempting to reconnect...")
            await asyncio.sleep(5)  # Wait for 5 seconds before attempting to reconnect


def get_next_interval(current_time, interval):
    if interval == '1m':
        return current_time.replace(second=0, microsecond=0) + timedelta(minutes=1)
    elif interval == '5m':
        return current_time.replace(minute=current_time.minute - (current_time.minute % 5), second=0, microsecond=0) + timedelta(minutes=5)
    elif interval == '15m':
        return current_time.replace(minute=current_time.minute - (current_time.minute % 15), second=0, microsecond=0) + timedelta(minutes=15)
    elif interval == '30m':
        return current_time.replace(minute=current_time.minute - (current_time.minute % 30), second=0, microsecond=0) + timedelta(minutes=30)
    elif interval == '1h':
        return current_time.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    else:
        raise ValueError(f"Unsupported interval: {interval}")


def get_json_value(key, value):
    try:
        with open('jsons/key.json', 'r') as file:
            data = json.load(file)
        
        # Get the content of key_1 or key_2
        content = data.get(key, {})
        
        # Get the specific value from the content
        result = content.get(value)
        
        return result
    except FileNotFoundError:
        print("Error: File 'key.json' not found.")
        return None
    except json.JSONDecodeError:
        print("Error: Invalid JSON format in 'key.json'.")
        return None


# Load Market_status.json
with open('jsons/Market_status.json', 'r') as f:
    market_status = json.load(f)

# load setting.json
setting = {}
if os.path.exists('jsons/setting.json'):
    with open('jsons/setting.json', 'r') as f:
        setting = json.load(f)

# Load key.json
with open('jsons/key.json', 'r') as f:
    key_data = json.load(f)

# Function to get market times for a given exchange name
def get_market_times(exchange_name):
    for data in market_status["data"]:
        if data["exchange"] == exchange_name:
            return data["start_time"], data["end_time"]
    return None, None

# Function to get values from key.json
def get_json_value(key, field):
    return key_data.get(key, {}).get(field, None)


async def live_data():
    token = access_token()
    get_fund(token)
    available_margin = get_fund_data("commodity", "available_margin")
    ist = pytz.timezone('Asia/Kolkata')
    current_date = datetime.now(ist).strftime("%Y-%m-%d")
    market_status_response = Market_open_close(current_date)
    
    shared_balance = available_margin

    if market_status_response["status"] == "success" and "Market is open" in market_status_response["details"]:
        print("Market is open. Running the script...")
        
        # Load settings
        with open('jsons/setting.json', 'r') as f:
            setting = json.load(f)

        # Check if stock_name_2 exists in settings
        has_second_stock = 'stock_name_2' in setting and setting['stock_name_2']

        # Get start and end times for key_1
        start_time_1, end_time_1 = get_market_times(get_json_value('key_1', 'exchange'))

        # Define the instrument configs
        instrument_configs = [
            {
                "key": get_json_value('key_1', 'instrument_key'),
                "start_time": datetime.strptime(start_time_1, "%Y-%m-%d %H:%M:%S"),
                "end_time": datetime.strptime(end_time_1, "%Y-%m-%d %H:%M:%S"),
                "buy_offset": setting.get('buy_offset_1', 0),
                "stop_loss_offset": setting.get('stop_sell_offset_1', 0),
                "timeframe": setting.get('timeframe', '15m'),
                "quantity": setting.get('quantity_stock_1', 1)
            }
        ]

        if has_second_stock:
            start_time_2, end_time_2 = get_market_times(get_json_value('key_2', 'exchange'))
            instrument_configs.append({
                "key": get_json_value('key_2', 'instrument_key'),
                "start_time": datetime.strptime(start_time_2, "%Y-%m-%d %H:%M:%S"),
                "end_time": datetime.strptime(end_time_2, "%Y-%m-%d %H:%M:%S"),
                "buy_offset": setting.get('buy_offset_2', 0),
                "stop_loss_offset": setting.get('stop_sell_offset_2', 0),
                "timeframe": setting.get('timeframe', '15m'),
                "quantity": setting.get('quantity_stock_2', 1)
            })

        # Create tasks for each instrument
        tasks = []
        for config in instrument_configs:
            strategy = TradingStrategy(shared_balance, config["buy_offset"], config["stop_loss_offset"], config["key"], config["quantity"])
            tasks.append(run_trading_strategy_for_instrument(
                config["start_time"],
                config["end_time"],
                config["key"],
                strategy,
                config["buy_offset"],
                config["stop_loss_offset"],
                config["timeframe"]
            ))

        # Run all tasks concurrently
        await asyncio.gather(*tasks)
    else:
        return "Market is closed. Run it tomorrow again!"

if __name__ == "__main__":
    asyncio.run(live_data())

error:

why is it happening for me

and too how to get in a call from the developer of upstox

Still I didn’t get the reply

Hi @Harry_Donni ,

Our team is internally checking this and will get back to you shortly. Thank you for your patience.

@Harry_Donni
Kindly refer to the code below, which is identical to the one provided on our GitHub page for establishing a WebSocket connection: GitHub Market Data Feeder.

# Import necessary modules
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict

import MarketDataFeed_pb2 as pb


def get_market_data_feed_authorize(api_version, configuration):
    """Get authorization for market data feed."""
    api_instance = upstox_client.WebsocketApi(
        upstox_client.ApiClient(configuration))
    api_response = api_instance.get_market_data_feed_authorize(api_version)
    return api_response


def decode_protobuf(buffer):
    """Decode protobuf message."""
    feed_response = pb.FeedResponse()
    feed_response.ParseFromString(buffer)
    return feed_response


async def fetch_market_data():
    """Fetch market data using WebSocket and print it."""

    # Create default SSL context
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Configure OAuth2 access token for authorization
    configuration = upstox_client.Configuration()

    api_version = '2.0'
    configuration.access_token = "access_token"

    # Get market data feed authorization
    response = get_market_data_feed_authorize(
        api_version, configuration)

    # Connect to the WebSocket with SSL context
    async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
        print('Connection established')

        await asyncio.sleep(1)  # Wait for 1 second

        # Data to be sent over the WebSocket
        data = {
            "guid": "someguid",
            "method": "sub",
            "data": {
                "mode": "full",
                "instrumentKeys": ["MCX_FO|435356","MCX_FO|435901"]
            }
        }

        # Convert data to binary and send over WebSocket
        binary_data = json.dumps(data).encode('utf-8')
        await websocket.send(binary_data)

        # Continuously receive and decode data from WebSocket
        while True:
            message = await websocket.recv()
            decoded_data = decode_protobuf(message)

            # Convert the decoded data to a dictionary
            data_dict = MessageToDict(decoded_data)

            # Print the dictionary representation
            print(json.dumps(data_dict))


# Execute the function to fetch market data
asyncio.run(fetch_market_data())

Please note that you can subscribe to up to 100 instruments per WebSocket connection, and with a single access token, you can establish up to 3 WebSocket connections.

For easier implementation of websockets do refer to streamer functionalities provided in various languages.

Thank you.

But it works with my other upstox account but s
Does not work with my upstox account only?

I feel that I am blocked for the websocket connection so I would request to unblock because it works for the other upstox account