Line data Source code
1 : import 'dart:async';
2 :
3 : import 'package:walletkit_dart/src/common/logger.dart';
4 : import 'package:walletkit_dart/walletkit_dart.dart';
5 :
6 : sealed class ValueOrError<T, E> {
7 : final E? extra;
8 :
9 5 : const ValueOrError({this.extra});
10 :
11 5 : factory ValueOrError.value(
12 : T value, {
13 : E? extra,
14 : }) =>
15 5 : Value(value, extra: extra);
16 :
17 0 : factory ValueOrError.error(
18 : Object error, {
19 : StackTrace? stackTrace,
20 : E? extra,
21 : }) =>
22 0 : Error<T, E>(
23 : error,
24 : stackTrace: stackTrace,
25 : extra: extra,
26 : );
27 :
28 5 : R when<R>({
29 : required R Function(Value<T, E> value) value,
30 : required R Function(Error<T, E> error) error,
31 : }) {
32 : return switch (this) {
33 10 : Value<T, E> v => value(v),
34 0 : Error<T, E> e => error(e),
35 : };
36 : }
37 : }
38 :
39 : final class Value<T, E> extends ValueOrError<T, E> {
40 : final T value;
41 :
42 5 : const Value(this.value, {super.extra});
43 : }
44 :
45 : final class Error<T, E> extends ValueOrError<T, E> {
46 : final Object error;
47 : final StackTrace? stackTrace;
48 : final List<Error<dynamic, E>>? children;
49 :
50 0 : const Error(
51 : this.error, {
52 : this.stackTrace,
53 : this.children,
54 : super.extra,
55 : });
56 :
57 0 : @override
58 : String toString() {
59 0 : return 'Error{error: $error, extra: $extra,stackTrace: $stackTrace, children: $children}';
60 : }
61 : }
62 :
63 : extension ValueOrErrorExtension<T> on Future<T> {
64 5 : Future<ValueOrError<T, E>> toValueOrError<E>({
65 : E? extra,
66 : }) {
67 5 : return then(
68 10 : (value) => ValueOrError<T, E>.value(value),
69 5 : ).catchError(
70 0 : (e, s) => Error<T, E>(
71 : e,
72 : stackTrace: s,
73 : extra: extra,
74 : ),
75 : );
76 : }
77 : }
78 :
79 : enum RefreshType { onInit, onTask }
80 :
81 : abstract class RpcManager {
82 : /// All clients available to the manager
83 : final List<EvmRpcClient> allClients;
84 :
85 : final RefreshType refreshType;
86 :
87 : final bool eagerError;
88 :
89 : /// Whether to wait for the clients to be refreshed before performing a task
90 : final bool awaitRefresh;
91 :
92 : /// Clients which successfully returned the current block number
93 : List<EvmRpcClient> clients;
94 :
95 : /// The rate at which the clients are refreshed. Only applicable if [refreshType] is [RefreshType.onInit]
96 : final Duration? clientRefreshRate;
97 :
98 : final Completer<void> _refreshCompleter = Completer();
99 :
100 : /// Future that completes when the clients are refreshed at least once
101 15 : Future<void> get refreshFuture => _refreshCompleter.future;
102 :
103 : bool _isRefreshingClients = false;
104 :
105 5 : RpcManager({
106 : required this.allClients,
107 : required this.eagerError,
108 : required this.awaitRefresh,
109 : required this.clientRefreshRate,
110 : required this.refreshType,
111 5 : }) : clients = List.from(allClients) {
112 10 : if (refreshType == RefreshType.onInit) {
113 0 : refreshClients();
114 0 : if (clientRefreshRate != null) {
115 0 : Timer.periodic(clientRefreshRate!, (_) => refreshClients());
116 : }
117 : }
118 : }
119 :
120 5 : Future refreshClients() async {
121 5 : if (_isRefreshingClients) return;
122 5 : _isRefreshingClients = true;
123 15 : final futures = allClients.map((client) async {
124 : try {
125 10 : await client.getBlockNumber().timeout(const Duration(seconds: 10));
126 : return client;
127 : } catch (e, s) {
128 2 : Logger.logError(e, s: s, hint: 'Client Refresh Error');
129 : return null;
130 : }
131 : });
132 :
133 5 : final results = await Future.wait(futures);
134 :
135 15 : clients = results.whereType<EvmRpcClient>().toList();
136 :
137 10 : if (!_refreshCompleter.isCompleted) {
138 10 : _refreshCompleter.complete();
139 : }
140 5 : _isRefreshingClients = false;
141 35 : Logger.log('Selected clients: ${clients.map((e) => e.rpcUrl).toList()}');
142 : }
143 :
144 : ///
145 : /// Perform a single task using one of the available clients
146 : /// Retries the task on the next client if the current client fails
147 : ///
148 : Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
149 : Future<T> Function(EvmRpcClient client) task, {
150 : Duration timeout = const Duration(seconds: 30),
151 : int? maxTries,
152 : });
153 :
154 : ///
155 : /// Perform a single task on all available clients
156 : ///
157 : Future<R> performTaskForClients<T, R>(
158 : Future<T> Function(EvmRpcClient client) task, {
159 : required R Function(
160 : List<ValueOrError<T, EvmRpcClient>> results,
161 : ) consilidate,
162 : Duration timeout = const Duration(seconds: 30),
163 : int maxTriesPerClient = 2,
164 : int minClients = 2,
165 : int? maxClients,
166 : bool enforceParallel = false,
167 : });
168 :
169 4 : Future<ValueOrError<T, EvmRpcClient>> performTaskForClient<T>(
170 : Future<T> Function(EvmRpcClient client) task, {
171 : required EvmRpcClient client,
172 : Duration timeout = const Duration(seconds: 30),
173 : int maxTries = 2,
174 : }) async {
175 4 : if (maxTries == 1) {
176 12 : return task(client).timeout(timeout).toValueOrError(extra: client);
177 : }
178 0 : final errors = <Error<T, EvmRpcClient>>[];
179 :
180 0 : for (int i = 0; i < maxTries; i++) {
181 0 : final result = await task(client).timeout(timeout).toValueOrError(extra: client);
182 :
183 : switch (result) {
184 0 : case Value<T, EvmRpcClient> value:
185 : return value;
186 0 : case Error<T, EvmRpcClient> error:
187 0 : errors.add(error);
188 0 : Logger.logError(
189 0 : error.error,
190 0 : s: error.stackTrace,
191 : hint: 'RPC Task Error',
192 : );
193 : }
194 : }
195 0 : return Error(
196 0 : "Failed to perform the task after $maxTries tries",
197 : stackTrace: null,
198 : children: errors,
199 : );
200 : }
201 : }
202 :
203 : final class SimpleRpcManager extends RpcManager {
204 5 : SimpleRpcManager({
205 : required super.allClients,
206 : required super.awaitRefresh,
207 : required super.clientRefreshRate,
208 : required super.eagerError,
209 : required super.refreshType,
210 : });
211 :
212 0 : Future<R> performTaskForClients<T, R>(
213 : Future<T> Function(EvmRpcClient client) task, {
214 : required R Function(
215 : List<ValueOrError<T, EvmRpcClient>> results,
216 : ) consilidate,
217 : Duration timeout = const Duration(seconds: 30),
218 : int maxTriesPerClient = 2,
219 : int minClients = 2,
220 : int? maxClients,
221 : bool enforceParallel = false,
222 : }) async {
223 : assert(
224 0 : maxClients == null || maxClients >= minClients,
225 : "maxClients must be greater than or equal to minClients",
226 : );
227 0 : final clientsToUse = [...clients].take(maxClients ?? clients.length);
228 :
229 0 : if (clientsToUse.length < minClients) {
230 0 : throw Exception("Not enough clients available");
231 : }
232 :
233 0 : final results = await Future.wait(
234 0 : [
235 0 : for (final client in clientsToUse)
236 0 : performTaskForClient(
237 : task,
238 : client: client,
239 : timeout: timeout,
240 : maxTries: maxTriesPerClient,
241 : ),
242 : ],
243 : );
244 :
245 0 : return consilidate(results);
246 : }
247 :
248 3 : Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
249 : Future<T> Function(EvmRpcClient client) task, {
250 : Duration timeout = const Duration(seconds: 30),
251 : int? maxTries,
252 : }) async {
253 15 : if (refreshType == RefreshType.onTask && _refreshCompleter.isCompleted == false) {
254 3 : refreshClients();
255 : }
256 :
257 6 : if (awaitRefresh) await refreshFuture;
258 :
259 6 : final currentClients = [...clients];
260 :
261 6 : if (clients.isEmpty) {
262 0 : throw Exception("No working clients available");
263 : }
264 :
265 3 : final errors = <Error<T, EvmRpcClient>>[];
266 :
267 6 : for (final client in currentClients) {
268 9 : final result = await task(client).timeout(timeout).toValueOrError(extra: client);
269 :
270 : switch (result) {
271 3 : case Value<T, EvmRpcClient> value:
272 : return value;
273 0 : case Error<T, EvmRpcClient> error:
274 0 : errors.add(error);
275 0 : Logger.logError(
276 0 : error.error,
277 0 : s: error.stackTrace,
278 : hint: 'RPC Task Error',
279 : );
280 0 : if (eagerError) {
281 : return error;
282 : }
283 : break;
284 : }
285 : }
286 :
287 0 : return Error(
288 0 : "All clients failed to perform the task: $errors",
289 : children: errors,
290 : );
291 : }
292 : }
293 :
294 : final class QueuedRpcManager extends SimpleRpcManager {
295 4 : QueuedRpcManager({
296 : required super.allClients,
297 : required super.awaitRefresh,
298 : required super.clientRefreshRate,
299 : required super.eagerError,
300 : required super.refreshType,
301 : });
302 :
303 : final taskQueue = TaskQueue<dynamic, EvmRpcClient>();
304 :
305 : int currentClientIndex = 0;
306 : bool isWorking = false;
307 :
308 4 : Future<void> workOnQueue() async {
309 4 : if (isWorking) return;
310 4 : isWorking = true;
311 :
312 20 : if (refreshType == RefreshType.onTask && _refreshCompleter.isCompleted == false) {
313 4 : refreshClients();
314 : }
315 :
316 8 : if (awaitRefresh) await refreshFuture;
317 :
318 8 : final currentClients = [...clients];
319 :
320 4 : final errors = <Error<dynamic, EvmRpcClient>>[];
321 :
322 8 : while (taskQueue.isNotEmpty) {
323 8 : final task = taskQueue.dequeue();
324 : if (task == null) continue;
325 :
326 4 : if (currentClients.isEmpty) {
327 0 : task.complete(
328 0 : Error(
329 : "No working clients available",
330 : children: errors,
331 : ),
332 : );
333 : continue;
334 : }
335 :
336 12 : final currentClient = task.client ?? currentClients[currentClientIndex];
337 :
338 8 : task.startTime ??= DateTime.now();
339 4 : if (task.isTimedOut()) {
340 0 : task.complete(
341 0 : Error(
342 0 : "Task timed out after ${task.timeout}",
343 : children: errors,
344 : ),
345 : );
346 : continue;
347 : }
348 8 : task.tries++;
349 :
350 4 : final result = await performTaskForClient(
351 4 : task.task,
352 : client: currentClient,
353 4 : timeout: task.timeout,
354 : maxTries: 1,
355 : );
356 :
357 4 : if (result is Value<dynamic, EvmRpcClient>) {
358 4 : task.complete(result);
359 4 : errors.clear();
360 : continue;
361 : }
362 :
363 0 : if (result is Error<dynamic, EvmRpcClient>) {
364 0 : errors.add(result);
365 0 : if (task.tries >= task.maxTries) {
366 0 : task.complete(
367 0 : Error(
368 0 : "Failed to perform the task: $errors after ${task.tries} tries",
369 : children: errors,
370 : ),
371 : );
372 0 : errors.clear();
373 : continue;
374 : }
375 :
376 0 : if (task.tries == currentClients.length) {
377 0 : task.complete(
378 0 : Error(
379 0 : "All clients failed to perform the task: $errors",
380 : children: errors,
381 : ),
382 : );
383 0 : errors.clear();
384 : continue;
385 : }
386 :
387 : // Switch to the next client
388 0 : if (task.client == null) {
389 0 : currentClientIndex = (currentClientIndex + 1) % currentClients.length;
390 : }
391 0 : taskQueue.enqueueFront(task);
392 : }
393 : }
394 :
395 4 : isWorking = false;
396 : }
397 :
398 4 : Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
399 : Future<T> Function(EvmRpcClient client) task, {
400 : Duration timeout = const Duration(seconds: 30),
401 : int? maxTries,
402 : }) {
403 12 : final _task = Task(task, timeout, maxTries ?? clients.length);
404 8 : taskQueue.enqueue(_task);
405 :
406 4 : workOnQueue();
407 :
408 4 : return _task.future;
409 : }
410 :
411 0 : @override
412 : Future<R> performTaskForClients<T, R>(
413 : Future<T> Function(EvmRpcClient client) task, {
414 : required R Function(
415 : List<ValueOrError<T, EvmRpcClient>> results,
416 : ) consilidate,
417 : Duration timeout = const Duration(seconds: 30),
418 : int maxTriesPerClient = 2,
419 : int minClients = 2,
420 : int? maxClients,
421 : bool enforceParallel = false,
422 : }) async {
423 : /// If [enforceParallel] is true, perform the task on all clients at once => SimpleRpcManager Implementation
424 : if (enforceParallel) {
425 0 : return super.performTaskForClients(
426 : task,
427 : consilidate: consilidate,
428 : timeout: timeout,
429 : maxTriesPerClient: maxTriesPerClient,
430 : minClients: minClients,
431 : maxClients: maxClients,
432 : );
433 : }
434 : assert(
435 0 : maxClients == null || maxClients >= minClients,
436 : "maxClients must be greater than or equal to minClients",
437 : );
438 :
439 0 : final clientsToUse = clients.take(maxClients ?? clients.length);
440 :
441 0 : if (clientsToUse.length < minClients) {
442 0 : throw Exception("Not enough clients available");
443 : }
444 :
445 0 : final tasks = [
446 0 : for (final client in clientsToUse)
447 0 : Task(
448 : task,
449 : timeout,
450 : maxTriesPerClient,
451 : client: client,
452 : ),
453 : ];
454 :
455 0 : taskQueue.enqueueAll(tasks);
456 :
457 0 : workOnQueue();
458 :
459 0 : final results = await Future.wait(
460 0 : [
461 0 : for (final task in tasks) task.future,
462 : ],
463 0 : ).timeout(timeout);
464 :
465 0 : return consilidate(results);
466 : }
467 : }
468 :
469 : class Task<T, C> {
470 : final Future<T> Function(C client) task;
471 : final Completer<ValueOrError<T, C>> _completer = Completer();
472 :
473 : final Duration timeout;
474 : DateTime? startTime;
475 :
476 : final int maxTries;
477 : int tries = 0;
478 :
479 : final C? client;
480 :
481 4 : bool isTimedOut() {
482 4 : if (startTime == null) return false;
483 20 : return DateTime.now().difference(startTime!) > timeout;
484 : }
485 :
486 12 : Future<ValueOrError<T, C>> get future => _completer.future;
487 :
488 4 : void complete(ValueOrError<dynamic, C> result) {
489 8 : _completer.complete(
490 4 : result.when(
491 16 : value: (value) => Value<T, C>(value.value as T, extra: value.extra),
492 0 : error: (error) => Error<T, C>(
493 0 : error.error,
494 0 : stackTrace: error.stackTrace,
495 0 : extra: error.extra,
496 0 : children: error.children,
497 : ),
498 : ),
499 : );
500 : }
501 :
502 4 : Task(this.task, this.timeout, this.maxTries, {this.client});
503 : }
504 :
505 : class TaskQueue<T, C> {
506 : final List<Task<T, C>> _list = [];
507 :
508 4 : void enqueue(Task<T, C> task) {
509 8 : _list.add(task);
510 : }
511 :
512 0 : void enqueueAll(Iterable<Task<T, C>> tasks) {
513 0 : _list.addAll(tasks);
514 : }
515 :
516 0 : void enqueueFront(Task<T, C> task) {
517 0 : _list.insert(0, task);
518 : }
519 :
520 4 : Task<T, C>? dequeue() {
521 8 : if (_list.isEmpty) {
522 : return null;
523 : }
524 8 : return _list.removeAt(0);
525 : }
526 :
527 0 : bool get isEmpty => _list.isEmpty;
528 :
529 12 : bool get isNotEmpty => _list.isNotEmpty;
530 :
531 0 : Task<T, C>? peek() {
532 0 : return _list.isNotEmpty ? _list.first : null;
533 : }
534 :
535 0 : int get length => _list.length;
536 : }
|