WebSocket Authorization Fails with 401 When Reusing ApiClient — Same Token Works Elsewhere



Subject: WebSocket Authorization Fails with 401 When Reusing ApiClient — Same Token Works Elsewhere

Hi Upstox API Support Team,

I’m getting a 401 Unauthorized error when establishing a WebSocket connection using the authorized_redirect_uri returned by get_market_data_feed_authorize(). It happens only when a previously initialized Configuration or ApiClient object is reused, even though the access token is valid and works correctly in other parts of the application.


Problem Description

  • A fresh bearer token is fetched from storage, then assigned to the access_token attribute of a previously initialized upstox_client.Configuration() object.
  • The WebSocket authorization URL is retrieved successfully via get_market_data_feed_authorize("2.0").
  • However, when trying to connect using that URL, the server responds with 401 Unauthorized.
  • This only occurs when reusing a previously initialized ApiClient or Configuration object.

Working Implementation Pattern

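import upstox_client
import websockets

async def connect_with_fresh_client(access_token, ssl_context):
    # Build every SDK object from scratch for this session
    configuration = upstox_client.Configuration()
    configuration.access_token = access_token

    api_client = upstox_client.ApiClient(configuration)
    websocket_api = upstox_client.WebsocketApi(api_client)

    # Request a fresh authorized WebSocket URL (blocking SDK call)
    auth_response = websocket_api.get_market_data_feed_authorize("2.0")
    ws_url = auth_response.data.authorized_redirect_uri

    async with websockets.connect(ws_url, ssl=ssl_context):
        ...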

With this approach (where all components are re-created per call), the WebSocket connection works perfectly.


Failing Implementation Pattern

async def get_cmp(instruments_list):
    ACCESS_TOKEN = fetch_access_token()  # This is a synchronous call
    api_version = '2.0'
    
    # Ensure 'configuration' is properly initialized and has the token set
    global configuration # If configuration is a global object
    configuration.access_token = ACCESS_TOKEN

    auth_response_data = None
    try:
        # Run the synchronous blocking call in an executor to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        api_client_instance = upstox_client.ApiClient(configuration)
        websocket_api_instance = upstox_client.WebsocketApi(api_client_instance)
        
        logging.info("Authorizing market data feed...")
        auth_response_data = await loop.run_in_executor(
            None,  # Uses the default ThreadPoolExecutor
            websocket_api_instance.get_market_data_feed_authorize,
            api_version
        )
        
        if not auth_response_data or not auth_response_data.data or not auth_response_data.data.authorized_redirect_uri:
            logging.error("Failed to get a valid authorized_redirect_uri from Upstox.")
            return []
        
        websocket_url = auth_response_data.data.authorized_redirect_uri
        logging.info(f"Successfully obtained WebSocket URL: {websocket_url}")

    except Exception as e_auth:
        logging.error(f"Error during market data feed authorization: {e_auth}", exc_info=True)
        return []

    MAX_CONNECT_ATTEMPTS = 5
    RETRY_DELAY_SECONDS = 10
    CONNECTION_OPEN_TIMEOUT_SECONDS = 30  # Increased from default 10s
    MESSAGE_RECEIVE_TIMEOUT_SECONDS = 30  # Timeout for receiving the first message

    for attempt in range(1, MAX_CONNECT_ATTEMPTS + 1):
        try:
            logging.info(f"Attempting WebSocket connection ({attempt}/{MAX_CONNECT_ATTEMPTS}) to {websocket_url}")
            print('websocket_url:', websocket_url)
            print('ssl_context:', ssl_context)
            async with websockets.connect(
                websocket_url,
                ssl=ssl_context, # Make sure ssl_context is defined and valid
                open_timeout=CONNECTION_OPEN_TIMEOUT_SECONDS
            ) as websocket:
                logging.info("WebSocket connection established successfully.")

                subscription_data = {
                    "guid": f"strategy_run_{uuid.uuid4()}", # More unique GUID
                    "method": "sub",
                    "data": {
                        "mode": "ltpc",
                        "instrumentKeys": instruments_list
                    }
                }
                
                await websocket.send(json.dumps(subscription_data).encode('utf-8'))
                logging.info(f"Subscription message sent for {len(instruments_list)} instruments.")

                # Wait for a response with a timeout
                logging.info(f"Waiting for message from WebSocket (timeout: {MESSAGE_RECEIVE_TIMEOUT_SECONDS}s)...")
                message = await asyncio.wait_for(websocket.recv(), timeout=MESSAGE_RECEIVE_TIMEOUT_SECONDS)
                logging.info("Message received from WebSocket.")
                
                decoded_data = decode_protobuf(message)
                data_dict = MessageToDict(decoded_data)

                cmp_list = []
                feeds = data_dict.get('feeds', {})
                if not feeds:
                    logging.warning("Received data but 'feeds' field is missing or empty.")
                
                for instrument_key, feed_data in feeds.items():
                    ltpc_data = feed_data.get('ltpc', {})
                    if not ltpc_data:
                        logging.warning(f"No 'ltpc' data for instrument_key: {instrument_key}")
                        continue
                        
                    ltp = ltpc_data.get('ltp')
                    if ltp is not None:
                        try:
                            ltp_float = float(ltp)
                            cmp = {"instrument_key": instrument_key, "ltp": ltp_float}
                            cmp_list.append(cmp)
                            # logging.debug(f"Processed LTP for {instrument_key}: {ltp_float}") # Use if very verbose logging needed
                        except ValueError:
                            logging.error(f"Could not convert LTP '{ltp}' to float for {instrument_key}")
                    else:
                        logging.warning(f"LTP is None for {instrument_key} in ltpc data: {ltpc_data}")
                
                if not cmp_list and instruments_list:
                    logging.warning(f"No LTP values were successfully parsed. Instruments queried: {len(instruments_list)}. Full response dictionary: {data_dict}")
                else:
                    logging.info(f"Successfully processed {len(cmp_list)} CMP values.")
                
                return cmp_list  # Success, exit function

        except asyncio.TimeoutError:
            logging.error(f"WebSocket operation timed out on attempt {attempt}/{MAX_CONNECT_ATTEMPTS}. (Open: {CONNECTION_OPEN_TIMEOUT_SECONDS}s, Recv: {MESSAGE_RECEIVE_TIMEOUT_SECONDS}s)")
        except websockets.exceptions.InvalidStatusCode as e_status:
            logging.error(f"WebSocket connection failed: Invalid status code {e_status.status_code} on attempt {attempt}. Headers: {e_status.headers}")
        except websockets.exceptions.WebSocketException as e_ws:
            logging.error(f"A WebSocket exception occurred on attempt {attempt}: {e_ws}", exc_info=True)
        except ConnectionRefusedError:
            logging.error(f"WebSocket connection refused by server on attempt {attempt} to {websocket_url}.")
        except Exception as e:
            logging.error(f"An unexpected error occurred in get_cmp on attempt {attempt}: {e}", exc_info=True)

        if attempt < MAX_CONNECT_ATTEMPTS:
            logging.info(f"Waiting {RETRY_DELAY_SECONDS} seconds before next attempt...")
            await asyncio.sleep(RETRY_DELAY_SECONDS)
        else:
            logging.error("All WebSocket attempts failed.")
            return [] # Failed after all retries

    return [] # Should be technically unreachable if loop logic is correct

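Despite updating configuration.access_token before each call, the connection fails with 401. Note that this function builds a new ApiClient on every call and reuses only the global Configuration object. I suspect the SDK internally caches the Authorization header when these objects are first constructed, so that later changes to configuration.access_token do not refresh the headers actually sent.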
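
To make the suspicion above concrete, here is a minimal sketch of the kind of caching I have in mind. It is not SDK code: Config, SnapshotClient, and LazyClient are hypothetical classes written only to illustrate the difference between copying a header at construction time and rebuilding it per request. I have not confirmed which of the two the SDK actually does.

```python
class Config:
    """Hypothetical stand-in for a settings object that holds a bearer token."""

    def __init__(self, access_token=""):
        self.access_token = access_token


class SnapshotClient:
    """Hypothetical client that copies the header once, at construction time."""

    def __init__(self, config):
        self.config = config
        self._auth_header = f"Bearer {config.access_token}"  # frozen here

    def auth_header(self):
        return self._auth_header


class LazyClient:
    """Hypothetical client that rebuilds the header on every request."""

    def __init__(self, config):
        self.config = config

    def auth_header(self):
        return f"Bearer {self.config.access_token}"


config = Config("old-token")
snapshot, lazy = SnapshotClient(config), LazyClient(config)

config.access_token = "new-token"  # token refreshed after construction

print(snapshot.auth_header())  # Bearer old-token
print(lazy.auth_header())      # Bearer new-token
```

If the SDK behaves like SnapshotClient anywhere in its request path, updating the token on an existing object would have no effect, which would match the 401 I am seeing.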


Additional Observations

  • If I retry the connection using the same authorized_redirect_uri, I continue getting 401s.
  • If I regenerate a new Configuration and ApiClient, and then request a new authorized_redirect_uri, the connection succeeds.
  • REST API endpoints such as GET /user/profile using the same bearer token work fine, indicating the token itself is valid.

Questions

  1. Is the authorized_redirect_uri a single-use URI? If a handshake fails once, is the URI invalidated?
  2. Is the Authorization header cached at ApiClient creation time in the SDK? Should we avoid reusing the same client instance after updating the token?
  3. Is there a best practice you recommend for refreshing tokens and reconnecting sockets safely?

Workaround (currently using)

To avoid the 401, I’ve switched to:

  • Creating a new Configuration, ApiClient, and WebsocketApi for each WebSocket session.
  • Requesting a fresh authorized_redirect_uri for every connection attempt.
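
The workaround can be sketched roughly as follows. This is a simplified outline and not my production code: connect_with_fresh_url, authorize, and connect are hypothetical names, and the fake functions at the bottom only simulate a server that rejects the first URL. In real use, authorize would build a new Configuration, ApiClient, and WebsocketApi and call get_market_data_feed_authorize, and connect would wrap websockets.connect.

```python
import asyncio


async def connect_with_fresh_url(authorize, connect, max_attempts=3, retry_delay=0.0):
    """Try to connect, requesting a new URL before every attempt.

    authorize: hypothetical zero-argument blocking callable returning a fresh URL.
    connect:   hypothetical coroutine function that opens the socket for a URL
               and raises on failure.
    """
    loop = asyncio.get_running_loop()
    last_error = None
    for attempt in range(1, max_attempts + 1):
        # Run the blocking authorize step in the default executor, once per attempt.
        url = await loop.run_in_executor(None, authorize)
        try:
            return await connect(url)
        except Exception as exc:  # deliberately broad in this sketch
            last_error = exc
            if attempt < max_attempts:
                await asyncio.sleep(retry_delay)
    raise ConnectionError(f"all {max_attempts} attempts failed") from last_error


async def _demo():
    issued = []  # every URL handed out by the fake authorize step

    def fake_authorize():
        issued.append(f"wss://example.invalid/feed?code={len(issued) + 1}")
        return issued[-1]

    async def fake_connect(url):
        if url.endswith("code=1"):
            raise ConnectionError("401")  # simulate the first URL being rejected
        return url

    return await connect_with_fresh_url(fake_authorize, fake_connect), issued


result, issued = asyncio.run(_demo())
print(result)
```

The difference from the failing pattern is that the authorize step sits inside the retry loop, so a rejected URL is never retried.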

I would appreciate your input on whether this is the intended usage pattern or whether there is a more efficient approach.