# ==============================================================
#
Async Upstox Streamer (EQUITY ONLY — FINAL WORKING VERSION)
# ==============================================================
class UpstoxAsyncStreamer:
def \__init_\_(self, access_token: str):
self.access_token = access_token
self.stop_event = asyncio.Event()
\# ➤ HARD-CODED EQUITY INSTRUMENT KEYS (you asked to hard-code)
\# Use the ISIN-based instrument key (NSE_EQ|<ISIN>) — example HDFC BANK
self.equity_keys = \[
"NSE_EQ|INE040A01034", # HDFC BANK (example)
\# Add more ISIN keys if you need; keep as strings in this list
\]
\# ---------------------------
\# 🔐 GET WS AUTH URL
\# ---------------------------
async def \_authorize_get_ws_url(self):
headers = {
"Accept": "application/json",
"Authorization": f"Bearer {self.access_token}",
}
resp = requests.get(UPSTOX_AUTHORIZE_URL, headers=headers, timeout=10)
if resp.status_code != 200:
raise RuntimeError(f"WS auth failed: {resp.status_code} - {resp.text}")
data = resp.json().get("data", {})
ws_url = data.get("authorizedRedirectUri") or data.get("authorized_redirect_uri")
print("🔑 Authorized WS URL:", ws_url)
return ws_url
\# ---------------------------
\# 🔗 CONNECT + SUBSCRIBE (EQUITY full_d5)
\# ---------------------------
async def \_connect_and_listen(self):
ws_url = await self.\_authorize_get_ws_url()
\# Use unverified context to avoid hostname/verify_mode conflict
\# (GitHub examples often use a permissive SSL context because endpoints use custom cert chains)
ssl_ctx = ssl.\_create_unverified_context()
async with websockets.connect(ws_url, ssl=ssl_ctx, ping_interval=15) as ws:
print("✅ WS Connected")
\# small delay like official example
await asyncio.sleep(1)
\# -------- EQUITY SUBSCRIPTION (full_d5) --------
\# Important: mode=full_d5 + publishFormat protobuf + publishInterval 1 sec
\# This matches requirements for full-depth equity feed
sub_equity = {
"guid": "sub-equity",
"method": "sub",
"data": {
"mode": "full_d5", # full equity feed (depth + ltpc)
"instrumentKeys": self.equity_keys,
"publishFormat": "protobuf", # request protobuf binary payloads
"publishInterval": 1 # 1 second server publish interval
},
}
await ws.send(json.dumps(sub_equity))
print("📡 Sent SUB:", sub_equity)
\# LISTEN LOOP
while not self.stop_event.is_set():
try:
msg = await ws.recv()
\# ---- Binary protobuf messages ----
if isinstance(msg, (bytes, bytearray)) and PROTO_LOADED:
feed = PROTO_FEED_RESPONSE()
try:
feed.ParseFromString(msg)
except Exception as e:
\# parsing error - skip message
print("⚠️ Protobuf parse error:", e)
continue
\# feed.type == 2 is heartbeats / market info per proto
if feed.type == 2:
print("💓 Heartbeat / market_info")
continue
parsed_ticks = {}
\# iterate feeds map — keys are instrument keys
for key, data in feed.feeds.items():
ff = None
\# Expect fullFeed.marketFF for equity full feed
if data.HasField("fullFeed") and data.fullFeed is not None:
if data.fullFeed.HasField("marketFF"):
ff = data.fullFeed.marketFF
elif data.fullFeed.HasField("firstLevelWithGreeks"):
\# sometimes derivatives may appear; handle gracefully
ff = data.fullFeed.firstLevelWithGreeks
elif data.fullFeed.HasField("indexFF"):
\# improbable for equity, but be robust
ff = data.fullFeed.indexFF
if ff is None:
continue
\# ltpc contains LTP/CP/LTT/LTQ
if ff.HasField("ltpc") and ff.ltpc is not None and ff.ltpc.HasField("ltp"):
try:
ltp = float(ff.ltpc.ltp)
except Exception:
ltp = None
try:
cp = float(ff.ltpc.cp)
except Exception:
cp = None
ltq = getattr(ff.ltpc, "ltq", None)
ltt = getattr(ff.ltpc, "ltt", None)
parsed_ticks\[key\] = {
"ltp": round(ltp, 2) if ltp is not None else None,
"ltq": ltq,
"cp": round(cp, 2) if cp is not None else None,
"timestamp": ltt,
}
if parsed_ticks:
print("📈 Tick:", parsed_ticks)
try:
socketio.emit("tick_update", parsed_ticks)
except Exception as e:
print("⚠️ SocketIO emit error:", e)
\# ---- JSON text messages (subscription ACK / errors) ----
elif isinstance(msg, str):
try:
data = json.loads(msg)
except Exception:
print("📨 TEXT:", msg)
continue
\# Print subscription ack, market_info or errors
if "subscription" in msg.lower() or data.get("method") == "subAck":
print("🟢 Subscription ACK:", data)
elif data.get("type") == "market_info":
print("ℹ️ Market info / heartbeat (text):", data.get("data") if "data" in data else data)
elif "error" in data or data.get("status") == "error":
print("⚠️ WS Error (text):", data)
else:
\# other text messages
print("📨 JSON:", data)
except websockets.exceptions.ConnectionClosed:
print("🔌 WS closed — reconnecting...")
await asyncio.sleep(3)
return await self.\_connect_and_listen()
except Exception as e:
print("⚠️ Error in WS loop:", e)
\# brief backoff to avoid tight loop on persistent errors
await asyncio.sleep(1)
\# run forever with automatic reconnect on fatal exceptions
async def run_forever(self):
while not self.stop_event.is_set():
try:
await self.\_connect_and_listen()
except Exception as e:
print("💥 Streamer crashed, retrying in 5s:", e)
await asyncio.sleep(5)
# ==============================================================
#
START STREAMER (BACKGROUND THREAD)
# ==============================================================
streamer_started = False
def ensure_streamer_running():
"""Start Upstox WebSocket streamer in a background thread (asyncio)."""
global streamer_started
token = os.environ.get("UPSTOX_ACCESS_TOKEN", "").strip()
if not token or len(token) < 20:
print("⏸️ Streamer not started — no valid Upstox token.")
return
if streamer_started:
print("🟢 Upstox async streamer already running.")
return
async def start_async_streamer():
try:
print("🚀 Launching Upstox WebSocket streamer (async)...")
streamer = UpstoxAsyncStreamer(token)
await streamer.run_forever()
except Exception as e:
print("💥 Streamer crashed:", e)
def background_runner():
asyncio.run(start_async_streamer())
threading.Thread(target=background_runner, daemon=True).start()
streamer_started = True
i am not getting live ticks please check this and give me the solution