Getting "Handshake status 403 Forbidden -+-+-"
(venv313) PS C:\Users\KARTHIK\Desktop\CYNVORA> python -m data_ingestion.upstox_connector
INFO:UpstoxConnector:Holdings published to Kafka
INFO:UpstoxConnector:Starting Upstox Market Data Feed V3…
INFO:UpstoxConnector:SDK Stream initialized and running in a background thread.
INFO:websocket:Websocket connected
INFO:UpstoxConnector:WebSocket connected. Subscribing to 1 instruments…
INFO:UpstoxConnector:Starting Upstox Market Data Feed V3…
ERROR:UpstoxConnector:WebSocket error: Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:47 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None
ERROR:websocket:Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:47 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None - goodbye
WARNING:UpstoxConnector:WebSocket closed. Code: (None, None), Message: {}
ERROR:UpstoxConnector:WebSocket error: Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:48 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None
ERROR:websocket:Handshake status 403 Forbidden -±± {‘date’: ‘Thu, 11 Dec 2025 10:17:48 GMT’, ‘transfer-encoding’: ‘chunked’, ‘connection’: ‘keep-alive’} -±± None - goodbye
—————
I have subscribed to only one instrument, no multiple server instances are running, and I am logged in to the web on only one screen.
here is the code
data_ingestion/upstox_connector.py
#install upstox-python-sdk
import asyncio
import json
import logging
from confluent_kafka import Producer
from .upstox_instruments import get_subscribed_instrument_keys
import time
import requests
# SDK IMPORTS
import threading # Required to run the synchronous SDK client in the background
import upstox_client # Main SDK module (this is the key import)
from upstox_client.api_client import ApiClient
from upstox_client.configuration import Configuration
from upstox_client.rest import ApiException # For error handling
# — SECURE CONFIGURATION IMPORT —
from config.settings import (
UPSTOX_ACCESS_TOKEN,
UPSTOX_API_KEY, # Required for ApiClient initialization
KAFKA_BROKERS,
KAFKA_TOPIC_TICKS,
KAFKA_TOPIC_HOLDINGS
)
# — END SECURE CONFIGURATION IMPORT —
# Token refresh helpers (added missing import)
from dotenv import load_dotenv, find_dotenv
from importlib import reload
logging.basicConfig(level=logging.INFO)
# FIX: the pasted source used curly "smart" quotes throughout, which are
# syntax errors in Python. All string literals below use straight quotes.
logger = logging.getLogger("UpstoxConnector")

# --- Retrieve the instrument list (runs once at module load) ---
# INSTRUMENTS_TO_SUBSCRIBE = get_subscribed_instrument_keys()
INSTRUMENTS_TO_SUBSCRIBE = ["NSE_INDEX|Nifty 50"]

if not INSTRUMENTS_TO_SUBSCRIBE:
    logger.critical("CRITICAL: Instrument subscription list is empty. Exiting.")
    # Deliberately not raising here; the connector tolerates an empty list.

