Hello Upstox Community,
I am encountering a WebSocket connection error when running my script `websocketV2.py`. The error message is as follows:
1011 WebSocket connection error: "sent 1011 (unexpected error) keepalive ping timeout; no close frame received", "Error in WebSocket connection for token …TNnQ: sent 1011 (internal error) keepalive ping timeout; no close frame received"
I suspect this may be related to WebSocket keepalive ping settings, but I would appreciate guidance on what might be causing this issue.
What I Have Implemented:
- I am using the Upstox WebSocket API v2 to connect and stream market data.
- The script maintains multiple WebSocket connections, each associated with a different access token.
- I have implemented a latency monitor that sends explicit pings and tracks the latency.
- WebSockets are established using `websockets.connect()` with the following parameters:
async with websockets.connect(
response.data.authorized_redirect_uri,
ssl=ssl_context,
ping_interval=5, # Send ping every 5 seconds
ping_timeout=10, # Wait 10 seconds for pong response
close_timeout=6,
) as websocket:
The `_latency_monitor` function sends an explicit `ping()`, measures the round-trip time for the `pong`, and logs latency data.
Issues Observed:
- After a random duration, I get the keepalive ping timeout error (`1011`).
- Sometimes, the WebSocket disconnects without receiving a close frame.
- Occasionally, high latency is observed before disconnection.
- The error message includes “sent 1011 (internal error) keepalive ping timeout”, which suggests an issue with the Upstox server or my ping management.
Questions:
- Why does the WebSocket keepalive ping timeout occur even when I explicitly send pings every 5 seconds?
- What is the recommended ping interval and timeout to avoid this issue?
- Does Upstox WebSocket API enforce any rate limits on pings that could cause unexpected disconnects?
- Are there server-side keepalive mechanisms that could interfere with my explicit ping-pong mechanism?
- What is the best way to handle reconnections while maintaining minimal downtime?
I would appreciate any insights, suggestions, or best practices for managing stable WebSocket connections with Upstox.
I have attached the full script `websocketV2.py` below for reference.
websocketV2.py
from asyncio import Queue
import asyncio
import json
import ssl
import os
import csv
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
import MarketDataFeed_pb2 as pb
from pathlib import Path
import time
class UpstoxWebSocketManager:
    """Stream Upstox market data over one WebSocket connection per access token.

    Access tokens are read from ``*.txt`` files in ``token_dir`` and instrument
    keys from the ``instrumentKey`` column of ``instrument_file``. The keys are
    split into chunks of at most ``max_instruments_per_token`` and each chunk is
    subscribed on its own connection. Decoded feed messages are pushed onto
    ``data_queue``; ping/pong measurements are pushed onto ``ping_pong_queue``.
    """

    def __init__(self, token_dir="api/token",
                 instrument_file="api/instrument/option_chain_contracts.csv"):
        """Set up paths, queues and tunables; no file or network I/O happens here.

        Args:
            token_dir: Directory containing one access token per ``*.txt`` file.
            instrument_file: CSV file with an ``instrumentKey`` column.
        """
        self.token_dir = Path(token_dir)
        self.instrument_file = Path(instrument_file)
        # Maximum number of instrument keys subscribed on a single connection.
        self.max_instruments_per_token = 99
        # Keepalive settings passed to websockets.connect(), in seconds.
        # Instance attributes so callers can tune them without editing the code.
        self.ping_interval = 5    # library sends a keepalive ping this often
        self.ping_timeout = 10    # how long to wait for the matching pong
        self.close_timeout = 6    # how long to wait for the closing handshake
        self.tokens = []
        self.instrument_keys = []
        self.data_queue = Queue()
        # Latest latency sample per connection, keyed by the token's last 4 chars.
        self.latency_data = {}
        self.ping_pong_queue = Queue()

    def load_access_tokens(self):
        """Load all access tokens from the token directory into ``self.tokens``.

        The list is rebuilt on every call (so calling this twice does not
        duplicate tokens), files are read in sorted order, and files that are
        empty after stripping whitespace are skipped.

        Raises:
            FileNotFoundError: If the token directory does not exist.
            ValueError: If no non-empty token was found.
        """
        if not self.token_dir.exists():
            raise FileNotFoundError(f"Token directory {self.token_dir} not found")

        tokens = []
        for token_file in sorted(self.token_dir.glob("*.txt")):
            token = token_file.read_text().strip()
            if token:
                tokens.append(token)

        if not tokens:
            raise ValueError("No access tokens found in the token directory")
        self.tokens = tokens

    def load_instrument_keys(self):
        """Load instrument keys from the CSV file into ``self.instrument_keys``.

        Raises:
            FileNotFoundError: If the instrument file does not exist.
            KeyError: If a row has no ``instrumentKey`` column.
        """
        if not self.instrument_file.exists():
            raise FileNotFoundError(f"Instrument file {self.instrument_file} not found")

        # newline='' is what the csv module requires for correct newline handling.
        with open(self.instrument_file, 'r', newline='') as f:
            reader = csv.DictReader(f)
            self.instrument_keys = [row['instrumentKey'] for row in reader]

    def distribute_instruments(self):
        """Distribute instrument keys among available tokens.

        Returns:
            A list of ``(token, instrument_keys_chunk)`` pairs. When there are
            more chunks than tokens, a warning is printed and the surplus
            chunks are dropped.
        """
        step = self.max_instruments_per_token
        chunks = [
            self.instrument_keys[start:start + step]
            for start in range(0, len(self.instrument_keys), step)
        ]

        if len(chunks) > len(self.tokens):
            print(f"Warning: Need {len(chunks)} tokens but only {len(self.tokens)} available")
            chunks = chunks[:len(self.tokens)]

        return list(zip(self.tokens, chunks))

    def get_market_data_feed_authorize(self, api_version, configuration):
        """Get authorization for market data feed.

        This is a synchronous call into ``upstox_client``; coroutines should
        run it in an executor rather than call it directly.
        """
        api_instance = upstox_client.WebsocketApi(
            upstox_client.ApiClient(configuration))
        return api_instance.get_market_data_feed_authorize(api_version)

    def decode_protobuf(self, buffer):
        """Decode a serialized protobuf message into a ``FeedResponse``."""
        feed_response = pb.FeedResponse()
        feed_response.ParseFromString(buffer)
        return feed_response

    async def _latency_monitor(self, websocket, token):
        """Background task that measures ping/pong round-trip time every 20 s.

        Each sample is stored in ``self.latency_data`` and queued on
        ``self.ping_pong_queue``. These explicit pings are in addition to the
        keepalive pings the websockets library sends on its own. The task ends
        when the connection closes or when it is cancelled.
        """
        token_tag = token[-4:]
        try:
            while True:
                try:
                    # Measure latency using explicit ping/pong
                    ping_timestamp = time.monotonic()
                    pong_waiter = await websocket.ping()
                    await pong_waiter  # Wait for pong response
                    pong_timestamp = time.monotonic()

                    ping_pong_latency = pong_timestamp - ping_timestamp

                    # Latest sample, read by handle_websocket for each message
                    self.latency_data[token_tag] = {
                        'timestamp': time.time(),
                        'latency': ping_pong_latency,
                        'status': 'healthy'
                    }

                    await self.ping_pong_queue.put({
                        'token': token_tag,
                        'ping_timestamp': ping_timestamp,
                        'pong_timestamp': pong_timestamp,
                        'ping_pong_latency': ping_pong_latency,
                        'status': 'successful'
                    })

                    print(f"Ping-Pong Latency for ...{token_tag}: {ping_pong_latency:.3f}s")

                    # Check latency threshold (adjust 1.0 to your needs)
                    if ping_pong_latency > 1.0:
                        print(f"High ping-pong latency warning for ...{token_tag}: {ping_pong_latency:.3f}s")

                    await asyncio.sleep(20)  # Check every 20 seconds

                except websockets.exceptions.ConnectionClosed:
                    # Record the failed measurement, then stop monitoring.
                    await self.ping_pong_queue.put({
                        'token': token_tag,
                        'ping_timestamp': ping_timestamp,
                        'pong_timestamp': None,
                        'ping_pong_latency': None,
                        'status': 'connection_closed'
                    })
                    break
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path; exit quietly.
            pass

    async def handle_websocket(self, token, instruments):
        """Handle individual WebSocket connection for a token.

        Authorizes the feed, connects, subscribes to ``instruments`` in "full"
        mode and forwards every decoded message to ``self.data_queue`` until
        the connection fails. Any exception is printed and recorded in
        ``self.latency_data`` with status ``'error'``; there is no reconnect.

        Args:
            token: Access token used for authorization.
            instruments: Instrument keys to subscribe on this connection.
        """
        token_tag = token[-4:]

        # NOTE(review): certificate and hostname verification are disabled here,
        # which exposes the connection to man-in-the-middle attacks. Kept as in
        # the original; enable verification if the environment allows it.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        configuration = upstox_client.Configuration()
        configuration.access_token = token
        api_version = '2.0'

        try:
            # The authorize call is a blocking HTTP request. Run it in the
            # default executor so it does not stall the event loop, and with it
            # the keepalive handling of connections that are already open.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None, self.get_market_data_feed_authorize, api_version, configuration)

            async with websockets.connect(
                response.data.authorized_redirect_uri,
                ssl=ssl_context,
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                close_timeout=self.close_timeout,
            ) as websocket:
                print(f'Connection established for token ending in ...{token_tag}')

                latency_task = asyncio.create_task(self._latency_monitor(websocket, token))

                # The try starts right after the task is created so the monitor
                # is cancelled even when the subscribe step itself fails.
                try:
                    await asyncio.sleep(1)

                    subscription = {
                        "guid": f"guid_{token_tag}",
                        "method": "sub",
                        "data": {
                            "mode": "full",
                            "instrumentKeys": instruments
                        }
                    }
                    await websocket.send(json.dumps(subscription).encode('utf-8'))

                    while True:
                        message = await websocket.recv()
                        data_dict = MessageToDict(self.decode_protobuf(message))

                        # Attach the most recent latency sample to the message
                        latency_info = self.latency_data.get(token_tag, {})
                        data_dict['latency'] = {
                            'value': latency_info.get('latency'),
                            'status': latency_info.get('status')
                        }

                        # NOTE(review): printing every message is synchronous
                        # work on the event loop; consider a logger or removing
                        # it if the loop falls behind on busy feeds.
                        print(f"Token ...{token_tag}: {json.dumps(data_dict)}")

                        await self.data_queue.put({
                            'token': token_tag,
                            'data': data_dict,
                            'latency': latency_info
                        })
                finally:
                    if not latency_task.done():
                        latency_task.cancel()
                        try:
                            await latency_task
                        except asyncio.CancelledError:
                            pass

        except Exception as e:
            print(f"Error in WebSocket connection for token ...{token_tag}: {str(e)}")
            self.latency_data[token_tag] = {
                'timestamp': time.time(),
                'latency': None,
                'status': 'error'
            }

    async def get_data_queue(self):
        """Return the data queue for external access."""
        return self.data_queue

    async def get_ping_pong_queue(self):
        """Return the ping-pong queue for external access."""
        return self.ping_pong_queue

    async def run(self):
        """Load tokens and instruments, then run one connection task per token.

        Returns once every connection task has finished.
        """
        self.load_access_tokens()
        self.load_instrument_keys()

        tasks = [
            asyncio.create_task(self.handle_websocket(token, instruments))
            for token, instruments in self.distribute_instruments()
        ]
        await asyncio.gather(*tasks)
def main():
    """Create a manager with the default paths and run it to completion."""
    manager = UpstoxWebSocketManager()
    asyncio.run(manager.run())
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()