Line data Source code
1 : library endpoint_utils;
2 :
3 : import 'package:walletkit_dart/src/common/logger.dart';
4 : import 'package:walletkit_dart/src/domain/entities/coin_entity.dart';
5 : import 'package:walletkit_dart/src/domain/predefined_assets.dart';
6 : import 'package:walletkit_dart/src/domain/exceptions.dart';
7 : import 'package:walletkit_dart/src/crypto/utxo/repositories/electrum_json_rpc_client.dart';
8 : import 'package:walletkit_dart/src/crypto/utxo/repositories/json_rpc_client.dart';
9 : import 'package:walletkit_dart/src/domain/extensions.dart';
10 :
/// Collects healthy ElectrumX endpoints from [endpointPool], relaxing the
/// latency limit between rounds until enough endpoints are found.
///
/// Every round delegates to [_getBestHealthEndpoints] with the current latency
/// limit and merges the result into a set, so an endpoint selected in several
/// rounds is only counted once. The loop ends as soon as at least [min]
/// endpoints have been collected or after [maxRetries] additional rounds.
/// Between rounds the latency limit, which starts at [maxLatency], is
/// multiplied by 1.5.
///
/// Returns at most [max] endpoints in the order in which they were selected.
///
/// Throws an [Exception] when fewer than [min] endpoints were collected after
/// the last round. [token] is forwarded to the probe and its `symbol` is used
/// in the log and exception messages.
Future<List<(String, int)>> getBestHealthEndpointsWithRetry({
  required Iterable<(String, int)> endpointPool,
  required CoinEntity token,
  int max = 10,
  int min = 2,
  int maxRetries = 4,
  Duration maxLatency = const Duration(milliseconds: 800),
}) async {
  // A set literal de-duplicates endpoints found in more than one round while
  // keeping the order in which they were first added.
  final selectedEndpoints = <(String, int)>{};
  var retries = 0;
  var lat = maxLatency;

  while (true) {
    final endpoints = await _getBestHealthEndpoints(
      endpoints: endpointPool,
      token: token,
      maxClients: max,
      maxLatency: lat,
    );

    selectedEndpoints.addAll(endpoints);

    if (selectedEndpoints.length >= min || retries >= maxRetries) break;

    retries++;
    lat *= 1.5;
  }

  if (selectedEndpoints.length < min) {
    throw Exception(
      "Could not find enough healthy ElectrumX endpoints for ${token.symbol}",
    );
  }

  // Each round is capped at `max`, but the union of several rounds is not:
  // a short first round plus a full later round can exceed it. Cap the final
  // result so callers never receive more than `max` endpoints.
  final result = selectedEndpoints.take(max).toList();

  Logger.log(
    "Selected ${result.length} ElectrumX endpoints for ${token.symbol} after $retries retries with maxLatency: ${lat.inMilliseconds}ms",
  );

  return result;
}
51 :
/// Probes every endpoint in [endpoints] once and returns the largest group of
/// endpoints that reported the same block height.
///
/// All probes run concurrently through [getResponseTimeForHost]; a probe that
/// takes longer than five times [maxLatency] is abandoned and treated as
/// failed. Endpoints whose measured latency exceeds [maxLatency] (compared in
/// milliseconds) are dropped, the remainder is sorted by ascending latency and
/// grouped by reported block height. The group with the most members is
/// chosen; when two groups have the same size the one encountered later wins.
///
/// Returns up to [maxClients] endpoints from the chosen group, fastest first,
/// or an empty list when no endpoint passed the latency filter.
Future<List<(String, int)>> _getBestHealthEndpoints({
  required Iterable<(String, int)> endpoints,
  required CoinEntity token,
  int maxClients = 10,
  Duration maxLatency = const Duration(milliseconds: 1500),
}) async {
  // Materialise the pool once: the probes and the index-based pairing below
  // must see exactly the same sequence, and a lazy iterable would otherwise
  // be re-evaluated on every `length` / index access.
  final candidates = endpoints.toList(growable: false);

  final probes = await Future.wait([
    for (final entry in candidates)
      getResponseTimeForHost(
        entry,
        maxLatency: maxLatency,
        token: token,
      ).timeout(
        maxLatency * 5,
        onTimeout: () {
          print("Creat Client Timeout for ${entry}");
          return null;
        },
      )
  ]);

  // Pair each endpoint with its probe result, skipping failed probes.
  final latencyMap = <(String, int), (double, int)>{};
  for (var i = 0; i < candidates.length; i++) {
    final probe = probes[i];
    if (probe != null) latencyMap[candidates[i]] = probe;
  }

  // Keep only endpoints within the latency limit, fastest first.
  final sorted = latencyMap.entries
      .where((entry) => entry.value.$1 <= maxLatency.inMilliseconds)
      .toList()
    ..sort((a, b) => a.value.$1.compareTo(b.value.$1));

  if (sorted.isEmpty) {
    return [];
  }

  // Group endpoints by reported block height; appending in place keeps the
  // latency order inside each group.
  final blockHeightMap = <int, List<(String, int)>>{};
  for (final entry in sorted) {
    (blockHeightMap[entry.value.$2] ??= []).add(entry.key);
  }

  // Pick the block height most endpoints agree on.
  final mostForBlockHeight = blockHeightMap.values.reduce(
    (a, b) => a.length > b.length ? a : b,
  );

  return mostForBlockHeight.take(maxClients).toList();
}
104 :
/// Measures how long [host] needs to accept a connection and answer a
/// current-block request.
///
/// A client is created through [createElectrumXClient], asked for the current
/// block and disconnected again. The stopwatch covers all three steps, and
/// half of the elapsed milliseconds is reported as the latency.
///
/// Returns `(latency, block)` on success, or `null` when no client could be
/// created, the block request throws, or the block is `null`.
///
/// [maxLatency] is accepted for interface compatibility but is not read by
/// this function; callers enforce the limit themselves.
Future<(double, int)?> getResponseTimeForHost(
  final (String, int) host, {
  required Duration maxLatency,
  required CoinEntity token,
}) async {
  final stopwatch = Stopwatch()..start();
  final client = await createElectrumXClient(
    endpoint: host.$1,
    port: host.$2,
    token: token,
  );
  if (client == null) return null;

  final int? block;
  try {
    block = await client.getCurrentBlock();
  } catch (e) {
    print("Error: $e");
    // The client is already connected at this point; release it so a failing
    // host does not leave a connection open. A failure while closing must not
    // turn this best-effort probe into an error.
    try {
      await client.disconnect();
    } catch (disconnectError) {
      print("Error: $disconnectError");
    }
    return null;
  }

  await client.disconnect();
  stopwatch.stop();

  // A null block means the host did not provide a valid response.
  if (block == null) return null;

  return (stopwatch.elapsed.inMilliseconds.toDouble() / 2, block);
}
133 :
/// Creates an [ElectrumXClient] for every entry of [endpoints] concurrently.
///
/// Each connection attempt gets five seconds. Attempts that time out or that
/// [createElectrumXClient] resolves to `null` are left out of the result, so
/// the returned list may be shorter than [endpoints] (or empty).
Future<List<ElectrumXClient>> createClients({
  required List<(String, int)> endpoints,
  required CoinEntity token,
}) async {
  final attempts = <Future<ElectrumXClient?>>[];

  for (final entry in endpoints) {
    final connecting = createElectrumXClient(
      endpoint: entry.$1,
      port: entry.$2,
      token: token,
    );

    attempts.add(
      connecting.timeout(
        const Duration(seconds: 5),
        onTimeout: () {
          print("Creat Client Timeout for ${entry}");
          // The timeout does not cancel the attempt itself. If it still
          // succeeds later nobody would own the client, so close it then.
          _disconnectLateClient(connecting);
          return null;
        },
      ),
    );
  }

  final clients = await Future.wait(attempts);
  return clients.whereType<ElectrumXClient>().toList();
}

