import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
import MarketDataFeed_pb2 as pb
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import os
import sys
from timing_holiday import Market_open_close
import pytz
import time
# External files
from get_fund import get_fund, get_fund_data
from buy_sell import buy, sell, get_order_details, modify_order, cancel_order
def access_token():
    """Read the stored API access token from disk.

    Returns:
        str: value of the ``access_token`` field in
        ``jsons/access_token.json``.
    """
    with open('jsons/access_token.json', 'r') as token_file:
        return json.load(token_file)['access_token']
def get_market_data_feed_authorize(api_version, configuration):
    """Request authorization for the market data feed.

    Args:
        api_version (str): API version forwarded to the authorize call.
        configuration: client configuration handed to
            ``upstox_client.ApiClient`` (the caller in this file sets its
            ``access_token`` attribute beforehand).

    Returns:
        The response object produced by
        ``WebsocketApi.get_market_data_feed_authorize``.
    """
    api_client = upstox_client.ApiClient(configuration)
    websocket_api = upstox_client.WebsocketApi(api_client)
    return websocket_api.get_market_data_feed_authorize(api_version)
def decode_protobuf(buffer):
    """Parse a serialized feed payload into a ``pb.FeedResponse`` message.

    Args:
        buffer: serialized message passed to ``ParseFromString``.

    Returns:
        The populated ``pb.FeedResponse`` instance.
    """
    decoded = pb.FeedResponse()
    decoded.ParseFromString(buffer)
    return decoded
def convert_timestamp(ts):
    """Format an epoch timestamp given in milliseconds as a date-time string.

    Args:
        ts: millisecond timestamp; anything ``int()`` accepts.

    Returns:
        str: the moment rendered as ``YYYY-MM-DD HH:MM:SS``, converted with
        ``datetime.fromtimestamp`` (i.e. in the system's local time zone).
    """
    seconds = int(ts) / 1000
    return datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")
def extract_ohlc_data(data_dict):
    """Pull the one-minute OHLC entry of the first instrument in a feed message.

    Args:
        data_dict (dict): decoded feed message; the entry is looked up under
            ``feeds -> <first key> -> ff -> marketFF -> marketOHLC -> ohlc``.

    Returns:
        dict | None: ``{'instrument_key': ..., 'one_minute_ohlc': ...}`` where
        the OHLC entry is the first one whose ``interval`` equals ``'I1'``
        (a ``datetime`` string is added to that entry in place). ``None`` when
        the message has no such entry or lacks one of the expected keys.
    """
    try:
        feeds = data_dict['feeds']
        instrument_key = next(iter(feeds))
        candles = feeds[instrument_key]['ff']['marketFF']['marketOHLC']['ohlc']

        # Locate the first one-minute ('I1') entry, if any.
        one_minute = None
        for candle in candles:
            if candle['interval'] == 'I1':
                one_minute = candle
                break

        if one_minute:
            one_minute['datetime'] = convert_timestamp(one_minute['ts'])
            return {
                'instrument_key': instrument_key,
                'one_minute_ohlc': one_minute,
            }
    except (KeyError, StopIteration):
        # Message without the expected structure: treated as "no data".
        pass
    return None
class OHLCDataCollector:
    """Accumulate one-minute OHLC entries and expose them as a DataFrame."""

    # Columns converted to numeric dtype by get_dataframe().
    PRICE_COLUMNS = ('open', 'high', 'low', 'close')

    def __init__(self):
        # Raw OHLC dicts in arrival order.
        self.data = []

    def add_data(self, ohlc_data):
        """Store the ``one_minute_ohlc`` entry of an extracted feed record.

        Args:
            ohlc_data (dict): record with a ``'one_minute_ohlc'`` key.
        """
        self.data.append(ohlc_data['one_minute_ohlc'])

    def get_dataframe(self):
        """Build a DataFrame indexed by ``datetime`` from the collected entries.

        Returns:
            pandas.DataFrame: one row per collected entry with numeric
            ``open``/``high``/``low``/``close`` columns. When nothing has been
            collected, an empty frame with those four columns and an empty
            ``datetime`` index is returned, so callers can test ``df.empty``
            instead of hitting a ``KeyError`` on the missing column.
        """
        if not self.data:
            return pd.DataFrame(
                columns=list(self.PRICE_COLUMNS),
                index=pd.DatetimeIndex([], name='datetime'),
                dtype=float,
            )

        df = pd.DataFrame(self.data)
        df['datetime'] = pd.to_datetime(df['datetime'])
        df.set_index('datetime', inplace=True)
        for col in self.PRICE_COLUMNS:
            df[col] = pd.to_numeric(df[col])
        return df
