Got it! Here’s a revised message that focuses only on the function-level behavior, with no references to script or file names. This version is suitable for sharing with the Upstox API team when you can’t disclose full project details:
Subject: WebSocket Authorization Fails with 401 When Reusing ApiClient — Same Token Works Elsewhere
Hi Upstox API Support Team,
I’m facing a 401 Unauthorized
error while trying to establish a WebSocket connection using the authorized_redirect_uri
provided by your get_market_data_feed_authorize()
API. This occurs under certain conditions, even though the access token is valid and works correctly in other parts of the application.
Problem Description
- A fresh bearer token is fetched from storage, then assigned to the
access_token
attribute of a previously initializedupstox_client.Configuration()
object. - The WebSocket authorization URL is retrieved successfully via
get_market_data_feed_authorize("2.0")
. - However, when trying to connect using that URL, the server responds with 401 Unauthorized.
- This only occurs when reusing a previously initialized
ApiClient
orConfiguration
object.
Working Implementation Pattern
configuration = upstox_client.Configuration()
configuration.access_token = access_token
api_client = upstox_client.ApiClient(configuration)
websocket_api = upstox_client.WebsocketApi(api_client)
auth_response = websocket_api.get_market_data_feed_authorize("2.0")
ws_url = auth_response.data.authorized_redirect_uri
async with websockets.connect(ws_url, ssl=ssl_context):
...
With this approach (where all components are re-created per call), the WebSocket connection works perfectly.
Failing Implementation Pattern
async def get_cmp(instruments_list):
ACCESS_TOKEN = fetch_access_token() # This is a synchronous call
api_version = '2.0'
# Ensure 'configuration' is properly initialized and has the token set
global configuration # If configuration is a global object
configuration.access_token = ACCESS_TOKEN
auth_response_data = None
try:
# Run the synchronous blocking call in an executor to avoid blocking the event loop
loop = asyncio.get_running_loop()
api_client_instance = upstox_client.ApiClient(configuration)
websocket_api_instance = upstox_client.WebsocketApi(api_client_instance)
logging.info("Authorizing market data feed...")
auth_response_data = await loop.run_in_executor(
None, # Uses the default ThreadPoolExecutor
websocket_api_instance.get_market_data_feed_authorize,
api_version
)
if not auth_response_data or not auth_response_data.data or not auth_response_data.data.authorized_redirect_uri:
logging.error("Failed to get a valid authorized_redirect_uri from Upstox.")
return []
websocket_url = auth_response_data.data.authorized_redirect_uri
logging.info(f"Successfully obtained WebSocket URL: {websocket_url}")
except Exception as e_auth:
logging.error(f"Error during market data feed authorization: {e_auth}", exc_info=True)
return []
MAX_CONNECT_ATTEMPTS = 5
RETRY_DELAY_SECONDS = 10
CONNECTION_OPEN_TIMEOUT_SECONDS = 30 # Increased from default 10s
MESSAGE_RECEIVE_TIMEOUT_SECONDS = 30 # Timeout for receiving the first message
for attempt in range(1, MAX_CONNECT_ATTEMPTS + 1):
try:
logging.info(f"Attempting WebSocket connection ({attempt}/{MAX_CONNECT_ATTEMPTS}) to {websocket_url}")
print('websocket_url:', websocket_url)
print('ssl_context:', ssl_context)
async with websockets.connect(
websocket_url,
ssl=ssl_context, # Make sure ssl_context is defined and valid
open_timeout=CONNECTION_OPEN_TIMEOUT_SECONDS
) as websocket:
logging.info("WebSocket connection established successfully.")
subscription_data = {
"guid": f"strategy_run_{uuid.uuid4()}", # More unique GUID
"method": "sub",
"data": {
"mode": "ltpc",
"instrumentKeys": instruments_list
}
}
await websocket.send(json.dumps(subscription_data).encode('utf-8'))
logging.info(f"Subscription message sent for {len(instruments_list)} instruments.")
# Wait for a response with a timeout
logging.info(f"Waiting for message from WebSocket (timeout: {MESSAGE_RECEIVE_TIMEOUT_SECONDS}s)...")
message = await asyncio.wait_for(websocket.recv(), timeout=MESSAGE_RECEIVE_TIMEOUT_SECONDS)
logging.info("Message received from WebSocket.")
decoded_data = decode_protobuf(message)
data_dict = MessageToDict(decoded_data)
cmp_list = []
feeds = data_dict.get('feeds', {})
if not feeds:
logging.warning("Received data but 'feeds' field is missing or empty.")
for instrument_key, feed_data in feeds.items():
ltpc_data = feed_data.get('ltpc', {})
if not ltpc_data:
logging.warning(f"No 'ltpc' data for instrument_key: {instrument_key}")
continue
ltp = ltpc_data.get('ltp')
if ltp is not None:
try:
ltp_float = float(ltp)
cmp = {"instrument_key": instrument_key, "ltp": ltp_float}
cmp_list.append(cmp)
# logging.debug(f"Processed LTP for {instrument_key}: {ltp_float}") # Use if very verbose logging needed
except ValueError:
logging.error(f"Could not convert LTP '{ltp}' to float for {instrument_key}")
else:
logging.warning(f"LTP is None for {instrument_key} in ltpc data: {ltpc_data}")
if not cmp_list and instruments_list:
logging.warning(f"No LTP values were successfully parsed. Instruments queried: {len(instruments_list)}. Full response dictionary: {data_dict}")
else:
logging.info(f"Successfully processed {len(cmp_list)} CMP values.")
return cmp_list # Success, exit function
except asyncio.TimeoutError:
logging.error(f"WebSocket operation timed out on attempt {attempt}/{MAX_CONNECT_ATTEMPTS}. (Open: {CONNECTION_OPEN_TIMEOUT_SECONDS}s, Recv: {MESSAGE_RECEIVE_TIMEOUT_SECONDS}s)")
except websockets.exceptions.InvalidStatusCode as e_status:
logging.error(f"WebSocket connection failed: Invalid status code {e_status.status_code} on attempt {attempt}. Headers: {e_status.headers}")
except websockets.exceptions.WebSocketException as e_ws:
logging.error(f"A WebSocket exception occurred on attempt {attempt}: {e_ws}", exc_info=True)
except ConnectionRefusedError:
logging.error(f"WebSocket connection refused by server on attempt {attempt} to {websocket_url}.")
except Exception as e:
logging.error(f"An unexpected error occurred in get_cmp on attempt {attempt}: {e}", exc_info=True)
if attempt < MAX_CONNECT_ATTEMPTS:
logging.info(f"Waiting {RETRY_DELAY_SECONDS} seconds before next attempt...")
await asyncio.sleep(RETRY_DELAY_SECONDS)
else:
logging.error("All WebSocket attempts failed.")
return [] # Failed after all retries
return [] # Should be technically unreachable if loop logic is correct
Despite updating .access_token
, the connection fails with 401. I suspect this is because the SDK internally caches the Authorization header when ApiClient
is first constructed, and later changes to configuration.access_token
do not refresh the actual headers used.
Additional Observations
- If I retry the connection using the same
authorized_redirect_uri
, I continue getting 401s. - If I regenerate a new Configuration and ApiClient, and then request a new authorized_redirect_uri, the connection succeeds.
- REST API endpoints such as
GET /user/profile
using the same bearer token work fine, indicating the token itself is valid.
Questions
- Is the
authorized_redirect_uri
a single-use URI? If a handshake fails once, is the URI invalidated? - Is the Authorization header cached at
ApiClient
creation time in the SDK? Should we avoid reusing the same client instance after updating the token? - Is there a best-practice you recommend for refreshing tokens and reconnecting sockets safely?
Workaround (currently using)
To avoid the 401, I’ve switched to:
- Creating a new Configuration, ApiClient, and WebsocketApi for each WebSocket session.
- Requesting a fresh authorized_redirect_uri for every connection attempt.
Would appreciate your input to confirm whether this is the intended usage pattern or if there’s a more efficient way.