I am tired of trying to fix the issue of getting accurate Heikin-Ashi candles in Python.
I sometimes get incorrect data when I compare my HA candles to the HA candles shown here:
Upstox.
Mostly the open, close, and low values are calculated incorrectly. How can I fix this?
There is always some difference between my values and theirs.
code:
import asyncio
import json
import ssl
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
from live_market_data_strategy import access_token_fn
import MarketDataFeed_pb2 as pb
from datetime import datetime, timedelta
import pandas as pd
class HeikinAshiConverter:
    """Stateful converter from regular OHLC candles to Heikin-Ashi candles.

    Heikin-Ashi is recursive: every HA open is the midpoint of the previous
    HA candle's open and close, so each value depends on all candles that
    came before it. A converter that starts in the middle of a session will
    therefore differ from a chart that was built from earlier history until
    the two series converge. To line up with such a chart, pass the last
    known HA candle as ``previous_ha``.

    Attributes:
        previous_ha: The most recently produced (or seeded) HA candle, or
            ``None`` before any candle is available.
        is_first_candle: ``True`` until a candle has been converted or a
            seed has been supplied.
    """

    def __init__(self, previous_ha=None):
        """Create a converter, optionally seeded with an earlier HA candle.

        Args:
            previous_ha: Optional mapping with at least ``'open'`` and
                ``'close'`` keys holding the previous HA candle. When
                omitted, the first converted candle seeds the series.
        """
        self.previous_ha = previous_ha
        self.is_first_candle = previous_ha is None

    def convert(self, ohlc):
        """Convert one regular OHLC candle into a Heikin-Ashi candle.

        Args:
            ohlc: Mapping with ``'open'``, ``'high'``, ``'low'``, ``'close'``
                and ``'datetime'`` keys, plus an optional ``'volume'``.
                Price fields may be numbers or numeric strings.

        Returns:
            A dict with ``'open'``, ``'high'``, ``'low'``, ``'close'``,
            ``'volume'`` and ``'datetime'`` describing the HA candle. The
            same dict is remembered as ``previous_ha`` for the next call.

        Raises:
            KeyError: If a required key is missing from ``ohlc``.
            ValueError: If a price field cannot be converted to ``float``.
        """
        # Coerce once at the edge so numeric strings do not break the maths.
        open_price = float(ohlc['open'])
        high_price = float(ohlc['high'])
        low_price = float(ohlc['low'])
        close_price = float(ohlc['close'])

        ha_close = (open_price + high_price + low_price + close_price) / 4

        if self.is_first_candle or self.previous_ha is None:
            # Conventional seed for the first HA open is the midpoint of the
            # real open and close, not the raw open.
            ha_open = (open_price + close_price) / 2
        else:
            ha_open = (
                float(self.previous_ha['open'])
                + float(self.previous_ha['close'])
            ) / 2
        self.is_first_candle = False

        ha_candle = {
            'open': ha_open,
            'high': max(high_price, ha_open, ha_close),
            'low': min(low_price, ha_open, ha_close),
            'close': ha_close,
            'volume': ohlc.get('volume', 0),
            'datetime': ohlc['datetime'],
        }
        self.previous_ha = ha_candle
        return ha_candle
def get_market_data_feed_authorize(api_version, configuration):
    """Request authorization for the market data feed through the Upstox SDK.

    Args:
        api_version: API version value forwarded to the SDK call.
        configuration: ``upstox_client.Configuration`` used to build the
            API client.

    Returns:
        The response object returned by
        ``WebsocketApi.get_market_data_feed_authorize``.
    """
    client = upstox_client.ApiClient(configuration)
    websocket_api = upstox_client.WebsocketApi(client)
    return websocket_api.get_market_data_feed_authorize(api_version)
def decode_protobuf(buffer):
    """Parse a serialized protobuf payload into a ``FeedResponse`` message.

    Args:
        buffer: Serialized message bytes handed to ``ParseFromString``.

    Returns:
        The populated ``pb.FeedResponse`` instance.
    """
    parsed = pb.FeedResponse()
    parsed.ParseFromString(buffer)
    return parsed
