Handshake status 403 Forbidden

Getting “Handshake status 403 Forbidden -±±”

(venv313) PS C:\Users\KARTHIK\Desktop\CYNVORA> python -m data_ingestion.upstox_connector
INFO:UpstoxConnector:Holdings published to Kafka
INFO:UpstoxConnector:Starting Upstox Market Data Feed V3…
INFO:UpstoxConnector:SDK Stream initialized and running in a background thread.
INFO:websocket:Websocket connected
INFO:UpstoxConnector:WebSocket connected. Subscribing to 1 instruments…
INFO:UpstoxConnector:Starting Upstox Market Data Feed V3…
ERROR:UpstoxConnector:WebSocket error: Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:47 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None
ERROR:websocket:Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:47 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None - goodbye
WARNING:UpstoxConnector:WebSocket closed. Code: (None, None), Message: {}
ERROR:UpstoxConnector:WebSocket error: Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:48 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None
ERROR:websocket:Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:48 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None - goodbye
—————

I have subscribed only for one instrument and no multi server running, and web login as well only one screen.

here is the code

data_ingestion/upstox_connector.py

#install upstox-python-sdk

import asyncio

import json

import logging

from confluent_kafka import Producer

from .upstox_instruments import get_subscribed_instrument_keys

import time

import requests

# SDK IMPORTS

import threading # Required to run the synchronous SDK client in the background

import upstox_client # Main SDK module (this is the key import)

from upstox_client.api_client import ApiClient

from upstox_client.configuration import Configuration

from upstox_client.rest import ApiException # For error handling

# — SECURE CONFIGURATION IMPORT —

from config.settings import (

UPSTOX_ACCESS_TOKEN,

UPSTOX_API_KEY, # Required for ApiClient initialization

KAFKA_BROKERS,

KAFKA_TOPIC_TICKS,

KAFKA_TOPIC_HOLDINGS

)

# — END SECURE CONFIGURATION IMPORT —

# Token refresh helpers (added missing import)

from dotenv import load_dotenv, find_dotenv

from importlib import reload

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(“UpstoxConnector”)

# — Retrieve the Instrument List (Run once at module load) —

#INSTRUMENTS_TO_SUBSCRIBE = get_subscribed_instrument_keys()

INSTRUMENTS_TO_SUBSCRIBE = [“NSE_INDEX|Nifty 50”]

if not INSTRUMENTS_TO_SUBSCRIBE:

logger.critical(“CRITICAL: Instrument subscription list is empty. Exiting.”)

# Might want to raise an exception or handle this failure gracefully

# For now, we rely on the connector to handle an empty list.

# Global reference to the connector instance (needed for SDK callback access to producer)

CONNECTOR_INSTANCE = None

# — SDK CALLBACK FUNCTION (Runs in the background thread) —

def on_market_data_update(message):

“”"

SDK Callback function triggered every time a new tick arrives.

“”"

if CONNECTOR_INSTANCE is None:

logger.error(“SDK Callback triggered but CONNECTOR_INSTANCE is not set.”)

return

try:

if not isinstance(message, dict):

logger.debug(f"Non-dict message: {message}")

return

if “feeds” not in message:

logger.debug(f"Control/status message: {message}")

return

for instrument_key, feed_data in message[“feeds”].items():

ff = feed_data.get(“ff”, {})

# Index vs Equity feed

tick_data = ff.get(“indexFF”) or ff.get(“marketFF”) or ff.get(“ltpc”) or {}

if not tick_data:

continue

# Enrich with instrument key (useful downstream)

tick_data[“instrument_key”] = instrument_key

tick_data[“timestamp”] = int(time.time() * 1000) # Optional: add server time if missing

# Publish to Kafka

CONNECTOR_INSTANCE.producer.produce(

topic=KAFKA_TOPIC_TICKS,

key=instrument_key.encode(“utf-8”),

value=json.dumps(tick_data).encode(“utf-8”),

callback=CONNECTOR_INSTANCE._delivery_report,

)

CONNECTOR_INSTANCE.producer.poll(0)

logger.debug(f"Published tick → {instrument_key} | LTP: {tick_data.get(‘ltp’)}")

except Exception as e:

logger.error(f"Error in on_market_data_update: %s | Msg: %s", e, message)

# — SDK CONNECTION START (Synchronous Blocking Task) —

def start_sdk_stream_blocking(api_client, instrument_keys, reconnect_delay=5):

“”"

Synchronous function to start the SDK V3 streamer. Runs in a separate thread.

Handles reconnections with exponential backoff.

“”"

delay = reconnect_delay

while True:

try:

# 1. Initialize the V3 Market Data Streamer using the configured client

streamer = upstox_client.MarketDataStreamerV3(api_client)

def on_open():

logger.info(f"WebSocket connected. Subscribing to {len(instrument_keys)} instruments…")