# Global reference to the connector instance (needed so the module-level SDK
# callback can reach the Kafka producer).
CONNECTOR_INSTANCE = None
# --- SDK CALLBACK FUNCTION (runs in the background thread) ---
def on_market_data_update(message):
    """
    SDK callback triggered for every incoming tick.

    Extracts the per-instrument payload from a V3 feed message, enriches it
    with the instrument key and a local millisecond timestamp, and publishes
    it to the Kafka ticks topic via the global connector's producer.

    Args:
        message: Decoded feed message from MarketDataStreamerV3. Expected to
            be a dict with a "feeds" mapping; anything else is logged and
            ignored (control/status frames).
    """
    if CONNECTOR_INSTANCE is None:
        logger.error("SDK Callback triggered but CONNECTOR_INSTANCE is not set.")
        return
    try:
        if not isinstance(message, dict):
            # Lazy %-formatting: avoids building the string when DEBUG is off.
            logger.debug("Non-dict message: %s", message)
            return
        if "feeds" not in message:
            logger.debug("Control/status message: %s", message)
            return
        for instrument_key, feed_data in message["feeds"].items():
            ff = feed_data.get("ff", {})
            # Index vs equity feed: V3 nests the tick under different keys.
            tick_data = ff.get("indexFF") or ff.get("marketFF") or ff.get("ltpc") or {}
            if not tick_data:
                continue
            # Enrich with the instrument key (useful downstream).
            tick_data["instrument_key"] = instrument_key
            # Local receive time in ms; the feed's own time may be missing.
            tick_data["timestamp"] = int(time.time() * 1000)
            # Publish to Kafka.
            CONNECTOR_INSTANCE.producer.produce(
                topic=KAFKA_TOPIC_TICKS,
                key=instrument_key.encode("utf-8"),
                value=json.dumps(tick_data).encode("utf-8"),
                callback=CONNECTOR_INSTANCE._delivery_report,
            )
            CONNECTOR_INSTANCE.producer.poll(0)
            logger.debug("Published tick → %s | LTP: %s", instrument_key, tick_data.get("ltp"))
    except Exception as e:
        # Never let one bad tick kill the streamer thread; log and continue.
        # FIX: original used an f-string prefix together with %s lazy args.
        logger.error("Error in on_market_data_update: %s | Msg: %s", e, message)
# --- SDK CONNECTION START (synchronous blocking task) ---
def start_sdk_stream_blocking(api_client, instrument_keys, reconnect_delay=5):
    """
    Start the SDK V3 streamer; intended to run in its own thread.

    Reconnects forever with exponential backoff, capped at 60 seconds.

    Args:
        api_client: Configured upstox_client.ApiClient carrying the access token.
        instrument_keys: List of instrument keys to subscribe to on open.
        reconnect_delay: Initial delay in seconds between reconnect attempts.
    """
    delay = reconnect_delay
    while True:
        try:
            # 1. Initialize the V3 market-data streamer with the configured client.
            streamer = upstox_client.MarketDataStreamerV3(api_client)

            def on_open():
                logger.info(f"WebSocket connected. Subscribing to {len(instrument_keys)} instruments…")
                # Subscribe in 'full' mode for comprehensive tick data.
                streamer.subscribe(instrument_keys, "full")

            def on_error(err):
                logger.error(f"WebSocket error: {err}")

            def on_close(*args, **kwargs):
                # Close-handler signature varies across SDK versions; log raw args.
                logger.warning(f"WebSocket closed. Code: {args}, Message: {kwargs}")

            # Register event handlers.
            streamer.on("open", on_open)
            streamer.on("message", on_market_data_update)
            streamer.on("error", on_error)
            streamer.on("close", on_close)

            logger.info("Starting Upstox Market Data Feed V3…")
            # 2. Connect and start listening.
            # NOTE(review): in some SDK versions connect() returns immediately
            # instead of blocking — the duplicated "Starting…" log suggests this
            # loop then spawns a second streamer. Confirm connect() blocks in
            # your installed upstox-python-sdk version.
            streamer.connect()
            # FIX: reset the backoff after a session ends cleanly, so a brief
            # outage hours later does not start from an inflated delay.
            delay = reconnect_delay
        except ApiException as e:
            logger.error(f"SDK API Exception: {e}")
        except Exception as e:
            logger.error(f"SDK Stream crashed: {e} — Reconnecting in {delay}s…")
        finally:
            time.sleep(delay)
            delay = min(delay * 2, 60)  # Exponential backoff, max 60s.
