LCOV - code coverage report
Current view: top level - crypto/evm/repositories/rpc - queued_rpc_interface.dart (source / functions) Coverage Total Hit
Test: lcov.info Lines: 71.7 % 92 66
Test Date: 2025-01-30 01:10:00 Functions: - 0 0

            Line data    Source code
       1              : import 'dart:async';
       2              : 
       3              : import 'package:walletkit_dart/src/common/logger.dart';
       4              : import 'package:walletkit_dart/walletkit_dart.dart';
       5              : 
       6              : enum RefreshType { onInit, onTask }
       7              : 
       8              : abstract class RpcManager {
       9              :   /// All clients available to the manager
      10              :   final List<EvmRpcClient> allClients;
      11              : 
      12              :   final RefreshType refreshType;
      13              : 
      14              :   final bool eagerError;
      15              : 
      16              :   /// Whether to wait for the clients to be refreshed before performing a task
      17              :   final bool awaitRefresh;
      18              : 
      19              :   /// Clients which successfully returned the current block number
      20              :   List<EvmRpcClient> clients;
      21              : 
      22              :   /// The rate at which the clients are refreshed. Only applicable if [refreshType] is [RefreshType.onInit]
      23              :   final Duration? clientRefreshRate;
      24              : 
      25              :   final Completer<void> _refreshCompleter = Completer();
      26              : 
      27              :   /// Future that completes when the clients are refreshed at least once
      28           15 :   Future<void> get refreshFuture => _refreshCompleter.future;
      29              : 
      30              :   bool _isRefreshingClients = false;
      31              : 
      32            5 :   RpcManager({
      33              :     required this.allClients,
      34              :     required this.eagerError,
      35              :     required this.awaitRefresh,
      36              :     required this.clientRefreshRate,
      37              :     required this.refreshType,
      38            5 :   }) : clients = List.from(allClients) {
      39           10 :     if (refreshType == RefreshType.onInit) {
      40            0 :       refreshClients();
      41            0 :       if (clientRefreshRate != null) {
      42            0 :         Timer.periodic(clientRefreshRate!, (_) => refreshClients());
      43              :       }
      44              :     }
      45              :   }
      46              : 
      47            5 :   void refreshClients() async {
      48            5 :     if (_isRefreshingClients) return;
      49            5 :     _isRefreshingClients = true;
      50           15 :     final futures = allClients.map((client) async {
      51              :       try {
      52           10 :         await client.getBlockNumber().timeout(const Duration(seconds: 10));
      53              :         return client;
      54              :       } catch (e, s) {
      55            2 :         Logger.logError(e, s: s, hint: 'Client Refresh Error');
      56              :         return null;
      57              :       }
      58              :     });
      59              : 
      60            5 :     final results = await Future.wait(futures);
      61              : 
      62           15 :     clients = results.whereType<EvmRpcClient>().toList();
      63              : 
      64           10 :     if (!_refreshCompleter.isCompleted) {
      65           10 :       _refreshCompleter.complete();
      66              :     }
      67            5 :     _isRefreshingClients = false;
      68           35 :     Logger.log('Selected clients: ${clients.map((e) => e.rpcUrl).toList()}');
      69              :   }
      70              : 
      71              :   Future<T> performTask<T>(
      72              :     Future<T> Function(EvmRpcClient client) task, {
      73              :     Duration timeout = const Duration(seconds: 30),
      74              :     int? maxTries,
      75              :   });
      76              : }
      77              : 
      78              : final class SimpleRpcManager extends RpcManager {
      79            3 :   SimpleRpcManager({
      80              :     required super.allClients,
      81              :     required super.awaitRefresh,
      82              :     required super.clientRefreshRate,
      83              :     required super.eagerError,
      84              :     required super.refreshType,
      85              :   });
      86              : 
      87            3 :   Future<T> performTask<T>(
      88              :     Future<T> Function(EvmRpcClient client) task, {
      89              :     Duration timeout = const Duration(seconds: 30),
      90              :     int? maxTries,
      91              :   }) async {
      92            6 :     if (refreshType == RefreshType.onTask &&
      93            9 :         _refreshCompleter.isCompleted == false) {
      94            3 :       refreshClients();
      95              :     }
      96              : 
      97            6 :     if (awaitRefresh) await refreshFuture;
      98              : 
      99            6 :     final currentClients = [...clients];
     100              : 
     101            6 :     if (clients.isEmpty) {
     102            0 :       throw Exception("No working clients available");
     103              :     }
     104              : 
     105            3 :     final errors = <String, (Object, StackTrace)>{};
     106              : 
     107            6 :     for (final client in currentClients) {
     108              :       try {
     109            6 :         final result = await task(client).timeout(timeout);
     110              :         return result;
     111              :       } catch (e, s) {
     112            0 :         errors[client.rpcUrl] = (e, s);
     113              : 
     114            0 :         if (eagerError) {
     115            0 :           throw Exception("Error performing task: $errors");
     116              :         }
     117              : 
     118            0 :         Logger.logError(e, s: s, hint: 'RPC Task Error');
     119              :       }
     120              :     }
     121              : 
     122            0 :     throw Exception("All clients failed to perform the task: $errors");
     123              :   }
     124              : }
     125              : 
     126              : final class QueuedRpcManager extends RpcManager {
     127            4 :   QueuedRpcManager({
     128              :     required super.allClients,
     129              :     required super.awaitRefresh,
     130              :     required super.clientRefreshRate,
     131              :     required super.eagerError,
     132              :     required super.refreshType,
     133              :   });
     134              : 
     135              :   final taskQueue = TaskQueue<dynamic, EvmRpcClient>();
     136              : 
     137              :   int currentClientIndex = 0;
     138              :   bool isWorking = false;
     139              : 
     140            4 :   Future<void> workOnQueue() async {
     141            4 :     if (isWorking) return;
     142            4 :     isWorking = true;
     143              : 
     144            8 :     if (refreshType == RefreshType.onTask &&
     145           12 :         _refreshCompleter.isCompleted == false) {
     146            4 :       refreshClients();
     147              :     }
     148              : 
     149            8 :     if (awaitRefresh) await refreshFuture;
     150              : 
     151            8 :     final currentClients = [...clients];
     152              : 
     153            4 :     final errors = <String, (Object, StackTrace)>{};
     154              : 
     155            8 :     while (taskQueue.isNotEmpty) {
     156            8 :       final task = taskQueue.dequeue();
     157              :       if (task == null) continue;
     158              : 
     159            4 :       if (currentClients.isEmpty) {
     160            0 :         task.completer.completeError(Exception("No working clients available"));
     161              :         continue;
     162              :       }
     163              : 
     164            8 :       final currentClient = currentClients[currentClientIndex];
     165              : 
     166              :       try {
     167            8 :         task.startTime ??= DateTime.now();
     168            4 :         if (task.isTimedOut()) throw TimeoutException("Task timed out");
     169            8 :         task.tries++;
     170            4 :         if (task.isMaxTriesReached) throw Exception("Max tries reached");
     171              : 
     172           16 :         final result = await task.task(currentClient).timeout(task.timeout);
     173            8 :         task.completer.complete(result);
     174            4 :         errors.clear(); // Clear errors if the task was successful
     175              :       } catch (e, s) {
     176            0 :         errors[currentClient.rpcUrl] = (e, s);
     177            0 :         if (eagerError) {
     178            0 :           final error = Exception("Error performing task: $errors");
     179            0 :           task.completer.completeError(error);
     180            0 :           errors.clear();
     181              :           continue;
     182              :         }
     183              : 
     184            0 :         if (task.tries == currentClients.length) {
     185              :           final error =
     186            0 :               Exception("All clients failed to perform the task: $errors");
     187            0 :           errors.clear();
     188            0 :           Logger.logError(error, hint: 'RPC Task Error');
     189            0 :           task.completer.completeError(error);
     190              :           continue;
     191              :         }
     192              : 
     193              :         // Switch to the next client
     194            0 :         currentClientIndex = (currentClientIndex + 1) % currentClients.length;
     195            0 :         taskQueue.enqueue(task);
     196              :         continue;
     197              :       }
     198              :     }
     199              : 
     200            4 :     isWorking = false;
     201              :   }
     202              : 
     203            4 :   Future<T> performTask<T>(
     204              :     Future<T> Function(EvmRpcClient client) task, {
     205              :     Duration timeout = const Duration(seconds: 30),
     206              :     int? maxTries,
     207              :   }) {
     208           12 :     final _task = Task(task, timeout, maxTries ?? clients.length);
     209            8 :     taskQueue.enqueue(_task);
     210              : 
     211            4 :     workOnQueue();
     212              : 
     213            4 :     return _task.future;
     214              :   }
     215              : }
     216              : 
     217              : class Task<T, C> {
     218              :   final Future<T> Function(C client) task;
     219              :   final Completer<T> completer = Completer();
     220              : 
     221              :   final Duration timeout;
     222              :   DateTime? startTime;
     223              : 
     224              :   final int maxTries;
     225              :   int tries = 0;
     226              : 
     227           16 :   bool get isMaxTriesReached => tries > maxTries;
     228              : 
     229            4 :   bool isTimedOut() {
     230            4 :     if (startTime == null) return false;
     231           20 :     return DateTime.now().difference(startTime!) > timeout;
     232              :   }
     233              : 
     234           12 :   Future<T> get future => completer.future;
     235              : 
     236            4 :   Task(this.task, this.timeout, this.maxTries);
     237              : }
     238              : 
     239              : class TaskQueue<T, C> {
     240              :   final List<Task<T, C>> _list = [];
     241              : 
     242            4 :   void enqueue(Task<T, C> task) {
     243            8 :     _list.add(task);
     244              :   }
     245              : 
     246            4 :   Task<T, C>? dequeue() {
     247            8 :     if (_list.isEmpty) {
     248              :       return null;
     249              :     }
     250            8 :     return _list.removeAt(0);
     251              :   }
     252              : 
     253            0 :   bool get isEmpty => _list.isEmpty;
     254              : 
     255           12 :   bool get isNotEmpty => _list.isNotEmpty;
     256              : 
     257            0 :   Task<T, C>? peek() {
     258            0 :     return _list.isNotEmpty ? _list.first : null;
     259              :   }
     260              : 
     261            0 :   int get length => _list.length;
     262              : }
        

Generated by: LCOV version 2.0-1