WebSocket Error 1011: Keepalive Ping Timeout – Need Help with Debugging

Hello Upstox Community,

I am encountering a WebSocket connection error when running my script websocketV2.py. The error message is as follows:

1011 WebSocket connection error: "sent 1011 (unexpected error) keepalive ping timeout; no close frame received", "error in WebSocket connection for token …TNnQ: sent 1011 (internal error) keepalive ping timeout; no close frame received"

I suspect this may be related to WebSocket keepalive ping settings, but I would appreciate guidance on what might be causing this issue.

What I Have Implemented:

  • I am using the Upstox WebSocket API v2 to connect and stream market data.
  • The script maintains multiple WebSocket connections, each associated with a different access token.
  • I have implemented a latency monitor that sends explicit pings and tracks the latency.
  • WebSockets are established using websockets.connect() with the following parameters:

async with websockets.connect(
    response.data.authorized_redirect_uri,
    ssl=ssl_context,
    ping_interval=5,   # Send ping every 5 seconds
    ping_timeout=10,   # Wait 10 seconds for pong response
    close_timeout=6,
) as websocket:

The _latency_monitor function sends an explicit ping(), measures the round-trip time for pong, and logs latency data.
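As written in the script, the monitor awaits the pong without a timeout of its own. A minimal sketch of a bounded variant is below. It assumes only that ping() returns an awaitable that completes when the pong arrives, as it does in the websockets library. The names measure_ping and FakeWebSocket are hypothetical and exist only so the example runs without a network connection.

```python
import asyncio
import time


async def measure_ping(websocket, timeout=10.0):
    """Return the round-trip time in seconds for one explicit ping, or None on timeout."""
    start = time.monotonic()
    pong_waiter = await websocket.ping()
    try:
        # Bound the wait so a missing pong cannot stall the monitor forever.
        await asyncio.wait_for(pong_waiter, timeout)
    except asyncio.TimeoutError:
        return None
    return time.monotonic() - start


class FakeWebSocket:
    """Hypothetical stand-in for a real connection; answers each ping after `delay` seconds."""

    def __init__(self, delay):
        self.delay = delay

    async def ping(self):
        loop = asyncio.get_running_loop()
        pong = loop.create_future()
        # Resolve the future later, unless it was already cancelled by a timeout.
        loop.call_later(self.delay, lambda: pong.done() or pong.set_result(None))
        return pong


if __name__ == "__main__":
    print(asyncio.run(measure_ping(FakeWebSocket(0.01), timeout=1.0)))   # small float
    print(asyncio.run(measure_ping(FakeWebSocket(1.0), timeout=0.05)))   # None
```

With a real connection, the same helper could be called from inside the monitor loop in place of the bare await, and a None result logged as a missed pong.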

Issues Observed:

  1. After a random duration, I get the keepalive ping timeout error (1011).
  2. Sometimes, the WebSocket disconnects without receiving a close frame.
  3. Occasionally, high latency is observed before disconnection.
  4. The error message includes “sent 1011 (internal error) keepalive ping timeout”, which suggests an issue with the Upstox server or my ping management.

Questions:

  1. Why does the WebSocket keepalive ping timeout occur even though keepalive pings are sent every 5 seconds (ping_interval=5) and my latency monitor sends explicit pings on top of that?
  2. What is the recommended ping interval and timeout to avoid this issue?
  3. Does Upstox WebSocket API enforce any rate limits on pings that could cause unexpected disconnects?
  4. Are there server-side keepalive mechanisms that could interfere with my explicit ping-pong mechanism?
  5. What is the best way to handle reconnections while maintaining minimal downtime?
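
For question 5, one common shape for a reconnect loop is retry with exponential backoff and a little jitter. The sketch below is only an illustration under assumptions, not a recommendation from Upstox: run_with_reconnect, connect_once and retry_on are hypothetical names, and the built-in ConnectionError stands in for whatever exception the real connection raises (with the websockets library that would likely be websockets.exceptions.ConnectionClosed).

```python
import asyncio
import random


async def run_with_reconnect(connect_once, max_attempts=5, base_delay=1.0,
                             max_delay=30.0, retry_on=(ConnectionError,)):
    """Await connect_once() until it returns normally, backing off between failures."""
    for attempt in range(max_attempts):
        try:
            return await connect_once()
        except retry_on as exc:
            # Double the delay on every failure, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Add up to 10% jitter so several connections do not retry in lockstep.
            delay += random.uniform(0, delay / 10)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
    raise ConnectionError(f"Gave up after {max_attempts} attempts")


if __name__ == "__main__":
    attempts = {"count": 0}

    async def flaky_connection():
        # Hypothetical connection that drops twice before succeeding.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("dropped")
        return "connected"

    print(asyncio.run(run_with_reconnect(flaky_connection, base_delay=0.01)))
```

In the script, connect_once would wrap one pass of handle_websocket for a single token, so each token reconnects independently of the others.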

I would appreciate any insights, suggestions, or best practices for managing stable WebSocket connections with Upstox.

I have attached the full script websocketV2.py below for reference.

websocketV2.py

from asyncio import Queue
import asyncio
import json
import ssl
import os
import csv
import upstox_client
import websockets
from google.protobuf.json_format import MessageToDict
import MarketDataFeed_pb2 as pb
from pathlib import Path
import time

class UpstoxWebSocketManager:
    def __init__(self, token_dir="api/token", instrument_file="api/instrument/option_chain_contracts.csv"):
        self.token_dir = Path(token_dir)
        self.instrument_file = Path(instrument_file)
        self.max_instruments_per_token = 99
        self.tokens = []
        self.instrument_keys = []
        self.data_queue = Queue()
        self.latency_data = {}
        self.ping_pong_queue = Queue()

    def load_access_tokens(self):
        """Load all access tokens from the token directory."""
        if not self.token_dir.exists():
            raise FileNotFoundError(f"Token directory {self.token_dir} not found")

        token_files = list(self.token_dir.glob("*.txt"))
        for token_file in token_files:
            with open(token_file, 'r') as f:
                token = f.read().strip()
                self.tokens.append(token)

        if not self.tokens:
            raise ValueError("No access tokens found in the token directory")

    def load_instrument_keys(self):
        """Load instrument keys from the CSV file."""
        if not self.instrument_file.exists():
            raise FileNotFoundError(f"Instrument file {self.instrument_file} not found")

        with open(self.instrument_file, 'r') as f:
            reader = csv.DictReader(f)
            self.instrument_keys = [row['instrumentKey'] for row in reader]

    def distribute_instruments(self):
        """Distribute instrument keys among available tokens."""
        chunks = []
        for i in range(0, len(self.instrument_keys), self.max_instruments_per_token):
            chunk = self.instrument_keys[i:i + self.max_instruments_per_token]
            chunks.append(chunk)

        # Extra chunks are dropped when there are fewer tokens than chunks
        if len(chunks) > len(self.tokens):
            print(f"Warning: Need {len(chunks)} tokens but only {len(self.tokens)} available")
            chunks = chunks[:len(self.tokens)]

        return list(zip(self.tokens, chunks))

    def get_market_data_feed_authorize(self, api_version, configuration):
        """Get authorization for market data feed."""
        api_instance = upstox_client.WebsocketApi(
            upstox_client.ApiClient(configuration))
        api_response = api_instance.get_market_data_feed_authorize(api_version)
        return api_response

    def decode_protobuf(self, buffer):
        """Decode protobuf message."""
        feed_response = pb.FeedResponse()
        feed_response.ParseFromString(buffer)
        return feed_response

    async def _latency_monitor(self, websocket, token):
        """Background task to monitor and log latency."""
        try:
            while True:
                try:
                    # Measure latency using explicit ping/pong
                    ping_timestamp = time.monotonic()
                    pong_waiter = await websocket.ping()
                    await pong_waiter  # Wait for pong response
                    pong_timestamp = time.monotonic()

                    # Calculate ping-pong latency
                    ping_pong_latency = pong_timestamp - ping_timestamp

                    # Store latency with timestamp
                    latency_info = {
                        'timestamp': time.time(),
                        'latency': ping_pong_latency,
                        'status': 'healthy'
                    }

                    self.latency_data[token[-4:]] = latency_info

                    # Add ping-pong data to queue
                    await self.ping_pong_queue.put({
                        'token': token[-4:],
                        'ping_timestamp': ping_timestamp,
                        'pong_timestamp': pong_timestamp,
                        'ping_pong_latency': ping_pong_latency,
                        'status': 'successful'
                    })

                    print(f"Ping-Pong Latency for ...{token[-4:]}: {ping_pong_latency:.3f}s")

                    # Check latency threshold (adjust 1.0 to your needs)
                    if ping_pong_latency > 1.0:
                        print(f"High ping-pong latency warning for ...{token[-4:]}: {ping_pong_latency:.3f}s")

                    await asyncio.sleep(20)  # Check every 20 seconds

                except websockets.exceptions.ConnectionClosed:
                    # Queue ping-pong failure data
                    await self.ping_pong_queue.put({
                        'token': token[-4:],
                        'ping_timestamp': ping_timestamp,
                        'pong_timestamp': None,
                        'ping_pong_latency': None,
                        'status': 'connection_closed'
                    })
                    break

        except asyncio.CancelledError:
            pass

    async def handle_websocket(self, token, instruments):
        """Handle individual WebSocket connection for a token."""
        # NOTE: certificate verification is disabled here
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        configuration = upstox_client.Configuration()
        configuration.access_token = token
        api_version = '2.0'

        latency_task = None

        try:
            response = self.get_market_data_feed_authorize(api_version, configuration)

            # async with websockets.connect(response.data.authorized_redirect_uri, ssl=ssl_context) as websocket:
            # Adjust ping_interval and ping_timeout here
            async with websockets.connect(
                response.data.authorized_redirect_uri,
                ssl=ssl_context,
                ping_interval=5,   # Send ping every 5 seconds
                ping_timeout=10,   # Wait 10 seconds for pong response
                close_timeout=6,
            ) as websocket:

                print(f'Connection established for token ending in ...{token[-4:]}')

                # Start latency monitoring task
                latency_task = asyncio.create_task(self._latency_monitor(websocket, token))

                await asyncio.sleep(1)

                # Subscription request for this token's share of instruments
                data = {
                    "guid": f"guid_{token[-4:]}",
                    "method": "sub",
                    "data": {
                        "mode": "full",
                        "instrumentKeys": instruments
                    }
                }

                binary_data = json.dumps(data).encode('utf-8')
                await websocket.send(binary_data)

                try:
                    while True:
                        message = await websocket.recv()
                        decoded_data = self.decode_protobuf(message)
                        data_dict = MessageToDict(decoded_data)

                        # Add latency information to the data
                        latency_info = self.latency_data.get(token[-4:], {})
                        data_dict['latency'] = {
                            'value': latency_info.get('latency'),
                            'status': latency_info.get('status')
                        }

                        print(f"Token ...{token[-4:]}: {json.dumps(data_dict)}")

                        await self.data_queue.put({
                            'token': token[-4:],
                            'data': data_dict,
                            'latency': latency_info
                        })

                finally:
                    # Stop the latency monitor when the receive loop ends
                    if latency_task and not latency_task.done():
                        latency_task.cancel()
                        try:
                            await latency_task
                        except asyncio.CancelledError:
                            pass

        except Exception as e:
            print(f"Error in WebSocket connection for token ...{token[-4:]}: {str(e)}")
            self.latency_data[token[-4:]] = {
                'timestamp': time.time(),
                'latency': None,
                'status': 'error'
            }

    async def get_data_queue(self):
        """Return the data queue for external access."""
        return self.data_queue

    async def get_ping_pong_queue(self):
        """Return the ping-pong queue for external access."""
        return self.ping_pong_queue

    async def run(self):
        """Run the WebSocket manager."""
        self.load_access_tokens()
        self.load_instrument_keys()
        distribution = self.distribute_instruments()

        # One task (and one WebSocket connection) per access token
        tasks = []
        for token, instruments in distribution:
            task = asyncio.create_task(self.handle_websocket(token, instruments))
            tasks.append(task)

        await asyncio.gather(*tasks)


def main():
    manager = UpstoxWebSocketManager()
    asyncio.run(manager.run())


if __name__ == "__main__":
    main()

@Vijay_Kumar_Sharma - We’ll follow up on this soon.

Thanks!