/// Waits for an abandoned connection attempt and disconnects its client.
///
/// Best-effort cleanup: any error from the attempt or from disconnecting is
/// ignored because the caller has already given up on this endpoint.
Future<void> _disconnectLateClient(Future<ElectrumXClient?> pending) async {
  try {
    final lateClient = await pending;
    await lateClient?.disconnect();
  } catch (_) {
    // Nothing to release and nobody left to report to.
  }
}
155 :
/// Builds a [TcpJsonRpcClient] for [endpoint] and [port] and wraps it in an
/// [ElectrumXClient] once the connection is reported as established.
///
/// The RPC client is created with `isZeniq` set to whether [token] equals
/// [zeniqCoin]. Completes with `null` when the RPC client's `connected`
/// future resolves to `false`.
Future<ElectrumXClient?> createElectrumXClient({
  required String endpoint,
  required int port,
  required CoinEntity token,
}) async {
  final rpcClient = TcpJsonRpcClient(
    isZeniq: token == zeniqCoin,
    host: endpoint,
    port: port,
  );

  final isConnected = await rpcClient.connected;

  return isConnected == false ? null : ElectrumXClient(rpcClient);
}
170 :
/// Connects to the first reachable endpoint of [endpoints], trying them in
/// random order.
///
/// Both lists are modified in place: [endpoints] is shuffled, and the endpoint
/// that yields a client is removed from [endpoints] and appended to
/// [excludedEndpoints].
///
/// Returns the connected client, or `null` when [createElectrumXClient]
/// produced no client for any endpoint.
Future<ElectrumXClient?> createRandomElectrumXClient({
  required List<(String, int)> endpoints,
  required List<(String, int)> excludedEndpoints,
  required CoinEntity token,
}) async {
  endpoints.shuffle();

  for (final candidate in endpoints) {
    final (host, port) = candidate;
    final connected = await createElectrumXClient(
      endpoint: host,
      port: port,
      token: token,
    );

    if (connected != null) {
      // Returning right after the removal means the iteration never
      // continues over the modified list.
      endpoints.remove(candidate);
      excludedEndpoints.add(candidate);
      return connected;
    }
  }

  return null;
}
192 :
/// Runs [fetchFunction] against [client] and, if that is not possible, against
/// randomly chosen hosts from [endpoints] until one succeeds.
///
/// When [client] is non-null it is tried first; on success the result is
/// returned together with that same client (which is left connected
/// regardless of [cleanup]). When the attempt throws or exceeds [timeout], the
/// failure is logged and the client is disconnected.
///
/// The fallback shuffles a copy of [endpoints], creates a fresh client per
/// host via [createElectrumXClient] and retries [fetchFunction] with the same
/// [timeout]. Hosts for which no client could be created are skipped. On the
/// first success the result and the new client are returned; when [cleanup] is
/// `true` that client has already been asked to disconnect.
///
/// Returns `(result, client, null)` on success and
/// `(null, null, NoWorkingHostsException)` when every host failed. The
/// exception is returned, not thrown.
Future<(T?, ElectrumXClient?, NoWorkingHostsException?)>
    fetchFromRandomElectrumXNode<T>(
  Future<T> Function(ElectrumXClient) fetchFunction, {
  required ElectrumXClient? client,
  required Iterable<(String, int)> endpoints,
  required CoinEntity token,
  Duration timeout = const Duration(milliseconds: 3000),
  bool cleanup = true,
}) async {
  // Try the caller-supplied client first. A missing client simply skips this
  // step instead of being signalled through a thrown-and-caught exception.
  if (client != null) {
    try {
      final result = await fetchFunction(client).timeout(timeout);
      return (result, client, null);
    } catch (e, s) {
      Logger.logWarning(
        "ElectrumX fetch failed for initial ${client.host}. Trying new Hosts. $e $s",
      );
      client.disconnect();
    }
  }

  // Fall back to the pool in random order; work on a copy so the caller's
  // collection is left untouched.
  final shuffledEndpoints = List.of(endpoints, growable: true)..shuffle();

  final errors = <String>[];

  for (final endpoint in shuffledEndpoints) {
    final candidate = await createElectrumXClient(
      endpoint: endpoint.$1,
      port: endpoint.$2,
      token: token,
    );
    if (candidate == null) continue;

    try {
      final result = await fetchFunction(candidate).timeout(timeout);
      if (cleanup) candidate.disconnect();
      return (result, candidate, null);
    } catch (error, stackTrace) {
      candidate.disconnect();
      errors.add("$error $stackTrace");
      Logger.logWarning(
        "ElectrumX fetch failed for ${candidate.host}: $error",
      );
    }
  }

  return (
    null,
    null,
    NoWorkingHostsException(
      "Could not fetch from any endpoint: $endpoints $errors",
    )
  );
}
247 :
/// Runs [fetchFunction] against [client], bounded by [timeout].
///
/// Returns `(result, null)` on success. When the call throws an [Exception]
/// (including the timeout), the failure is logged and `(null, exception)` is
/// returned instead of being thrown. Throwables that are not an [Exception]
/// propagate to the caller.
///
/// When [cleanup] is `true` the client is asked to disconnect on every exit
/// path, including when a non-[Exception] throwable propagates.
Future<(T?, Object?)> fetchFromNode<T>(
  Future<T> Function(ElectrumXClient) fetchFunction, {
  required ElectrumXClient client,
  Duration timeout = const Duration(milliseconds: 3000),
  bool cleanup = false,
}) async {
  try {
    final result = await fetchFunction(client).timeout(timeout);
    return (result, null);
  } on Exception catch (e) {
    Logger.logWarning(
      "ElectrumX fetch failed for ${client.host}. $e",
    );
    return (null, e);
  } finally {
    // A single cleanup point: runs after success, after a handled exception
    // and also while an unhandled error unwinds.
    if (cleanup) client.disconnect();
  }
}
|