LCOV - code coverage report
Current view: top level - crypto/evm/repositories/rpc - queued_rpc_interface.dart (source / functions) Coverage Total Hit
Test: lcov.info Lines: 48.9 % 174 85
Test Date: 2025-04-01 01:23:07 Functions: - 0 0

            Line data    Source code
       1              : import 'dart:async';
       2              : 
       3              : import 'package:walletkit_dart/src/common/logger.dart';
       4              : import 'package:walletkit_dart/walletkit_dart.dart';
       5              : 
       6              : sealed class ValueOrError<T, E> {
       7              :   final E? extra;
       8              : 
       9            5 :   const ValueOrError({this.extra});
      10              : 
      11            5 :   factory ValueOrError.value(
      12              :     T value, {
      13              :     E? extra,
      14              :   }) =>
      15            5 :       Value(value, extra: extra);
      16              : 
      17            0 :   factory ValueOrError.error(
      18              :     Object error, {
      19              :     StackTrace? stackTrace,
      20              :     E? extra,
      21              :   }) =>
      22            0 :       Error<T, E>(
      23              :         error,
      24              :         stackTrace: stackTrace,
      25              :         extra: extra,
      26              :       );
      27              : 
      28            5 :   R when<R>({
      29              :     required R Function(Value<T, E> value) value,
      30              :     required R Function(Error<T, E> error) error,
      31              :   }) {
      32              :     return switch (this) {
      33           10 :       Value<T, E> v => value(v),
      34            0 :       Error<T, E> e => error(e),
      35              :     };
      36              :   }
      37              : }
      38              : 
      39              : final class Value<T, E> extends ValueOrError<T, E> {
      40              :   final T value;
      41              : 
      42            5 :   const Value(this.value, {super.extra});
      43              : }
      44              : 
      45              : final class Error<T, E> extends ValueOrError<T, E> {
      46              :   final Object error;
      47              :   final StackTrace? stackTrace;
      48              :   final List<Error<dynamic, E>>? children;
      49              : 
      50            0 :   const Error(
      51              :     this.error, {
      52              :     this.stackTrace,
      53              :     this.children,
      54              :     super.extra,
      55              :   });
      56              : 
      57            0 :   @override
      58              :   String toString() {
      59            0 :     return 'Error{error: $error, extra: $extra,stackTrace: $stackTrace, children: $children}';
      60              :   }
      61              : }
      62              : 
      63              : extension ValueOrErrorExtension<T> on Future<T> {
      64            5 :   Future<ValueOrError<T, E>> toValueOrError<E>({
      65              :     E? extra,
      66              :   }) {
      67            5 :     return then(
      68           10 :       (value) => ValueOrError<T, E>.value(value),
      69            5 :     ).catchError(
      70            0 :       (e, s) => Error<T, E>(
      71              :         e,
      72              :         stackTrace: s,
      73              :         extra: extra,
      74              :       ),
      75              :     );
      76              :   }
      77              : }
      78              : 
      79              : enum RefreshType { onInit, onTask }
      80              : 
      81              : abstract class RpcManager {
      82              :   /// All clients available to the manager
      83              :   final List<EvmRpcClient> allClients;
      84              : 
      85              :   final RefreshType refreshType;
      86              : 
      87              :   final bool eagerError;
      88              : 
      89              :   /// Whether to wait for the clients to be refreshed before performing a task
      90              :   final bool awaitRefresh;
      91              : 
      92              :   /// Clients which successfully returned the current block number
      93              :   List<EvmRpcClient> clients;
      94              : 
      95              :   /// The rate at which the clients are refreshed. Only applicable if [refreshType] is [RefreshType.onInit]
      96              :   final Duration? clientRefreshRate;
      97              : 
      98              :   final Completer<void> _refreshCompleter = Completer();
      99              : 
     100              :   /// Future that completes when the clients are refreshed at least once
     101           15 :   Future<void> get refreshFuture => _refreshCompleter.future;
     102              : 
     103              :   bool _isRefreshingClients = false;
     104              : 
     105            5 :   RpcManager({
     106              :     required this.allClients,
     107              :     required this.eagerError,
     108              :     required this.awaitRefresh,
     109              :     required this.clientRefreshRate,
     110              :     required this.refreshType,
     111            5 :   }) : clients = List.from(allClients) {
     112           10 :     if (refreshType == RefreshType.onInit) {
     113            0 :       refreshClients();
     114            0 :       if (clientRefreshRate != null) {
     115            0 :         Timer.periodic(clientRefreshRate!, (_) => refreshClients());
     116              :       }
     117              :     }
     118              :   }
     119              : 
     120            5 :   Future refreshClients() async {
     121            5 :     if (_isRefreshingClients) return;
     122            5 :     _isRefreshingClients = true;
     123           15 :     final futures = allClients.map((client) async {
     124              :       try {
     125           10 :         await client.getBlockNumber().timeout(const Duration(seconds: 10));
     126              :         return client;
     127              :       } catch (e, s) {
     128            2 :         Logger.logError(e, s: s, hint: 'Client Refresh Error');
     129              :         return null;
     130              :       }
     131              :     });
     132              : 
     133            5 :     final results = await Future.wait(futures);
     134              : 
     135           15 :     clients = results.whereType<EvmRpcClient>().toList();
     136              : 
     137           10 :     if (!_refreshCompleter.isCompleted) {
     138           10 :       _refreshCompleter.complete();
     139              :     }
     140            5 :     _isRefreshingClients = false;
     141           35 :     Logger.log('Selected clients: ${clients.map((e) => e.rpcUrl).toList()}');
     142              :   }
     143              : 
     144              :   ///
     145              :   /// Perform a single task using one of the available clients
     146              :   /// Retries the task on the next client if the current client fails
     147              :   ///
     148              :   Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
     149              :     Future<T> Function(EvmRpcClient client) task, {
     150              :     Duration timeout = const Duration(seconds: 30),
     151              :     int? maxTries,
     152              :   });
     153              : 
     154              :   ///
     155              :   /// Perform a single task on all available clients
     156              :   ///
     157              :   Future<R> performTaskForClients<T, R>(
     158              :     Future<T> Function(EvmRpcClient client) task, {
     159              :     required R Function(
     160              :       List<ValueOrError<T, EvmRpcClient>> results,
     161              :     ) consilidate,
     162              :     Duration timeout = const Duration(seconds: 30),
     163              :     int maxTriesPerClient = 2,
     164              :     int minClients = 2,
     165              :     int? maxClients,
     166              :     bool enforceParallel = false,
     167              :   });
     168              : 
     169            4 :   Future<ValueOrError<T, EvmRpcClient>> performTaskForClient<T>(
     170              :     Future<T> Function(EvmRpcClient client) task, {
     171              :     required EvmRpcClient client,
     172              :     Duration timeout = const Duration(seconds: 30),
     173              :     int maxTries = 2,
     174              :   }) async {
     175            4 :     if (maxTries == 1) {
     176           12 :       return task(client).timeout(timeout).toValueOrError(extra: client);
     177              :     }
     178            0 :     final errors = <Error<T, EvmRpcClient>>[];
     179              : 
     180            0 :     for (int i = 0; i < maxTries; i++) {
     181              :       final result =
     182            0 :           await task(client).timeout(timeout).toValueOrError(extra: client);
     183              : 
     184              :       switch (result) {
     185            0 :         case Value<T, EvmRpcClient> value:
     186              :           return value;
     187            0 :         case Error<T, EvmRpcClient> error:
     188            0 :           errors.add(error);
     189            0 :           Logger.logError(
     190            0 :             error.error,
     191            0 :             s: error.stackTrace,
     192              :             hint: 'RPC Task Error',
     193              :           );
     194              :       }
     195              :     }
     196            0 :     return Error(
     197            0 :       "Failed to perform the task after $maxTries tries",
     198              :       stackTrace: null,
     199              :       children: errors,
     200              :     );
     201              :   }
     202              : }
     203              : 
     204              : final class SimpleRpcManager extends RpcManager {
     205            5 :   SimpleRpcManager({
     206              :     required super.allClients,
     207              :     required super.awaitRefresh,
     208              :     required super.clientRefreshRate,
     209              :     required super.eagerError,
     210              :     required super.refreshType,
     211              :   });
     212              : 
     213            0 :   Future<R> performTaskForClients<T, R>(
     214              :     Future<T> Function(EvmRpcClient client) task, {
     215              :     required R Function(
     216              :       List<ValueOrError<T, EvmRpcClient>> results,
     217              :     ) consilidate,
     218              :     Duration timeout = const Duration(seconds: 30),
     219              :     int maxTriesPerClient = 2,
     220              :     int minClients = 2,
     221              :     int? maxClients,
     222              :     bool enforceParallel = false,
     223              :   }) async {
     224              :     assert(
     225            0 :       maxClients == null || maxClients >= minClients,
     226              :       "maxClients must be greater than or equal to minClients",
     227              :     );
     228            0 :     final clientsToUse = [...clients].take(maxClients ?? clients.length);
     229              : 
     230            0 :     if (clientsToUse.length < minClients) {
     231            0 :       throw Exception("Not enough clients available");
     232              :     }
     233              : 
     234            0 :     final results = await Future.wait(
     235            0 :       [
     236            0 :         for (final client in clientsToUse)
     237            0 :           performTaskForClient(
     238              :             task,
     239              :             client: client,
     240              :             timeout: timeout,
     241              :             maxTries: maxTriesPerClient,
     242              :           ),
     243              :       ],
     244              :     );
     245              : 
     246            0 :     return consilidate(results);
     247              :   }
     248              : 
     249            3 :   Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
     250              :     Future<T> Function(EvmRpcClient client) task, {
     251              :     Duration timeout = const Duration(seconds: 30),
     252              :     int? maxTries,
     253              :   }) async {
     254            6 :     if (refreshType == RefreshType.onTask &&
     255            9 :         _refreshCompleter.isCompleted == false) {
     256            3 :       refreshClients();
     257              :     }
     258              : 
     259            6 :     if (awaitRefresh) await refreshFuture;
     260              : 
     261            6 :     final currentClients = [...clients];
     262              : 
     263            6 :     if (clients.isEmpty) {
     264            0 :       throw Exception("No working clients available");
     265              :     }
     266              : 
     267            3 :     final errors = <Error<T, EvmRpcClient>>[];
     268              : 
     269            6 :     for (final client in currentClients) {
     270              :       final result =
     271            9 :           await task(client).timeout(timeout).toValueOrError(extra: client);
     272              : 
     273              :       switch (result) {
     274            3 :         case Value<T, EvmRpcClient> value:
     275              :           return value;
     276            0 :         case Error<T, EvmRpcClient> error:
     277            0 :           errors.add(error);
     278            0 :           Logger.logError(
     279            0 :             error.error,
     280            0 :             s: error.stackTrace,
     281              :             hint: 'RPC Task Error',
     282              :           );
     283            0 :           if (eagerError) {
     284              :             return error;
     285              :           }
     286              :           break;
     287              :       }
     288              :     }
     289              : 
     290            0 :     return Error(
     291            0 :       "All clients failed to perform the task: $errors",
     292              :       children: errors,
     293              :     );
     294              :   }
     295              : }
     296              : 
     297              : final class QueuedRpcManager extends SimpleRpcManager {
     298            4 :   QueuedRpcManager({
     299              :     required super.allClients,
     300              :     required super.awaitRefresh,
     301              :     required super.clientRefreshRate,
     302              :     required super.eagerError,
     303              :     required super.refreshType,
     304              :   });
     305              : 
     306              :   final taskQueue = TaskQueue<dynamic, EvmRpcClient>();
     307              : 
     308              :   int currentClientIndex = 0;
     309              :   bool isWorking = false;
     310              : 
     311            4 :   Future<void> workOnQueue() async {
     312            4 :     if (isWorking) return;
     313            4 :     isWorking = true;
     314              : 
     315            8 :     if (refreshType == RefreshType.onTask &&
     316           12 :         _refreshCompleter.isCompleted == false) {
     317            4 :       refreshClients();
     318              :     }
     319              : 
     320            8 :     if (awaitRefresh) await refreshFuture;
     321              : 
     322            8 :     final currentClients = [...clients];
     323              : 
     324            4 :     final errors = <Error<dynamic, EvmRpcClient>>[];
     325              : 
     326            8 :     while (taskQueue.isNotEmpty) {
     327            8 :       final task = taskQueue.dequeue();
     328              :       if (task == null) continue;
     329              : 
     330            4 :       if (currentClients.isEmpty) {
     331            0 :         task.complete(
     332            0 :           Error(
     333              :             "No working clients available",
     334              :             children: errors,
     335              :           ),
     336              :         );
     337              :         continue;
     338              :       }
     339              : 
     340           12 :       final currentClient = task.client ?? currentClients[currentClientIndex];
     341              : 
     342            8 :       task.startTime ??= DateTime.now();
     343            4 :       if (task.isTimedOut()) {
     344            0 :         task.complete(
     345            0 :           Error(
     346            0 :             "Task timed out after ${task.timeout}",
     347              :             children: errors,
     348              :           ),
     349              :         );
     350              :         continue;
     351              :       }
     352            8 :       task.tries++;
     353              : 
     354            4 :       final result = await performTaskForClient(
     355            4 :         task.task,
     356              :         client: currentClient,
     357            4 :         timeout: task.timeout,
     358              :         maxTries: 1,
     359              :       );
     360              : 
     361            4 :       if (result is Value<dynamic, EvmRpcClient>) {
     362            4 :         task.complete(result);
     363            4 :         errors.clear();
     364              :         continue;
     365              :       }
     366              : 
     367            0 :       if (result is Error<dynamic, EvmRpcClient>) {
     368            0 :         errors.add(result);
     369            0 :         if (task.tries >= task.maxTries) {
     370            0 :           task.complete(
     371            0 :             Error(
     372            0 :               "Failed to perform the task: $errors after ${task.tries} tries",
     373              :               children: errors,
     374              :             ),
     375              :           );
     376            0 :           errors.clear();
     377              :           continue;
     378              :         }
     379              : 
     380            0 :         if (task.tries == currentClients.length) {
     381            0 :           task.complete(
     382            0 :             Error(
     383            0 :               "All clients failed to perform the task: $errors",
     384              :               children: errors,
     385              :             ),
     386              :           );
     387            0 :           errors.clear();
     388              :           continue;
     389              :         }
     390              : 
     391              :         // Switch to the next client
     392            0 :         if (task.client == null) {
     393            0 :           currentClientIndex = (currentClientIndex + 1) % currentClients.length;
     394              :         }
     395            0 :         taskQueue.enqueueFront(task);
     396              :       }
     397              :     }
     398              : 
     399            4 :     isWorking = false;
     400              :   }
     401              : 
     402            4 :   Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
     403              :     Future<T> Function(EvmRpcClient client) task, {
     404              :     Duration timeout = const Duration(seconds: 30),
     405              :     int? maxTries,
     406              :   }) {
     407           12 :     final _task = Task(task, timeout, maxTries ?? clients.length);
     408            8 :     taskQueue.enqueue(_task);
     409              : 
     410            4 :     workOnQueue();
     411              : 
     412            4 :     return _task.future;
     413              :   }
     414              : 
     415            0 :   @override
     416              :   Future<R> performTaskForClients<T, R>(
     417              :     Future<T> Function(EvmRpcClient client) task, {
     418              :     required R Function(
     419              :       List<ValueOrError<T, EvmRpcClient>> results,
     420              :     ) consilidate,
     421              :     Duration timeout = const Duration(seconds: 30),
     422              :     int maxTriesPerClient = 2,
     423              :     int minClients = 2,
     424              :     int? maxClients,
     425              :     bool enforceParallel = false,
     426              :   }) async {
     427              :     /// If [enforceParallel] is true, perform the task on all clients at once => SimpleRpcManager Implementation
     428              :     if (enforceParallel) {
     429            0 :       return super.performTaskForClients(
     430              :         task,
     431              :         consilidate: consilidate,
     432              :         timeout: timeout,
     433              :         maxTriesPerClient: maxTriesPerClient,
     434              :         minClients: minClients,
     435              :         maxClients: maxClients,
     436              :       );
     437              :     }
     438              :     assert(
     439            0 :       maxClients == null || maxClients >= minClients,
     440              :       "maxClients must be greater than or equal to minClients",
     441              :     );
     442              : 
     443            0 :     final clientsToUse = clients.take(maxClients ?? clients.length);
     444              : 
     445            0 :     if (clientsToUse.length < minClients) {
     446            0 :       throw Exception("Not enough clients available");
     447              :     }
     448              : 
     449            0 :     final tasks = [
     450            0 :       for (final client in clientsToUse)
     451            0 :         Task(
     452              :           task,
     453              :           timeout,
     454              :           maxTriesPerClient,
     455              :           client: client,
     456              :         ),
     457              :     ];
     458              : 
     459            0 :     taskQueue.enqueueAll(tasks);
     460              : 
     461            0 :     workOnQueue();
     462              : 
     463            0 :     final results = await Future.wait(
     464            0 :       [
     465            0 :         for (final task in tasks) task.future,
     466              :       ],
     467            0 :     ).timeout(timeout);
     468              : 
     469            0 :     return consilidate(results);
     470              :   }
     471              : }
     472              : 
     473              : class Task<T, C> {
     474              :   final Future<T> Function(C client) task;
     475              :   final Completer<ValueOrError<T, C>> _completer = Completer();
     476              : 
     477              :   final Duration timeout;
     478              :   DateTime? startTime;
     479              : 
     480              :   final int maxTries;
     481              :   int tries = 0;
     482              : 
     483              :   final C? client;
     484              : 
     485            4 :   bool isTimedOut() {
     486            4 :     if (startTime == null) return false;
     487           20 :     return DateTime.now().difference(startTime!) > timeout;
     488              :   }
     489              : 
     490           12 :   Future<ValueOrError<T, C>> get future => _completer.future;
     491              : 
     492            4 :   void complete(ValueOrError<dynamic, C> result) {
     493            8 :     _completer.complete(
     494            4 :       result.when(
     495           16 :         value: (value) => Value<T, C>(value.value as T, extra: value.extra),
     496            0 :         error: (error) => Error<T, C>(
     497            0 :           error.error,
     498            0 :           stackTrace: error.stackTrace,
     499            0 :           extra: error.extra,
     500            0 :           children: error.children,
     501              :         ),
     502              :       ),
     503              :     );
     504              :   }
     505              : 
     506            4 :   Task(this.task, this.timeout, this.maxTries, {this.client});
     507              : }
     508              : 
     509              : class TaskQueue<T, C> {
     510              :   final List<Task<T, C>> _list = [];
     511              : 
     512            4 :   void enqueue(Task<T, C> task) {
     513            8 :     _list.add(task);
     514              :   }
     515              : 
     516            0 :   void enqueueAll(Iterable<Task<T, C>> tasks) {
     517            0 :     _list.addAll(tasks);
     518              :   }
     519              : 
     520            0 :   void enqueueFront(Task<T, C> task) {
     521            0 :     _list.insert(0, task);
     522              :   }
     523              : 
     524            4 :   Task<T, C>? dequeue() {
     525            8 :     if (_list.isEmpty) {
     526              :       return null;
     527              :     }
     528            8 :     return _list.removeAt(0);
     529              :   }
     530              : 
     531            0 :   bool get isEmpty => _list.isEmpty;
     532              : 
     533           12 :   bool get isNotEmpty => _list.isNotEmpty;
     534              : 
     535            0 :   Task<T, C>? peek() {
     536            0 :     return _list.isNotEmpty ? _list.first : null;
     537              :   }
     538              : 
     539            0 :   int get length => _list.length;
     540              : }
        

Generated by: LCOV version 2.0-1