# -------------------------------------------------------------
class UpstoxDataConnector:
    """
    Handles all communication with the Upstox API (auth, REST, WebSocket)
    and publishes data to the Kafka event backbone.
    """

    def __init__(self):
        # FIX: was `def _init_` (single underscores) — Python never calls that,
        # so self.producer / self.access_token were never created and
        # CONNECTOR_INSTANCE stayed None.
        global CONNECTOR_INSTANCE
        CONNECTOR_INSTANCE = self
        # Use the securely loaded access token directly.
        self.access_token = UPSTOX_ACCESS_TOKEN
        self.last_token_refresh = time.time()
        # Initialize the Kafka producer.
        self.producer = Producer({'bootstrap.servers': KAFKA_BROKERS})

    async def _ensure_valid_token(self):
        """Auto-refresh the token every 12 hours (Upstox tokens live ~24h)."""
        if time.time() - self.last_token_refresh > 12 * 3600:  # 12 hours
            logger.info("Access token older than 12 hours → refreshing…")
            try:
                # Re-read .env and reload settings so a newly written token
                # is picked up without restarting the process.
                load_dotenv(find_dotenv(), override=True)
                from config import settings  # Local import to avoid import cycles.
                reload(settings)
                self.access_token = settings.UPSTOX_ACCESS_TOKEN
                self.last_token_refresh = time.time()
                logger.info("Token refreshed from .env")
            except Exception as e:
                logger.error(f"Token refresh failed: {e}")

    def _delivery_report(self, err, msg):
        """Kafka delivery callback: log failures, stay silent on success."""
        if err:
            logger.error(f"Kafka delivery failed: {err}")

    # --- REST API: holdings and positions ---
    def fetch_holdings_and_publish(self):
        """Pull long-term holdings via REST and publish the JSON to Kafka."""
        if not self.access_token:
            logger.warning("Access token not set. Cannot fetch holdings.")
            return
        try:
            r = requests.get(
                "https://api.upstox.com/v2/portfolio/long-term-holdings",
                headers={"Authorization": f"Bearer {self.access_token}"},
                timeout=10,  # FIX: never hang forever on a dead connection.
            )
            r.raise_for_status()
            self.producer.produce(KAFKA_TOPIC_HOLDINGS, value=json.dumps(r.json()).encode())
            self.producer.poll(0)
            self.producer.flush(5)
            logger.info("Holdings published to Kafka")
        except Exception as e:
            logger.error(f"Failed to fetch holdings: {e}")

    # --- Market data stream (WebSocket V3) ---
    async def stream_market_data(self):
        """Start the WebSocket stream in a background thread; keep the loop alive."""
        if not self.access_token:
            logger.warning("Access token not set. Cannot stream market data.")
            return
        # 1. Initialize the SDK configuration and client.
        # NOTE(review): a "Handshake status 403 Forbidden" on connect almost
        # always means this token is stale or invalid — Upstox access tokens
        # expire daily; confirm the token was generated today.
        config = upstox_client.Configuration()
        config.api_key['api-key'] = UPSTOX_API_KEY
        config.access_token = self.access_token  # Token used for the WS handshake.
        api_client = ApiClient(config)
        # 2. Start the blocking stream in a daemon thread.
        thread = threading.Thread(
            target=start_sdk_stream_blocking,
            args=(api_client, INSTRUMENTS_TO_SUBSCRIBE, 5),  # 5s initial reconnect delay.
            daemon=True,
        )
        thread.start()
        logger.info("SDK Stream initialized and running in a background thread.")
        # 3. Keep the asyncio loop alive and periodically refresh the token.
        while True:
            await self._ensure_valid_token()
            await asyncio.sleep(60)
if __name__ == "__main__":
    # FIX: was `if _name_ == "_main_"` (single underscores) — that name does
    # not exist, so running the module raised NameError / did nothing useful.
    connector = UpstoxDataConnector()
    try:
        # Fetch holdings once (synchronous REST call).
        connector.fetch_holdings_and_publish()
        # Start the continuous WebSocket stream (blocks until interrupted).
        asyncio.run(connector.stream_market_data())
    except KeyboardInterrupt:
        logger.info("Connector shutting down gracefully…")
        connector.producer.flush(10)  # Drain any in-flight Kafka messages.
    except Exception as e:
        logger.critical(f"Main execution failed: {e}")
        connector.producer.flush(10)