# Subscribe to instruments (mode: ‘full’ for comprehensive data)

streamer.subscribe(instrument_keys, “full”)

def on_error(err):

logger.error(f"WebSocket error: {err}")

def on_close(*args, **kwargs):

# ws: WebSocket object

# close_code: Integer close status code

# close_msg: String close reason

logger.warning(f"WebSocket closed. Code: {args}, Message: {kwargs}")

# Register event handlers

streamer.on(“open”, on_open)

streamer.on(“message”, on_market_data_update) # Your callback

streamer.on(“error”, on_error)

streamer.on(“close”, on_close)

logger.info(“Starting Upstox Market Data Feed V3…”)

# 2. Connect and start listening (blocks until closed)

streamer.connect()

except ApiException as e:

logger.error(f"SDK API Exception: {e}")

except Exception as e:

logger.error(f"SDK Stream crashed: {e} — Reconnecting in {delay}s…")

finally:

time.sleep(delay)

delay = min(delay * 2, 60) # Exponential backoff, max 60s

# -------------------------------------------------------------

class UpstoxDataConnector:

“”"

Handles all communication with the Upstox API (Auth, REST, WebSocket)

and publishes data to the Kafka event backbone.

“”"

def _init_(self):

global CONNECTOR_INSTANCE

CONNECTOR_INSTANCE = self

# Use the securely loaded ACCESS TOKEN directly

self.access_token = UPSTOX_ACCESS_TOKEN

self.last_token_refresh = time.time()

# Initialize the Kafka Producer

self.producer = Producer({‘bootstrap.servers’: KAFKA_BROKERS})

async def _ensure_valid_token(self):

“”“Auto-refresh token every 12 hours (Upstox tokens live 24h)”“”

if time.time() - self.last_token_refresh > 12 * 3600: # 12 hours

logger.info(“Access token older than 12 hours → refreshing…”)

try:

load_dotenv(find_dotenv(), override=True)

from config import settings # Local import to avoid issues

reload(settings)

self.access_token = settings.UPSTOX_ACCESS_TOKEN

self.last_token_refresh = time.time()

logger.info(“Token refreshed from .env”)

except Exception as e:

logger.error(f"Token refresh failed: {e}")

def _delivery_report(self, err, msg):

if err: logger.error(f"Kafka delivery failed: {err}")

# — REST API: Holdings and Positions —

def fetch_holdings_and_publish(self):

“”“Pulls holdings via REST API and publishes to Kafka.”“”

if not self.access_token:

logger.warning(“Access token not set. Cannot fetch holdings.”)

return

try:

r = requests.get(

https://api.upstox.com/v2/portfolio/long-term-holdings”,

headers={“Authorization”: f"Bearer {self.access_token}"}

)

r.raise_for_status()

self.producer.produce(KAFKA_TOPIC_HOLDINGS, value=json.dumps(r.json()).encode())

self.producer.poll(0)

self.producer.flush(5)

logger.info(“Holdings published to Kafka”)

except Exception as e:

logger.error(f"Failed to fetch holdings: {e}")

# — Market Data Stream (WebSocket V3) —

async def stream_market_data(self):

“”“Initiates the WebSocket connection and streams ticks to Kafka.”“”

if not self.access_token:

logger.warning(“Access token not set. Cannot stream market data.”)

return

# 1. Initialize the SDK Configuration and Client

# Use Configuration to correctly pass the token

config = upstox_client.Configuration()

config.api_key[‘api-key’] = UPSTOX_API_KEY

config.access_token = self.access_token # Set the access token here

api_client = ApiClient(config)

# 2. Start the blocking stream in a new thread

thread = threading.Thread(

target=start_sdk_stream_blocking,

args=(api_client, INSTRUMENTS_TO_SUBSCRIBE, 5), # 5 is initial reconnect delay

daemon=True

)

thread.start()

logger.info(“SDK Stream initialized and running in a background thread.”)

# 3. Keep the main asyncio loop alive indefinitely

while True:

await self._ensure_valid_token()

await asyncio.sleep(60)

if _name_ == “_main_”:

connector = UpstoxDataConnector()

try:

# Fetch holdings once (using the existing synchronous requests call)

connector.fetch_holdings_and_publish()

# Start the continuous WebSocket stream

asyncio.run(connector.stream_market_data())

except KeyboardInterrupt:

logger.info(“Connector shutting down gracefully…”)

connector.producer.flush(10)

except Exception as e:

logger.critical(f"Main execution failed: {e}")

connector.producer.flush(10)

@Pradeep_Jaiswar @Ketan can you please help me with this

Hi @NATARAJAN_5225419
The 403 Forbidden error occurs when the WebSocket connection or subscription limits are not followed.

Please review the connection and subscription limits mentioned in the official documentation below and ensure your implementation adheres to them:

Once the limits are respected, the WebSocket connection should work as expected.