Line data Source code
1 : library endpoint_utils;
2 :
3 : import 'package:walletkit_dart/src/common/logger.dart';
4 : import 'package:walletkit_dart/src/domain/entities/coin_entity.dart';
5 : import 'package:walletkit_dart/src/domain/predefined_assets.dart';
6 : import 'package:walletkit_dart/src/domain/exceptions.dart';
7 : import 'package:walletkit_dart/src/crypto/utxo/repositories/electrum_json_rpc_client.dart';
8 : import 'package:walletkit_dart/src/crypto/utxo/repositories/json_rpc_client.dart';
9 : import 'package:walletkit_dart/src/domain/extensions.dart';
10 :
11 9 : Future<List<(String, int)>> getBestHealthEndpointsWithRetry({
12 : required Iterable<(String, int)> endpointPool,
13 : required CoinEntity token,
14 : int max = 10,
15 : int min = 2,
16 : int maxRetries = 4,
17 : Duration maxLatency = const Duration(milliseconds: 800),
18 : }) async {
19 : final selectedEndpoints = <(String, int)>{};
20 : var retries = 0;
21 : var lat = maxLatency;
22 :
23 : while (true) {
24 9 : final endpoints = await _getBestHealthEndpoints(
25 : endpoints: endpointPool,
26 : token: token,
27 : maxClients: max,
28 : maxLatency: lat,
29 : );
30 :
31 9 : selectedEndpoints.addAll(endpoints);
32 :
33 18 : if (selectedEndpoints.length >= min || retries >= maxRetries) break;
34 :
35 0 : retries++;
36 0 : lat *= 1.5;
37 : }
38 :
39 18 : if (selectedEndpoints.length < min) {
40 0 : throw Exception(
41 0 : "Could not find enough healthy ElectrumX endpoints for ${token.symbol}",
42 : );
43 : }
44 :
45 9 : Logger.log(
46 36 : "Selected ${selectedEndpoints.length} ElectrumX endpoints for ${token.symbol} after $retries retries with maxLatency: ${lat.inMilliseconds}ms",
47 : );
48 :
49 9 : return selectedEndpoints.toList();
50 : }
51 :
52 9 : Future<List<(String, int)>> _getBestHealthEndpoints({
53 : required Iterable<(String, int)> endpoints,
54 : required CoinEntity token,
55 : int maxClients = 10,
56 : Duration maxLatency = const Duration(milliseconds: 1500),
57 : }) async {
58 18 : final latencies = await Future.wait([
59 9 : for (final entry in endpoints)
60 9 : getResponseTimeForHost(
61 : entry,
62 : maxLatency: maxLatency,
63 : token: token,
64 9 : ).timeout(
65 9 : maxLatency * 5,
66 8 : onTimeout: () {
67 16 : print("Creat Client Timeout for ${entry}");
68 : return null;
69 : },
70 : )
71 : ]);
72 :
73 9 : final latencyMap = {
74 27 : for (int i = 0; i < endpoints.length; i++)
75 36 : if (latencies[i] != null) endpoints[i]: latencies[i]!
76 : };
77 :
78 : /// Sort after latency
79 9 : final sorted = latencyMap.entries
80 45 : .where((entry) => entry.value.$1 <= maxLatency.inMilliseconds)
81 9 : .toList()
82 45 : ..sort((a, b) => a.value.$1.compareTo(b.value.$1));
83 :
84 9 : if (sorted.isEmpty) {
85 0 : return [];
86 : }
87 :
88 9 : final bockheightMap = <int, List<(String, int)>>{};
89 18 : for (final entry in sorted) {
90 9 : final blockHeight = entry.value.$2;
91 45 : bockheightMap[blockHeight] = [...bockheightMap[blockHeight] ?? [], entry.key];
92 : }
93 :
94 : /// Take the Biggest Endpoint List
95 18 : final mostForBlockHeight = bockheightMap.values.reduce(
96 0 : (a, b) => a.length > b.length ? a : b,
97 : );
98 :
99 18 : return mostForBlockHeight.take(maxClients).toList();
100 : }
101 :
102 9 : Future<(double, int)?> getResponseTimeForHost(
103 : final (String, int) host, {
104 : required Duration maxLatency,
105 : required CoinEntity token,
106 : }) async {
107 18 : Stopwatch stopwatch = Stopwatch()..start();
108 9 : final client = await createElectrumXClient(
109 : endpoint: host.$1,
110 : port: host.$2,
111 : token: token,
112 : );
113 : if (client == null) return null;
114 : final int? block;
115 : try {
116 9 : block = await client.getCurrentBlock();
117 : } catch (e) {
118 10 : print("Error: $e");
119 : return null;
120 : }
121 :
122 9 : await client.disconnect();
123 9 : stopwatch.stop();
124 :
125 : /// If the block is null the Host doenst provide a valid response
126 : if (block == null) return null;
127 :
128 36 : return (stopwatch.elapsed.inMilliseconds.toDouble() / 2, block);
129 : }
130 :
131 7 : Future<List<ElectrumXClient>> createClients({
132 : required List<(String, int)> endpoints,
133 : required CoinEntity token,
134 : }) async {
135 14 : return (await Future.wait([
136 7 : for (final entry in endpoints)
137 7 : createElectrumXClient(
138 : endpoint: entry.$1,
139 : port: entry.$2,
140 : token: token,
141 7 : ).timeout(
142 : const Duration(seconds: 5),
143 0 : onTimeout: () {
144 0 : print("Creat Client Timeout for ${entry}");
145 : return null;
146 : },
147 : )
148 : ]))
149 7 : .whereType<ElectrumXClient>()
150 7 : .toList();
151 : }
152 :
153 13 : Future<ElectrumXClient?> createElectrumXClient({
154 : required String endpoint,
155 : required int port,
156 : required CoinEntity token,
157 : }) async {
158 13 : final tcpJsonRpcClient = TcpJsonRpcClient(
159 13 : isZeniq: token == zeniqCoin,
160 : host: endpoint,
161 : port: port,
162 : );
163 13 : final success = await tcpJsonRpcClient.connected;
164 13 : if (success == false) return null;
165 13 : return ElectrumXClient(tcpJsonRpcClient);
166 : }
167 :
168 0 : Future<ElectrumXClient?> createRandomElectrumXClient({
169 : required List<(String, int)> endpoints,
170 : required List<(String, int)> excludedEndpoints,
171 : required CoinEntity token,
172 : }) async {
173 0 : endpoints.shuffle();
174 :
175 0 : for (final endpoint in endpoints) {
176 0 : final client = await createElectrumXClient(
177 : endpoint: endpoint.$1,
178 : port: endpoint.$2,
179 : token: token,
180 : );
181 : if (client == null) continue;
182 0 : endpoints.remove(endpoint);
183 0 : excludedEndpoints.add(endpoint);
184 : return client;
185 : }
186 :
187 : return null;
188 : }
189 :
190 12 : Future<(T?, ElectrumXClient?, NoWorkingHostsException?)> fetchFromRandomElectrumXNode<T>(
191 : Future<T> Function(ElectrumXClient) fetchFunction, {
192 : required ElectrumXClient? client,
193 : required Iterable<(String, int)> endpoints,
194 : required CoinEntity token,
195 : Duration timeout = const Duration(milliseconds: 3000),
196 : bool cleanup = true,
197 : }) async {
198 : try {
199 10 : if (client == null) throw ClientNullException("Client is null");
200 16 : final result = await fetchFunction(client).timeout(timeout);
201 : return (result, client, null);
202 : } catch (e, s) {
203 10 : if (e is! ClientNullException) {
204 0 : Logger.logWarning(
205 0 : "ElectrumX fetch failed for initial ${client?.host}. Trying new Hosts. $e $s",
206 : );
207 : }
208 :
209 0 : client?.disconnect();
210 :
211 : /// If the fetch failed, try to fetch with a random client
212 20 : final endpoints0 = List.of(endpoints, growable: true)..shuffle();
213 :
214 10 : final errors = <String>[];
215 :
216 20 : for (final endpoint in endpoints0) {
217 10 : final client = await createElectrumXClient(
218 : endpoint: endpoint.$1,
219 : port: endpoint.$2,
220 : token: token,
221 : );
222 : if (client == null) continue;
223 : try {
224 20 : final result = await fetchFunction(client).timeout(timeout);
225 10 : if (cleanup) client.disconnect();
226 : return (result, client, null);
227 : } catch (e, s) {
228 3 : client.disconnect();
229 6 : errors.add("$e $s");
230 9 : Logger.logWarning("ElectrumX fetch failed for ${client.host}: $e");
231 : }
232 : }
233 :
234 : return (
235 : null,
236 : null,
237 0 : NoWorkingHostsException(
238 0 : "Could not fetch from any endpoint: $endpoints $errors",
239 : )
240 : );
241 : }
242 : }
243 :
244 0 : Future<(T?, Object?)> fetchFromNode<T>(
245 : Future<T> Function(ElectrumXClient) fetchFunction, {
246 : required ElectrumXClient client,
247 : Duration timeout = const Duration(milliseconds: 3000),
248 : bool cleanup = false,
249 : }) async {
250 : try {
251 0 : final T result = await fetchFunction(client).timeout(timeout);
252 0 : if (cleanup) client.disconnect();
253 : return (result, null);
254 0 : } on Exception catch (e, _) {
255 0 : Logger.logWarning(
256 0 : "ElectrumX fetch failed for ${client.host}. $e",
257 : );
258 :
259 0 : if (cleanup) client.disconnect();
260 : return (null, e);
261 : }
262 : }
|