LCOV - code coverage report
Current view: top level - crypto/evm/repositories/rpc - queued_rpc_interface.dart (source / functions) Coverage Total Hit
Test: lcov.info Lines: 48.3 % 172 83
Test Date: 2025-07-02 01:23:33 Functions: - 0 0

            Line data    Source code
       1              : import 'dart:async';
       2              : 
       3              : import 'package:walletkit_dart/src/common/logger.dart';
       4              : import 'package:walletkit_dart/walletkit_dart.dart';
       5              : 
       6              : sealed class ValueOrError<T, E> {
       7              :   final E? extra;
       8              : 
       9            5 :   const ValueOrError({this.extra});
      10              : 
      11            5 :   factory ValueOrError.value(
      12              :     T value, {
      13              :     E? extra,
      14              :   }) =>
      15            5 :       Value(value, extra: extra);
      16              : 
      17            0 :   factory ValueOrError.error(
      18              :     Object error, {
      19              :     StackTrace? stackTrace,
      20              :     E? extra,
      21              :   }) =>
      22            0 :       Error<T, E>(
      23              :         error,
      24              :         stackTrace: stackTrace,
      25              :         extra: extra,
      26              :       );
      27              : 
      28            5 :   R when<R>({
      29              :     required R Function(Value<T, E> value) value,
      30              :     required R Function(Error<T, E> error) error,
      31              :   }) {
      32              :     return switch (this) {
      33           10 :       Value<T, E> v => value(v),
      34            0 :       Error<T, E> e => error(e),
      35              :     };
      36              :   }
      37              : }
      38              : 
      39              : final class Value<T, E> extends ValueOrError<T, E> {
      40              :   final T value;
      41              : 
      42            5 :   const Value(this.value, {super.extra});
      43              : }
      44              : 
      45              : final class Error<T, E> extends ValueOrError<T, E> {
      46              :   final Object error;
      47              :   final StackTrace? stackTrace;
      48              :   final List<Error<dynamic, E>>? children;
      49              : 
      50            0 :   const Error(
      51              :     this.error, {
      52              :     this.stackTrace,
      53              :     this.children,
      54              :     super.extra,
      55              :   });
      56              : 
      57            0 :   @override
      58              :   String toString() {
      59            0 :     return 'Error{error: $error, extra: $extra,stackTrace: $stackTrace, children: $children}';
      60              :   }
      61              : }
      62              : 
      63              : extension ValueOrErrorExtension<T> on Future<T> {
      64            5 :   Future<ValueOrError<T, E>> toValueOrError<E>({
      65              :     E? extra,
      66              :   }) {
      67            5 :     return then(
      68           10 :       (value) => ValueOrError<T, E>.value(value),
      69            5 :     ).catchError(
      70            0 :       (e, s) => Error<T, E>(
      71              :         e,
      72              :         stackTrace: s,
      73              :         extra: extra,
      74              :       ),
      75              :     );
      76              :   }
      77              : }
      78              : 
      79              : enum RefreshType { onInit, onTask }
      80              : 
      81              : abstract class RpcManager {
      82              :   /// All clients available to the manager
      83              :   final List<EvmRpcClient> allClients;
      84              : 
      85              :   final RefreshType refreshType;
      86              : 
      87              :   final bool eagerError;
      88              : 
      89              :   /// Whether to wait for the clients to be refreshed before performing a task
      90              :   final bool awaitRefresh;
      91              : 
      92              :   /// Clients which successfully returned the current block number
      93              :   List<EvmRpcClient> clients;
      94              : 
      95              :   /// The rate at which the clients are refreshed. Only applicable if [refreshType] is [RefreshType.onInit]
      96              :   final Duration? clientRefreshRate;
      97              : 
      98              :   final Completer<void> _refreshCompleter = Completer();
      99              : 
     100              :   /// Future that completes when the clients are refreshed at least once
     101           15 :   Future<void> get refreshFuture => _refreshCompleter.future;
     102              : 
     103              :   bool _isRefreshingClients = false;
     104              : 
     105            5 :   RpcManager({
     106              :     required this.allClients,
     107              :     required this.eagerError,
     108              :     required this.awaitRefresh,
     109              :     required this.clientRefreshRate,
     110              :     required this.refreshType,
     111            5 :   }) : clients = List.from(allClients) {
     112           10 :     if (refreshType == RefreshType.onInit) {
     113            0 :       refreshClients();
     114            0 :       if (clientRefreshRate != null) {
     115            0 :         Timer.periodic(clientRefreshRate!, (_) => refreshClients());
     116              :       }
     117              :     }
     118              :   }
     119              : 
     120            5 :   Future refreshClients() async {
     121            5 :     if (_isRefreshingClients) return;
     122            5 :     _isRefreshingClients = true;
     123           15 :     final futures = allClients.map((client) async {
     124              :       try {
     125           10 :         await client.getBlockNumber().timeout(const Duration(seconds: 10));
     126              :         return client;
     127              :       } catch (e, s) {
     128            2 :         Logger.logError(e, s: s, hint: 'Client Refresh Error');
     129              :         return null;
     130              :       }
     131              :     });
     132              : 
     133            5 :     final results = await Future.wait(futures);
     134              : 
     135           15 :     clients = results.whereType<EvmRpcClient>().toList();
     136              : 
     137           10 :     if (!_refreshCompleter.isCompleted) {
     138           10 :       _refreshCompleter.complete();
     139              :     }
     140            5 :     _isRefreshingClients = false;
     141           35 :     Logger.log('Selected clients: ${clients.map((e) => e.rpcUrl).toList()}');
     142              :   }
     143              : 
     144              :   ///
     145              :   /// Perform a single task using one of the available clients
     146              :   /// Retries the task on the next client if the current client fails
     147              :   ///
     148              :   Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
     149              :     Future<T> Function(EvmRpcClient client) task, {
     150              :     Duration timeout = const Duration(seconds: 30),
     151              :     int? maxTries,
     152              :   });
     153              : 
     154              :   ///
     155              :   /// Perform a single task on all available clients
     156              :   ///
     157              :   Future<R> performTaskForClients<T, R>(
     158              :     Future<T> Function(EvmRpcClient client) task, {
     159              :     required R Function(
     160              :       List<ValueOrError<T, EvmRpcClient>> results,
     161              :     ) consilidate,
     162              :     Duration timeout = const Duration(seconds: 30),
     163              :     int maxTriesPerClient = 2,
     164              :     int minClients = 2,
     165              :     int? maxClients,
     166              :     bool enforceParallel = false,
     167              :   });
     168              : 
     169            4 :   Future<ValueOrError<T, EvmRpcClient>> performTaskForClient<T>(
     170              :     Future<T> Function(EvmRpcClient client) task, {
     171              :     required EvmRpcClient client,
     172              :     Duration timeout = const Duration(seconds: 30),
     173              :     int maxTries = 2,
     174              :   }) async {
     175            4 :     if (maxTries == 1) {
     176           12 :       return task(client).timeout(timeout).toValueOrError(extra: client);
     177              :     }
     178            0 :     final errors = <Error<T, EvmRpcClient>>[];
     179              : 
     180            0 :     for (int i = 0; i < maxTries; i++) {
     181            0 :       final result = await task(client).timeout(timeout).toValueOrError(extra: client);
     182              : 
     183              :       switch (result) {
     184            0 :         case Value<T, EvmRpcClient> value:
     185              :           return value;
     186            0 :         case Error<T, EvmRpcClient> error:
     187            0 :           errors.add(error);
     188            0 :           Logger.logError(
     189            0 :             error.error,
     190            0 :             s: error.stackTrace,
     191              :             hint: 'RPC Task Error',
     192              :           );
     193              :       }
     194              :     }
     195            0 :     return Error(
     196            0 :       "Failed to perform the task after $maxTries tries",
     197              :       stackTrace: null,
     198              :       children: errors,
     199              :     );
     200              :   }
     201              : }
     202              : 
     203              : final class SimpleRpcManager extends RpcManager {
     204            5 :   SimpleRpcManager({
     205              :     required super.allClients,
     206              :     required super.awaitRefresh,
     207              :     required super.clientRefreshRate,
     208              :     required super.eagerError,
     209              :     required super.refreshType,
     210              :   });
     211              : 
     212            0 :   Future<R> performTaskForClients<T, R>(
     213              :     Future<T> Function(EvmRpcClient client) task, {
     214              :     required R Function(
     215              :       List<ValueOrError<T, EvmRpcClient>> results,
     216              :     ) consilidate,
     217              :     Duration timeout = const Duration(seconds: 30),
     218              :     int maxTriesPerClient = 2,
     219              :     int minClients = 2,
     220              :     int? maxClients,
     221              :     bool enforceParallel = false,
     222              :   }) async {
     223              :     assert(
     224            0 :       maxClients == null || maxClients >= minClients,
     225              :       "maxClients must be greater than or equal to minClients",
     226              :     );
     227            0 :     final clientsToUse = [...clients].take(maxClients ?? clients.length);
     228              : 
     229            0 :     if (clientsToUse.length < minClients) {
     230            0 :       throw Exception("Not enough clients available");
     231              :     }
     232              : 
     233            0 :     final results = await Future.wait(
     234            0 :       [
     235            0 :         for (final client in clientsToUse)
     236            0 :           performTaskForClient(
     237              :             task,
     238              :             client: client,
     239              :             timeout: timeout,
     240              :             maxTries: maxTriesPerClient,
     241              :           ),
     242              :       ],
     243              :     );
     244              : 
     245            0 :     return consilidate(results);
     246              :   }
     247              : 
     248            3 :   Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
     249              :     Future<T> Function(EvmRpcClient client) task, {
     250              :     Duration timeout = const Duration(seconds: 30),
     251              :     int? maxTries,
     252              :   }) async {
     253           15 :     if (refreshType == RefreshType.onTask && _refreshCompleter.isCompleted == false) {
     254            3 :       refreshClients();
     255              :     }
     256              : 
     257            6 :     if (awaitRefresh) await refreshFuture;
     258              : 
     259            6 :     final currentClients = [...clients];
     260              : 
     261            6 :     if (clients.isEmpty) {
     262            0 :       throw Exception("No working clients available");
     263              :     }
     264              : 
     265            3 :     final errors = <Error<T, EvmRpcClient>>[];
     266              : 
     267            6 :     for (final client in currentClients) {
     268            9 :       final result = await task(client).timeout(timeout).toValueOrError(extra: client);
     269              : 
     270              :       switch (result) {
     271            3 :         case Value<T, EvmRpcClient> value:
     272              :           return value;
     273            0 :         case Error<T, EvmRpcClient> error:
     274            0 :           errors.add(error);
     275            0 :           Logger.logError(
     276            0 :             error.error,
     277            0 :             s: error.stackTrace,
     278              :             hint: 'RPC Task Error',
     279              :           );
     280            0 :           if (eagerError) {
     281              :             return error;
     282              :           }
     283              :           break;
     284              :       }
     285              :     }
     286              : 
     287            0 :     return Error(
     288            0 :       "All clients failed to perform the task: $errors",
     289              :       children: errors,
     290              :     );
     291              :   }
     292              : }
     293              : 
     294              : final class QueuedRpcManager extends SimpleRpcManager {
     295            4 :   QueuedRpcManager({
     296              :     required super.allClients,
     297              :     required super.awaitRefresh,
     298              :     required super.clientRefreshRate,
     299              :     required super.eagerError,
     300              :     required super.refreshType,
     301              :   });
     302              : 
     303              :   final taskQueue = TaskQueue<dynamic, EvmRpcClient>();
     304              : 
     305              :   int currentClientIndex = 0;
     306              :   bool isWorking = false;
     307              : 
     308            4 :   Future<void> workOnQueue() async {
     309            4 :     if (isWorking) return;
     310            4 :     isWorking = true;
     311              : 
     312           20 :     if (refreshType == RefreshType.onTask && _refreshCompleter.isCompleted == false) {
     313            4 :       refreshClients();
     314              :     }
     315              : 
     316            8 :     if (awaitRefresh) await refreshFuture;
     317              : 
     318            8 :     final currentClients = [...clients];
     319              : 
     320            4 :     final errors = <Error<dynamic, EvmRpcClient>>[];
     321              : 
     322            8 :     while (taskQueue.isNotEmpty) {
     323            8 :       final task = taskQueue.dequeue();
     324              :       if (task == null) continue;
     325              : 
     326            4 :       if (currentClients.isEmpty) {
     327            0 :         task.complete(
     328            0 :           Error(
     329              :             "No working clients available",
     330              :             children: errors,
     331              :           ),
     332              :         );
     333              :         continue;
     334              :       }
     335              : 
     336           12 :       final currentClient = task.client ?? currentClients[currentClientIndex];
     337              : 
     338            8 :       task.startTime ??= DateTime.now();
     339            4 :       if (task.isTimedOut()) {
     340            0 :         task.complete(
     341            0 :           Error(
     342            0 :             "Task timed out after ${task.timeout}",
     343              :             children: errors,
     344              :           ),
     345              :         );
     346              :         continue;
     347              :       }
     348            8 :       task.tries++;
     349              : 
     350            4 :       final result = await performTaskForClient(
     351            4 :         task.task,
     352              :         client: currentClient,
     353            4 :         timeout: task.timeout,
     354              :         maxTries: 1,
     355              :       );
     356              : 
     357            4 :       if (result is Value<dynamic, EvmRpcClient>) {
     358            4 :         task.complete(result);
     359            4 :         errors.clear();
     360              :         continue;
     361              :       }
     362              : 
     363            0 :       if (result is Error<dynamic, EvmRpcClient>) {
     364            0 :         errors.add(result);
     365            0 :         if (task.tries >= task.maxTries) {
     366            0 :           task.complete(
     367            0 :             Error(
     368            0 :               "Failed to perform the task: $errors after ${task.tries} tries",
     369              :               children: errors,
     370              :             ),
     371              :           );
     372            0 :           errors.clear();
     373              :           continue;
     374              :         }
     375              : 
     376            0 :         if (task.tries == currentClients.length) {
     377            0 :           task.complete(
     378            0 :             Error(
     379            0 :               "All clients failed to perform the task: $errors",
     380              :               children: errors,
     381              :             ),
     382              :           );
     383            0 :           errors.clear();
     384              :           continue;
     385              :         }
     386              : 
     387              :         // Switch to the next client
     388            0 :         if (task.client == null) {
     389            0 :           currentClientIndex = (currentClientIndex + 1) % currentClients.length;
     390              :         }
     391            0 :         taskQueue.enqueueFront(task);
     392              :       }
     393              :     }
     394              : 
     395            4 :     isWorking = false;
     396              :   }
     397              : 
     398            4 :   Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
     399              :     Future<T> Function(EvmRpcClient client) task, {
     400              :     Duration timeout = const Duration(seconds: 30),
     401              :     int? maxTries,
     402              :   }) {
     403           12 :     final _task = Task(task, timeout, maxTries ?? clients.length);
     404            8 :     taskQueue.enqueue(_task);
     405              : 
     406            4 :     workOnQueue();
     407              : 
     408            4 :     return _task.future;
     409              :   }
     410              : 
     411            0 :   @override
     412              :   Future<R> performTaskForClients<T, R>(
     413              :     Future<T> Function(EvmRpcClient client) task, {
     414              :     required R Function(
     415              :       List<ValueOrError<T, EvmRpcClient>> results,
     416              :     ) consilidate,
     417              :     Duration timeout = const Duration(seconds: 30),
     418              :     int maxTriesPerClient = 2,
     419              :     int minClients = 2,
     420              :     int? maxClients,
     421              :     bool enforceParallel = false,
     422              :   }) async {
     423              :     /// If [enforceParallel] is true, perform the task on all clients at once => SimpleRpcManager Implementation
     424              :     if (enforceParallel) {
     425            0 :       return super.performTaskForClients(
     426              :         task,
     427              :         consilidate: consilidate,
     428              :         timeout: timeout,
     429              :         maxTriesPerClient: maxTriesPerClient,
     430              :         minClients: minClients,
     431              :         maxClients: maxClients,
     432              :       );
     433              :     }
     434              :     assert(
     435            0 :       maxClients == null || maxClients >= minClients,
     436              :       "maxClients must be greater than or equal to minClients",
     437              :     );
     438              : 
     439            0 :     final clientsToUse = clients.take(maxClients ?? clients.length);
     440              : 
     441            0 :     if (clientsToUse.length < minClients) {
     442            0 :       throw Exception("Not enough clients available");
     443              :     }
     444              : 
     445            0 :     final tasks = [
     446            0 :       for (final client in clientsToUse)
     447            0 :         Task(
     448              :           task,
     449              :           timeout,
     450              :           maxTriesPerClient,
     451              :           client: client,
     452              :         ),
     453              :     ];
     454              : 
     455            0 :     taskQueue.enqueueAll(tasks);
     456              : 
     457            0 :     workOnQueue();
     458              : 
     459            0 :     final results = await Future.wait(
     460            0 :       [
     461            0 :         for (final task in tasks) task.future,
     462              :       ],
     463            0 :     ).timeout(timeout);
     464              : 
     465            0 :     return consilidate(results);
     466              :   }
     467              : }
     468              : 
     469              : class Task<T, C> {
     470              :   final Future<T> Function(C client) task;
     471              :   final Completer<ValueOrError<T, C>> _completer = Completer();
     472              : 
     473              :   final Duration timeout;
     474              :   DateTime? startTime;
     475              : 
     476              :   final int maxTries;
     477              :   int tries = 0;
     478              : 
     479              :   final C? client;
     480              : 
     481            4 :   bool isTimedOut() {
     482            4 :     if (startTime == null) return false;
     483           20 :     return DateTime.now().difference(startTime!) > timeout;
     484              :   }
     485              : 
     486           12 :   Future<ValueOrError<T, C>> get future => _completer.future;
     487              : 
     488            4 :   void complete(ValueOrError<dynamic, C> result) {
     489            8 :     _completer.complete(
     490            4 :       result.when(
     491           16 :         value: (value) => Value<T, C>(value.value as T, extra: value.extra),
     492            0 :         error: (error) => Error<T, C>(
     493            0 :           error.error,
     494            0 :           stackTrace: error.stackTrace,
     495            0 :           extra: error.extra,
     496            0 :           children: error.children,
     497              :         ),
     498              :       ),
     499              :     );
     500              :   }
     501              : 
     502            4 :   Task(this.task, this.timeout, this.maxTries, {this.client});
     503              : }
     504              : 
     505              : class TaskQueue<T, C> {
     506              :   final List<Task<T, C>> _list = [];
     507              : 
     508            4 :   void enqueue(Task<T, C> task) {
     509            8 :     _list.add(task);
     510              :   }
     511              : 
     512            0 :   void enqueueAll(Iterable<Task<T, C>> tasks) {
     513            0 :     _list.addAll(tasks);
     514              :   }
     515              : 
     516            0 :   void enqueueFront(Task<T, C> task) {
     517            0 :     _list.insert(0, task);
     518              :   }
     519              : 
     520            4 :   Task<T, C>? dequeue() {
     521            8 :     if (_list.isEmpty) {
     522              :       return null;
     523              :     }
     524            8 :     return _list.removeAt(0);
     525              :   }
     526              : 
     527            0 :   bool get isEmpty => _list.isEmpty;
     528              : 
     529           12 :   bool get isNotEmpty => _list.isNotEmpty;
     530              : 
     531            0 :   Task<T, C>? peek() {
     532            0 :     return _list.isNotEmpty ? _list.first : null;
     533              :   }
     534              : 
     535            0 :   int get length => _list.length;
     536              : }
        

Generated by: LCOV version 2.0-1