def convert_timestamp(ts):
    """Format an epoch timestamp in milliseconds as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        ts: Milliseconds since the epoch, as an int or a value ``int()``
            accepts (for example a digit string).

    Returns:
        The formatted timestamp string. The conversion uses
        ``datetime.fromtimestamp`` without a timezone, so the result is in
        the local time of the machine running the code.
    """
    seconds = int(ts) / 1000
    return datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")
def extract_ohlc_data(data_dict):
    """Pull the first one-minute (``'I1'``) OHLC entry out of a decoded feed.

    Only the first instrument key found under ``'feeds'`` is inspected. The
    matching entry is updated in place with a ``'datetime'`` key derived
    from its ``'ts'`` value.

    Args:
        data_dict: Decoded feed message as a nested dict.

    Returns:
        ``{'instrument_key': ..., 'one_minute_ohlc': ...}`` for the first
        entry whose ``'interval'`` is ``'I1'``, or ``None`` when the feed is
        empty, an expected key is missing, or no such entry exists.
    """
    try:
        feeds = data_dict['feeds']
        instrument_key = next(iter(feeds))
        market_ohlc = feeds[instrument_key]['ff']['marketFF']['marketOHLC']
        for candle in market_ohlc['ohlc']:
            if candle['interval'] != 'I1':
                continue
            # Only the first I1 entry is used; later ones are never examined.
            candle['datetime'] = convert_timestamp(candle['ts'])
            return {
                'instrument_key': instrument_key,
                'one_minute_ohlc': candle,
            }
    except (KeyError, StopIteration):
        # Messages without the expected structure are treated as "no data".
        return None
    return None
async def run_data_converter():
    """Stream one-minute candles over the websocket and print HA candles.

    Authorizes the market data feed, subscribes to a single instrument in
    ``full`` mode, then loops forever: each received message is decoded and,
    whenever the extracted one-minute candle's timestamp is at least one
    minute later than the last printed one, the original candle and its
    Heikin-Ashi conversion are printed.

    The Heikin-Ashi series starts with the first candle seen on this
    connection, since the converter is created fresh on every call.
    """
    # NOTE(review): certificate and hostname verification are switched off
    # here, which leaves the connection open to interception — confirm this
    # is really required before using it outside of local testing.
    tls_context = ssl.create_default_context()
    tls_context.check_hostname = False
    tls_context.verify_mode = ssl.CERT_NONE

    configuration = upstox_client.Configuration()
    configuration.access_token = access_token_fn()
    auth_response = get_market_data_feed_authorize('2.0', configuration)

    converter = HeikinAshiConverter()
    last_print_time = None

    subscription = {
        "guid": "someguid",
        "method": "sub",
        "data": {
            "mode": "full",
            # Change this instrument key to follow a different contract.
            "instrumentKeys": ["MCX_FO|434267"],
        },
    }
    payload = json.dumps(subscription).encode('utf-8')

    feed_uri = auth_response.data.authorized_redirect_uri
    async with websockets.connect(feed_uri, ssl=tls_context) as websocket:
        print('Connection established')
        await asyncio.sleep(1)
        await websocket.send(payload)

        while True:
            raw_message = await websocket.recv()
            feed_dict = MessageToDict(decode_protobuf(raw_message))
            extracted = extract_ohlc_data(feed_dict)
            if not extracted:
                continue

            candle = extracted['one_minute_ohlc']
            current_time = datetime.strptime(candle['datetime'], "%Y-%m-%d %H:%M:%S")

            # Skip messages that still belong to the minute already printed.
            if (
                last_print_time is not None
                and current_time < last_print_time + timedelta(minutes=1)
            ):
                continue

            print(f"\nData for {current_time.strftime('%Y-%m-%d %H:%M:%S')}:")
            print("Original OHLC data:")
            print(pd.DataFrame([candle]))

            ha_candle = converter.convert(candle)
            print("\nHeikin-Ashi candle:")
            print(pd.DataFrame([ha_candle]))

            last_print_time = current_time
if __name__ == "__main__":
    # Run the streaming loop only when this file is executed as a script.
    asyncio.run(run_data_converter())