Here is the code
// lib/index_websocket_service.dart
import ‘dart:convert’;
import ‘dart:io’;
import ‘dart:typed_data’;
import ‘proto/marketdatafeed.pb.dart’;
typedef PriceCallback = void Function(String instrumentKey, double ltp);
class IndexWebSocketService {
final String accessToken;
WebSocket? _socket;
/// Called when we extract an LTP tick: (instrumentKey, ltp)
PriceCallback? onPriceUpdate;
IndexWebSocketService({required this.accessToken});
/// Full flow: authorize → connect → subscribe → decode
Future connect({required String instrumentKey}) async {
// 1) Authorize: GET to obtain WSS URL
final client = HttpClient();
final authUri = Uri.parse(
‘https://api.upstox.com/v3/feed/market-data-feed/authorize’,
);
HttpClientRequest req;
try {
req = await client.getUrl(authUri);
} catch (e) {
throw Exception('Network error creating auth request: $e');
}
req.headers.set('Authorization', 'Bearer $accessToken');
final authResp = await req.close();
final authBody = await utf8.decoder.bind(authResp).join();
print('Authorize status: ${authResp.statusCode}');
// debug print small portion for visibility
print('Authorize body (truncated): ${authBody.length > 300 ? authBody.substring(0, 300) + "..." : authBody}');
if (authResp.statusCode != 200) {
throw Exception('Authorize failed: ${authResp.statusCode} - $authBody');
}
dynamic authJson;
try {
authJson = jsonDecode(authBody);
} catch (e) {
throw Exception('Authorize returned non-JSON: $e');
}
// Try common fields for WSS URL
final feedUrl = authJson['data']?['authorizedRedirectUri'] ??
authJson['data']?['wss_url'] ??
authJson['data']?['wssUrl'] ??
authJson['authorizedRedirectUri'] ??
authJson['wss_url'];
if (feedUrl == null) {
throw Exception('No WSS URL found in authorize response: $authJson');
}
print('Connecting to WSS URL: $feedUrl');
// 2) Connect to WSS with header
try {
_socket = await WebSocket.connect(
feedUrl,
headers: {
'Authorization': 'Bearer $accessToken',
'apiVersion': '3.0',
},
);
} catch (e) {
throw Exception('WebSocket.connect failed: $e (feedUrl: $feedUrl)');
}
print('WebSocket connected; sending subscription for $instrumentKey');
// 3) Subscribe
final subMsg = {
'guid': 'sub-${DateTime.now().millisecondsSinceEpoch}',
'method': 'sub',
'data': {
'mode': 'full',
'instrumentKeys': [instrumentKey],
},
};
_socket!.add(jsonEncode(subMsg));
// 4) Listen and decode (binary or JSON-with-base64)
_socket!.listen((dynamic event) {
try {
if (event is String) {
_handleTextFrame(event, instrumentKey);
} else if (event is List<int>) {
_handleBinaryFrame(Uint8List.fromList(event));
} else if (event is ByteBuffer) {
_handleBinaryFrame(event.asUint8List());
} else {
print('Unknown WS event type: ${event.runtimeType}');
}
} catch (e, st) {
print('Error processing WS event: $e\n$st');
}
}, onError: (err) {
print('WebSocket error: $err');
}, onDone: () {
print('WebSocket closed by server');
});
}
void _handleTextFrame(String text, String instrumentKey) {
// Some servers send JSON with base64-encoded proto in data.feeds[INST].marketFF
try {
final jsonMsg = jsonDecode(text);
final data = jsonMsg[‘data’];
if (data == null) return;
final feeds = data['feeds'];
if (feeds != null) {
var feedEntry = feeds[instrumentKey];
if (feedEntry == null) {
// sometimes instrument key uses underscores or slightly different format
feedEntry = feeds[instrumentKey.replaceAll('|', '_')];
}
if (feedEntry != null) {
// If base64 market payload found
final base64Str = feedEntry['marketFF'] ??
feedEntry['marketFullFeed'] ??
feedEntry['market_full_feed'] ??
feedEntry['market'];
if (base64Str is String && base64Str.isNotEmpty) {
try {
final bytes = base64Decode(base64Str);
_handleBinaryFrame(Uint8List.fromList(bytes));
return;
} catch (_) {
// not base64 -> continue
}
}
// If JSON already contains ltpc object:
final ltpc = feedEntry['ltpc'];
if (ltpc != null && ltpc['ltp'] != null) {
final ltpVal = (ltpc['ltp'] as num).toDouble();
onPriceUpdate?.call(instrumentKey, ltpVal);
return;
}
}
}
} catch (_) {
// ignore non-json or unexpected
}
}
void _handleBinaryFrame(Uint8List bytes) {
// PRIMARY: FeedResponse.fromBuffer (map of instrumentKey->Feed)
try {
final feedResp = FeedResponse.fromBuffer(bytes);
// feedResp.feeds is PbMap<String, Feed> — iterate
feedResp.feeds.forEach((instrKey, feed) {
// Use dynamic access to avoid compile-time mismatch on field names
final dynFeed = feed as dynamic;
double? ltp;
// 1) Try marketFF -> ltpc -> ltp
try {
final mff = dynFeed.marketFF;
if (mff != null) {
final ltpc = mff.ltpc;
if (ltpc != null && ltpc.ltp != null) {
final v = ltpc.ltp;
ltp = (v is num) ? v.toDouble() : double.tryParse(v.toString());
}
}
} catch (_) {}
// 2) Try indexFF / marketFullFeed variants
if (ltp == null) {
try {
final alt = dynFeed.indexFF ?? dynFeed.marketFullFeed ?? dynFeed.market_ff;
if (alt != null) {
final ltpc = alt.ltpc;
if (ltpc != null && ltpc.ltp != null) {
final v = ltpc.ltp;
ltp = (v is num) ? v.toDouble() : double.tryParse(v.toString());
}
}
} catch (_) {}
}
// 3) Try feed.ltpc directly
if (ltp == null) {
try {
final ltpc = dynFeed.ltpc;
if (ltpc != null && ltpc.ltp != null) {
final v = ltpc.ltp;
ltp = (v is num) ? v.toDouble() : double.tryParse(v.toString());
}
} catch (_) {}
}
if (ltp != null) {
onPriceUpdate?.call(instrKey, ltp);
} else {
// helpful debug output for you — paste this if no ticks appear
try {
final j = dynFeed.writeToJson();
print('Decoded Feed (no ltp found) for $instrKey: $j');
} catch (_) {
print('Decoded Feed for $instrKey (could not JSONize)');
}
}
});
return;
} catch (_) {
// Not FeedResponse — try MarketFullFeed directly
}
// FALLBACK: try MarketFullFeed directly
try {
final mff = MarketFullFeed.fromBuffer(bytes);
final ltp = mff.ltpc?.ltp;
if (ltp != null) {
onPriceUpdate?.call('', (ltp is num) ? ltp.toDouble() : double.tryParse(ltp.toString())!);
} else {
try {
print('MarketFullFeed decoded but no ltp: ${mff.writeToJson()}');
} catch (_) {}
}
return;
} catch (_) {}
// Nothing matched
print('Binary decode unknown message (len=${bytes.length})');
}
Future disconnect({String? instrumentKey}) async {
if (_socket != null && instrumentKey != null) {
final unsub = {
‘guid’: ‘unsub-${DateTime.now().millisecondsSinceEpoch}’,
‘method’: ‘unsub’,
‘data’: {
‘mode’: ‘full’,
‘instrumentKeys’: [instrumentKey],
}
};
try {
socket!.add(jsonEncode(unsub));
} catch () {}
}
try {
await socket?.close();
} catch () {}
_socket = null;
}
}