def convert_to_timeframe(df, timeframe):
    """Resample one-minute OHLC rows into candles of the requested timeframe.

    Args:
        df (pandas.DataFrame): frame with a datetime index and ``open``,
            ``high``, ``low``, ``close`` columns.
        timeframe (str): one of ``'1m'``, ``'5m'``, ``'15m'``, ``'30m'``,
            ``'1h'``.

    Returns:
        pandas.DataFrame: ``df`` itself for ``'1m'``; otherwise the resampled
        candles (first open, max high, min low, last close) with rows
        containing NaN dropped.

    Raises:
        ValueError: if ``timeframe`` is not supported.
    """
    if timeframe == '1m':
        # Already at one-minute resolution: hand the frame back untouched.
        return df

    resample_rules = (('5m', '5min'), ('30m', '30min'), ('15m', '15min'), ('1h', 'h'))
    rule = next((alias for label, alias in resample_rules if timeframe == label), None)
    if rule is None:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

    aggregation = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last'}
    return df.resample(rule).agg(aggregation).dropna()
def round_time(dt, timeframe):
    """Floor ``dt`` to the start of the timeframe slot that contains it.

    Args:
        dt (datetime): moment to floor.
        timeframe (str): one of ``'1m'``, ``'5m'``, ``'15m'``, ``'30m'``,
            ``'1h'``.

    Returns:
        datetime: ``dt`` with seconds and microseconds cleared and the minute
        lowered to the nearest multiple of the slot width.

    Raises:
        ValueError: if ``timeframe`` is not supported.
    """
    slot_widths = (('1m', 1), ('5m', 5), ('15m', 15), ('30m', 30), ('1h', 60))
    for label, width in slot_widths:
        if timeframe == label:
            floored_minute = dt.minute // width * width
            return dt.replace(minute=floored_minute, second=0, microsecond=0)
    raise ValueError(f"Unsupported timeframe: {timeframe}")
def calculate_heikin_ashi(df):
    """Derive Heikin-Ashi candles from regular OHLC candles.

    Args:
        df (pandas.DataFrame): frame with ``open``, ``high``, ``low`` and
            ``close`` columns, ordered oldest to newest.

    Returns:
        pandas.DataFrame: frame on the same index with ``HA_Open``,
        ``HA_High``, ``HA_Low`` and ``HA_Close`` columns. ``HA_Open`` and
        ``HA_Close`` are rounded to two decimals; high and low are derived
        from the unrounded values. An empty input yields an empty frame with
        those columns instead of raising ``IndexError``.
    """
    columns = ['HA_Open', 'HA_High', 'HA_Low', 'HA_Close']
    row_count = len(df)
    if row_count == 0:
        return pd.DataFrame(columns=columns, index=df.index, dtype=float)

    ha_close = (df['open'] + df['high'] + df['low'] + df['close']) / 4

    # HA open is a recurrence on the previous HA candle, so it has to be
    # filled sequentially; the first candle is seeded with the raw open.
    close_values = ha_close.to_numpy(dtype=float)
    ha_open = np.zeros(row_count)
    ha_open[0] = df['open'].iloc[0]
    for i in range(1, row_count):
        ha_open[i] = (ha_open[i - 1] + close_values[i - 1]) / 2

    ha_high = np.maximum(df['high'], np.maximum(ha_open, ha_close))
    ha_low = np.minimum(df['low'], np.minimum(ha_open, ha_close))

    return pd.DataFrame({
        'HA_Open': np.round(ha_open, 2),
        'HA_High': ha_high,
        'HA_Low': ha_low,
        'HA_Close': np.round(ha_close, 2),
    }, index=df.index)
def format_output_row(datetime_str, ha_open, ha_high, ha_low, ha_close):
    """Render one Heikin-Ashi candle as a fixed-width table row.

    The timestamp is left-aligned in 25 characters; each price is
    right-aligned in 8 characters with two decimals. Cells are separated by a
    single space.
    """
    cells = [f"{datetime_str:<25}"]
    cells.extend(f"{price:>8.2f}" for price in (ha_open, ha_high, ha_low, ha_close))
    return " ".join(cells)
