i have been using the websocket market data streamer v2 fr long time it was working fine bt now when i change to Datastreamer V3 it is saying D TextToSpeech initialized successfully
18:09:29.226 MarketFeederService com.example.marketdepth E Failed to fetch WebSocket URL. Code: 401
18:09:29.236 MarketFeederService com.example.marketdepth D Authorized WebSocket URL: wss://wsfeeder-api.upstox.com/market-data-feeder/v3/upstox-developer-api/feeds?requestId=c5a99ac3-2588-4149-bafd-727d9e0dd395&code=LAoJK-9dbaba04-5152-4a69-9403-b59afaf0345b
18:09:30.476 MarketFeederService com.example.marketdepth D Disconnected from WebSocket. Code: 1002, Reason: Invalid status code received: 403 Status line: HTTP/1.1 403 Forbidden
18:09:30.487 MarketFeederService com.example.marketdepth D Unexpected disconnection. Attempting to reconnect…
18:09:35.502 MarketFeederService com.example.marketdepth E Error while reconnecting: null
The entire setup is the same it was working when using V2 Marketdatastreamer.
private void fetchAuthorizedWebSocketURL() {
private void fetchAuthorizedWebSocketURL() {
OkHttpClient httpClient = new OkHttpClient();
Request request = new Request.Builder().url(AUTH_WS_URI).addHeader(“Authorization”, "Bearer " + accessToken).addHeader(“Accept”, “application/json”).build();
httpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.e("MarketFeederService", "Error fetching WebSocket URL: " + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (response.isSuccessful()) {
String responseBody = response.body().string();
String wsURL = new Gson().fromJson(responseBody, JsonObject.class).getAsJsonObject("data").get("authorized_redirect_uri").getAsString();
Log.d("MarketFeederService", "Authorized WebSocket URL: " + wsURL);
connectWebSocket(wsURL);
} else {
Log.e("MarketFeederService", "Failed to fetch WebSocket URL. Code: " + response.code());
}
}
});
}
private boolean isConnected = false;
private void connectWebSocket(String wsURL) {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(accessToken);
MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient, instrumentKeys, Mode.FULL);
marketDataStreamer.autoReconnect(true, 10, 5);
marketDataStreamer.setOnOpenListener(() -> {
Log.d("MarketFeederService", "Connected to WebSocket");
showNotification("WebSocket Connected", "Successfully connected to the WebSocket");
Calendar calendar = Calendar.getInstance();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
String greeting;
String notification = "Webscoket Successfully Connected";
if (textToSpeech != null) {
Log.d("MarketFeederService", "Attempting to speak: " + notification);
int speakResult = textToSpeech.speak(notification, TextToSpeech.QUEUE_FLUSH, null, null);
Log.d("MarketFeederService", "Speak result: " + speakResult);
} else {
Log.e("MarketFeederService", "TextToSpeech is null");
}
showNotification("WebSocket Connected", "Successfully connected to the WebSocket");
isConnected = true; // Update connection status
Log.d("MarketFeederService", "WebSocket Connected");
});
marketDataStreamer.setOnMarketUpdateListener(new OnMarketUpdateV3Listener() {
@Override
public void onUpdate(MarketUpdateV3 marketUpdate) {
try {
// Log the incoming update
Log.d("MarketFeederService", "Received MarketUpdate: " + marketUpdate);
// Step 1: Process feeds in MarketUpdate
Map<String, MarketUpdateV3.Feed> feeds = marketUpdate.getFeeds();
if (feeds == null || feeds.isEmpty()) {
Log.d("MarketFeederService", "No feeds available in MarketUpdate.");
return;
}
// Step 2: Iterate over the feeds and process relevant instruments
for (Map.Entry<String, MarketUpdateV3.Feed> entry : feeds.entrySet()) {
String instrumentKey = entry.getKey();
MarketUpdateV3.Feed feed = entry.getValue();
// Only process if the instrument matches the subscribed key
if (currentInstrumentKeys != null && currentInstrumentKeys.equals(instrumentKey)) {
Log.d("MarketFeederService", "Processing subscribed instrument: " + instrumentKey);
processInstrument(feed, instrumentKey);
} else {
Log.d("MarketFeederService", "Skipping instrument: " + instrumentKey);
}
}
} catch (Exception e) {
Log.e("MarketFeederService", "Error processing MarketUpdate: ", e);
}
}
});
marketDataStreamer.setOnCloseListener((code, reason) -> {
Log.d("MarketFeederService", "Disconnected from WebSocket. Code: " + code + ", Reason: " + reason);
showNotification("WebSocket Disconnected", "Connection lost. Reason: " + reason);
isConnected = false; // Update connection status
// Reconnect only if the disconnection was unexpected
if (code != 1000) { // Code 1000 indicates normal closure
Log.d("MarketFeederService", "Unexpected disconnection. Attempting to reconnect...");
new Handler(Looper.getMainLooper()).postDelayed(() -> {
if (!isConnected) { // Only reconnect if not connected
try {
marketDataStreamer.connect();
} catch (Exception e) {
Log.e("MarketFeederService", "Error while reconnecting: " + e.getMessage());
}
}
}, 5000); // 5-second delay before attempting to reconnect
}
});
marketDataStreamer.setOnErrorListener(ex -> {
Log.e("MarketFeederService", "WebSocket Error: " + ex.getMessage());
});
try {
if (!isConnected) { // Only connect if not already connected
marketDataStreamer.connect();
} else {
Log.d("MarketFeederService", "Already connected to WebSocket.");
}
} catch (Exception e) {
Log.e("MarketFeederService", "WebSocket connection error: " + e.getMessage());
}
}