class Colors:
    """ANSI escape sequences used to colour console log lines."""

    # Returns the terminal to its default rendition after a coloured span.
    RESET = "\033[0m"

    # Foreground colours applied to log messages.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
def supports_color():
    """Tell whether coloured output should be written to ``sys.stdout``.

    Returns:
        bool: ``False`` on the ``'Pocket PC'`` platform and on ``win32``
        without ``ANSICON`` in the environment; otherwise whether
        ``sys.stdout`` reports being a TTY.
    """
    platform_name = sys.platform
    if platform_name == 'Pocket PC':
        return False
    if platform_name == 'win32' and 'ANSICON' not in os.environ:
        return False
    return hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
def get_saved_order_id(stock_number, order_type):
    """Look up a previously saved order id.

    Args:
        stock_number: identifier used in the file name.
        order_type: order kind used in the file name.

    Returns:
        The ``order_id`` value stored in
        ``orders/order_id_<stock_number>_<order_type>.json``, or ``None`` when
        the file does not exist or has no ``order_id`` key.
    """
    order_path = f'orders/order_id_{stock_number}_{order_type}.json'
    if not os.path.exists(order_path):
        return None
    with open(order_path, 'r') as order_file:
        saved = json.load(order_file)
    return saved.get('order_id')
class TradingStrategy:
    """Candle-driven buy / stop-loss state machine for one instrument.

    On every processed candle the strategy either manages the stop-loss of an
    open position, polls a pending buy order, or places a new buy order when
    the current candle's ``HA_High`` exceeds the previous candle's.

    Args:
        balance: starting balance used for log output and profit bookkeeping.
        buy_offset: amount added to the previous ``HA_High`` for the buy price.
        stop_loss_offset: amount subtracted from the trigger for the stop price.
        instrument_key (str): instrument identifier; the part after the last
            ``'|'`` is used as the stock number.
        quantity: quantity sent with buy, stop-loss and modify requests.
        max_buy_retries (int): how many times a pending buy order is polled
            within one ``process_candle`` call before it is cancelled.
        buy_retry_delay (float): seconds to wait between those polls.
    """

    def __init__(self, balance, buy_offset, stop_loss_offset, instrument_key, quantity,
                 max_buy_retries=3, buy_retry_delay=120):
        self.balance = balance
        self.position = None
        self.buy_price = 0
        self.stop_loss = 0
        self.buy_offset = buy_offset
        self.stop_loss_offset = stop_loss_offset
        self.use_color = supports_color()
        self.instrument_key = instrument_key
        self.access_token = access_token()
        self.buy_order_id = None
        self.stop_loss_order_id = None
        self.quantity = quantity
        # Price of the buy order currently awaiting a fill (None when idle).
        self.pending_buy_price = None
        self.max_buy_retries = max_buy_retries
        self.buy_retry_delay = buy_retry_delay

    def get_stock_number(self):
        """Return the part of ``instrument_key`` after the last ``'|'``."""
        return self.instrument_key.split('|')[-1]

    def check_order_status(self, order_id):
        """Return the order's status string, or ``'unknown'`` on a failed lookup."""
        order_details = get_order_details(self.access_token, order_id)
        if order_details['status'] == 'success':
            return order_details['data']['status']
        return 'unknown'

    def place_stop_loss_order(self, trigger_price):
        """Place a stop-loss sell order and remember its id and price.

        Args:
            trigger_price: trigger level; rounded to one decimal before use.
        """
        # The stop price is floored at 0.1 so it never reaches zero or below.
        stop_loss_price = round(max(trigger_price - self.stop_loss_offset, 0.1), 1)
        trigger_price = round(trigger_price, 1)
        self.stop_loss_order_id = sell(self.access_token, self.instrument_key, self.quantity, trigger_price, self.stop_loss_offset, self.get_stock_number())
        if self.stop_loss_order_id:
            self.stop_loss = stop_loss_price
            self.log_action(f"Stop-loss set at trigger: {trigger_price}, price: {stop_loss_price}", None, Colors.YELLOW)
        else:
            self.log_action("Failed to place stop-loss order", None, Colors.RED)

    async def process_candle(self, current_candle, previous_candle):
        """Advance the strategy by one candle.

        Args:
            current_candle: mapping with at least ``HA_High`` and ``HA_Low``.
            previous_candle: the candle processed before ``current_candle``.
        """
        if self.position == "bought" and self.stop_loss_order_id:
            stop_loss_status = self.check_order_status(self.stop_loss_order_id)
            self.log_action(f"Stop-loss order status: {stop_loss_status}", None, Colors.YELLOW)
            if stop_loss_status == 'complete':
                self._close_position_at_stop_loss()
                return
            if stop_loss_status == 'rejected':
                self.log_action("Stop-loss order rejected. Placing a new stop-loss order.", None, Colors.YELLOW)
                self.place_stop_loss_order(current_candle['HA_Low'])
            else:
                self._move_stop_loss(current_candle)
        elif self.position == "bought":
            # An earlier stop-loss placement failed; without this retry the
            # open position would stay unprotected for the rest of the run.
            self.log_action("Open position has no stop-loss order. Placing a new stop-loss order.", None, Colors.YELLOW)
            self.place_stop_loss_order(current_candle['HA_Low'])

        if self.position is None and self.buy_order_id:
            await self._await_buy_fill(previous_candle)
        elif self.position is None and current_candle['HA_High'] > previous_candle['HA_High']:
            # No position and no pending order: breakout above the previous high.
            self._place_buy_order(previous_candle)

    def _close_position_at_stop_loss(self):
        """Book the result of a filled stop-loss order and reset the position."""
        sell_price = self.stop_loss
        # NOTE(review): profit is booked per unit and is not multiplied by
        # self.quantity -- confirm whether balance should reflect the full size.
        profit = sell_price - self.buy_price
        self.balance += profit
        self.log_action("Stop-loss triggered", sell_price, Colors.RED)
        self.log_action(f"Trade closed. Profit: {profit:.2f}", None, Colors.RED)
        self.position = None
        self.buy_order_id = None
        self.stop_loss_order_id = None

    def _move_stop_loss(self, current_candle):
        """Modify the live stop-loss order to follow the current candle's low."""
        new_stop_loss_trigger = round(current_candle['HA_Low'], 1)
        new_stop_loss_price = round(new_stop_loss_trigger - self.stop_loss_offset, 1)
        modified_order_id = modify_order(self.access_token, self.stop_loss_order_id, self.quantity, price=new_stop_loss_price, trigger_price=new_stop_loss_trigger)
        if modified_order_id:
            self.stop_loss = new_stop_loss_price
            self.log_action(f"Stop-loss modified to trigger: {new_stop_loss_trigger}, price: {new_stop_loss_price}", None, Colors.YELLOW)
        else:
            self.log_action("Failed to modify stop-loss order", None, Colors.RED)

    def _place_buy_order(self, previous_candle):
        """Place a buy order just above the previous candle's high."""
        buy_price = round(previous_candle['HA_High'] + self.buy_offset, 1)
        # Uses self.quantity so the bought size matches the size the stop-loss
        # and modify requests act on (the quantity used to be hard-coded to 1).
        self.buy_order_id = buy(self.access_token, self.instrument_key, self.quantity, previous_candle['HA_High'], self.buy_offset, self.get_stock_number())
        if self.buy_order_id:
            # Remember the price now: by the time the fill is observed,
            # "previous_candle" refers to a later candle.
            self.pending_buy_price = buy_price
            self.log_action(f"New buy order placed with ID: {self.buy_order_id}", buy_price, Colors.GREEN)
        else:
            self.log_action("Failed to place new buy order", None, Colors.RED)

    async def _await_buy_fill(self, previous_candle):
        """Poll the pending buy order until it resolves or retries run out."""
        for attempt in range(self.max_buy_retries):
            is_last_attempt = attempt == self.max_buy_retries - 1
            try:
                buy_status = self.check_order_status(self.buy_order_id)
                if buy_status == 'complete':
                    self.position = "bought"
                    if self.pending_buy_price is not None:
                        self.buy_price = self.pending_buy_price
                    else:
                        # Order id was not set through _place_buy_order.
                        self.buy_price = round(previous_candle['HA_High'] + self.buy_offset, 1)
                    self.pending_buy_price = None
                    self.log_action("Buy order completed", self.buy_price, Colors.GREEN)
                    self.place_stop_loss_order(previous_candle['HA_Low'])
                    break
                if buy_status in ('cancelled', 'rejected'):
                    self.log_action(f"Buy order {buy_status}", None, Colors.RED)
                    self.buy_order_id = None
                    self.pending_buy_price = None
                    break
                if buy_status == 'trigger pending':
                    self.log_action("Buy order still trigger pending", None, Colors.YELLOW)
                else:
                    self.log_action(f"Buy order status: {buy_status}", None, Colors.YELLOW)
                await self._retry_or_cancel_buy(is_last_attempt, f"Retrying buy order check in {self.buy_retry_delay} seconds...")
            except Exception as e:
                self.log_action(f"Error checking buy order status: {str(e)}", None, Colors.RED)
                await self._retry_or_cancel_buy(is_last_attempt, f"Retrying in {self.buy_retry_delay} seconds...")

    async def _retry_or_cancel_buy(self, is_last_attempt, retry_message):
        """Wait before the next poll, or cancel the buy order on the last attempt."""
        if not is_last_attempt:
            self.log_action(retry_message, None, Colors.YELLOW)
            await asyncio.sleep(self.buy_retry_delay)
            return
        self.log_action("Max retries reached for buy order. Cancelling the order.", None, Colors.YELLOW)
        cancel_result = cancel_order(self.access_token, self.buy_order_id)
        if cancel_result.get('status') == 'success':
            self.log_action("Buy order cancelled successfully", None, Colors.YELLOW)
            self.buy_order_id = None
            self.pending_buy_price = None
        else:
            self.log_action(f"Failed to cancel buy order: {cancel_result.get('message', 'Unknown error')}", None, Colors.RED)

    def log_action(self, message, price=None, color=None):
        """Print a timestamped log line for this instrument.

        Args:
            message (str): text to log.
            price: when given, appended together with the current balance.
            color: ANSI prefix; applied only when colour output is enabled.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if price is not None:
            log_message = f"[{timestamp}] {self.instrument_key}: {message} at {price:.2f}, Balance: {self.balance:.2f}"
        else:
            log_message = f"[{timestamp}] {self.instrument_key}: {message}"
        if color and self.use_color:
            print(f"{color}{log_message}{Colors.RESET}")
        else:
            print(log_message)
async def run_trading_strategy_for_instrument(start_time, end_time, instrument_key, strategy, buy_offset, stop_loss_offset, timeframe='15m'):
    """Stream live data for one instrument and feed closed candles to a strategy.

    Connects to the market data websocket, collects one-minute OHLC updates
    and, at every ``timeframe`` boundary, converts the collected rows into a
    Heikin-Ashi candle that is printed and passed to
    ``strategy.process_candle`` together with the previous candle. The
    connection is re-established after any error until ``end_time`` passes.

    Args:
        start_time (datetime): processing is held back until this moment.
        end_time (datetime): the function returns once this moment has passed.
        instrument_key (str): instrument to subscribe to.
        strategy: object exposing an async ``process_candle(current, previous)``.
        buy_offset: not read by this function; kept for interface compatibility.
        stop_loss_offset: not read by this function; kept for interface compatibility.
        timeframe (str): candle width understood by ``get_next_interval`` and
            ``convert_to_timeframe``.
    """
    while True:
        # Checked on every (re)connect attempt so that repeated failures
        # (e.g. a failing authorize call) cannot keep retrying after the
        # session is over.
        if datetime.now() > end_time:
            print(f"End time reached for {instrument_key}. Stopping the script.")
            break
        try:
            # NOTE(review): hostname and certificate verification are switched
            # off for the feed connection -- confirm this is intentional.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            configuration = upstox_client.Configuration()
            api_version = '2.0'
            configuration.access_token = access_token()
            response = get_market_data_feed_authorize(api_version, configuration)

            ohlc_collector = OHLCDataCollector()
            first_print = True
            previous_candle = None

            async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
                print(f'Connection established for {instrument_key} with timeframe {timeframe}')
                await asyncio.sleep(1)

                data = {
                    "guid": "someguid",
                    "method": "sub",
                    "data": {
                        "mode": "full",
                        "instrumentKeys": [instrument_key]
                    }
                }
                binary_data = json.dumps(data).encode('utf-8')
                await websocket.send(binary_data)

                current_time = datetime.now()
                next_process_time = get_next_interval(current_time, timeframe)

                while current_time <= end_time:
                    if current_time < start_time:
                        await asyncio.sleep((start_time - current_time).total_seconds())
                        current_time = datetime.now()
                        # Re-anchor the boundary: the one computed at connect
                        # time is stale after the wait and would otherwise be
                        # caught up one interval per loop pass.
                        next_process_time = get_next_interval(current_time, timeframe)
                        continue

                    if current_time >= next_process_time:
                        # Skip intervals without any tick instead of building
                        # a frame from an empty collector.
                        df = ohlc_collector.get_dataframe() if ohlc_collector.data else None
                        if df is not None and not df.empty:
                            df = convert_to_timeframe(df, timeframe)
                            # NOTE(review): the collector is reset every
                            # interval, so the Heikin-Ashi calculation only
                            # sees the rows of the interval just closed and
                            # carries nothing over from earlier candles --
                            # confirm this is intended.
                            ha_df = calculate_heikin_ashi(df)
                            current_candle = ha_df.iloc[-1]
                            if first_print:
                                print(f"{instrument_key} ({timeframe}): {'datetime':<25} {'HA_Open':>8} {'HA_High':>8} {'HA_Low':>8} {'HA_Close':>8}")
                                first_print = False
                            print(f"{instrument_key} ({timeframe}): {format_output_row(next_process_time.strftime('%Y-%m-%d %H:%M:%S'), current_candle['HA_Open'], current_candle['HA_High'], current_candle['HA_Low'], current_candle['HA_Close'])}")
                            if previous_candle is not None:
                                await strategy.process_candle(current_candle, previous_candle)
                            previous_candle = current_candle
                        next_process_time = get_next_interval(next_process_time, timeframe)
                        ohlc_collector = OHLCDataCollector()  # Reset collector for the next interval

                    # Process incoming data
                    try:
                        message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
                        decoded_data = decode_protobuf(message)
                        data_dict = MessageToDict(decoded_data)
                        ohlc_data = extract_ohlc_data(data_dict)
                        if ohlc_data:
                            for key in ['open', 'high', 'low', 'close']:
                                ohlc_data['one_minute_ohlc'][key] = round(float(ohlc_data['one_minute_ohlc'][key]), 2)
                            ohlc_collector.add_data(ohlc_data)
                    except asyncio.TimeoutError:
                        pass  # No data received, continue to next iteration

                    await asyncio.sleep(0.1)  # Small delay to prevent CPU overuse
                    current_time = datetime.now()

            print(f"End time reached for {instrument_key}. Stopping the script.")
            break  # Exit the while True loop if we've reached the end time
        except websockets.ConnectionClosedError:
            print(f"WebSocket connection closed for {instrument_key}. Attempting to reconnect...")
            await asyncio.sleep(5)  # Wait for 5 seconds before attempting to reconnect
        except Exception as e:
            print(f"An error occurred for {instrument_key}: {str(e)}. Attempting to reconnect...")
            await asyncio.sleep(5)  # Wait for 5 seconds before attempting to reconnect
def get_next_interval(current_time, interval):
    """Return the start of the slot that follows the one containing ``current_time``.

    Args:
        current_time (datetime): reference moment.
        interval (str): one of ``'1m'``, ``'5m'``, ``'15m'``, ``'30m'``,
            ``'1h'``.

    Returns:
        datetime: ``current_time`` floored to the slot width (seconds and
        microseconds cleared) plus one slot width.

    Raises:
        ValueError: if ``interval`` is not supported.
    """
    slot_minutes = (('1m', 1), ('5m', 5), ('15m', 15), ('30m', 30), ('1h', 60))
    for label, minutes in slot_minutes:
        if interval == label:
            slot_start = current_time.replace(
                minute=current_time.minute - (current_time.minute % minutes),
                second=0,
                microsecond=0,
            )
            return slot_start + timedelta(minutes=minutes)
    raise ValueError(f"Unsupported interval: {interval}")
def get_json_value(key, value):
    """Read one field of one entry from ``jsons/key.json``.

    Note: this name is bound again by a later ``get_json_value`` definition
    in this module, which then replaces this file-reading version.

    Args:
        key: top-level entry to look up (e.g. ``'key_1'``).
        value: field to read from that entry.

    Returns:
        The stored value, or ``None`` when the entry or field is absent, the
        file is missing, or the file is not valid JSON (an error line is
        printed in the last two cases).
    """
    try:
        with open('jsons/key.json', 'r') as key_file:
            entries = json.load(key_file)
    except FileNotFoundError:
        print("Error: File 'key.json' not found.")
        return None
    except json.JSONDecodeError:
        print("Error: Invalid JSON format in 'key.json'.")
        return None

    entry = entries.get(key, {})
    return entry.get(value)
# Market status is required: a missing or malformed file aborts the import.
with open('jsons/Market_status.json', 'r') as f:
    market_status = json.load(f)

# Settings are optional; fall back to an empty mapping when the file is absent.
if os.path.exists('jsons/setting.json'):
    with open('jsons/setting.json', 'r') as f:
        setting = json.load(f)
else:
    setting = {}

# Key definitions are required as well.
with open('jsons/key.json', 'r') as f:
    key_data = json.load(f)
def get_market_times(exchange_name):
    """Return the start and end time recorded for an exchange.

    Searches ``market_status["data"]`` for the first entry whose ``exchange``
    equals ``exchange_name``.

    Returns:
        tuple: ``(start_time, end_time)`` of that entry, or ``(None, None)``
        when no entry matches.
    """
    match = next(
        (entry for entry in market_status["data"] if entry["exchange"] == exchange_name),
        None,
    )
    if match is None:
        return None, None
    return match["start_time"], match["end_time"]
def get_json_value(key, field):
    """Return ``field`` of entry ``key`` from the loaded ``key_data``.

    Yields ``None`` when either the entry or the field is absent.
    """
    entry = key_data.get(key, {})
    return entry.get(field, None)
def _build_instrument_config(slot, setting):
    """Assemble the run configuration for instrument slot ``slot`` (1 or 2).

    Reads the instrument key and exchange from the ``key_<slot>`` entry and the
    per-slot offsets and quantity from ``setting``.

    Raises:
        ValueError: when no market times are known for the slot's exchange.
    """
    key_name = f'key_{slot}'
    exchange = get_json_value(key_name, 'exchange')
    start_time, end_time = get_market_times(exchange)
    if start_time is None or end_time is None:
        # Fail with a readable message instead of a TypeError from strptime(None).
        raise ValueError(f"No market timings found for exchange {exchange!r} ({key_name})")
    return {
        "key": get_json_value(key_name, 'instrument_key'),
        "start_time": datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S"),
        "end_time": datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S"),
        "buy_offset": setting.get(f'buy_offset_{slot}', 0),
        "stop_loss_offset": setting.get(f'stop_sell_offset_{slot}', 0),
        "timeframe": setting.get('timeframe', '15m'),
        "quantity": setting.get(f'quantity_stock_{slot}', 1),
    }


async def live_data():
    """Run the live strategy for every configured instrument.

    Refreshes fund data, checks whether the market is open today and, if so,
    starts one ``run_trading_strategy_for_instrument`` task per configured
    instrument and waits for all of them.

    Returns:
        str | None: a "market is closed" message when the market is not open;
        ``None`` after the trading tasks have finished.
    """
    token = access_token()
    get_fund(token)
    available_margin = get_fund_data("commodity", "available_margin")

    ist = pytz.timezone('Asia/Kolkata')
    current_date = datetime.now(ist).strftime("%Y-%m-%d")
    market_status_response = Market_open_close(current_date)
    shared_balance = available_margin

    market_is_open = (
        market_status_response["status"] == "success"
        and "Market is open" in market_status_response["details"]
    )
    if not market_is_open:
        return "Market is closed. Run it tomorrow again!"

    print("Market is open. Running the script...")

    # Load settings
    with open('jsons/setting.json', 'r') as f:
        setting = json.load(f)

    # The second instrument is optional and enabled by a non-empty stock_name_2.
    slots = [1]
    if setting.get('stock_name_2'):
        slots.append(2)
    instrument_configs = [_build_instrument_config(slot, setting) for slot in slots]

    # Create tasks for each instrument
    tasks = []
    for config in instrument_configs:
        strategy = TradingStrategy(shared_balance, config["buy_offset"], config["stop_loss_offset"], config["key"], config["quantity"])
        tasks.append(run_trading_strategy_for_instrument(
            config["start_time"],
            config["end_time"],
            config["key"],
            strategy,
            config["buy_offset"],
            config["stop_loss_offset"],
            config["timeframe"],
        ))

    # Run all tasks concurrently
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    outcome = asyncio.run(live_data())
    # live_data() only returns a message when the market is closed; show it
    # instead of silently discarding it.
    if outcome:
        print(outcome)
# error: