Line data Source code
1 : import 'dart:async';
2 :
3 : import 'package:walletkit_dart/src/common/logger.dart';
4 : import 'package:walletkit_dart/walletkit_dart.dart';
5 :
6 : enum RefreshType { onInit, onTask }
7 :
8 : abstract class RpcManager {
9 : /// All clients available to the manager
10 : final List<EvmRpcClient> allClients;
11 :
12 : final RefreshType refreshType;
13 :
14 : final bool eagerError;
15 :
16 : /// Whether to wait for the clients to be refreshed before performing a task
17 : final bool awaitRefresh;
18 :
19 : /// Clients which successfully returned the current block number
20 : List<EvmRpcClient> clients;
21 :
22 : /// The rate at which the clients are refreshed. Only applicable if [refreshType] is [RefreshType.onInit]
23 : final Duration? clientRefreshRate;
24 :
25 : final Completer<void> _refreshCompleter = Completer();
26 :
27 : /// Future that completes when the clients are refreshed at least once
28 15 : Future<void> get refreshFuture => _refreshCompleter.future;
29 :
30 : bool _isRefreshingClients = false;
31 :
32 5 : RpcManager({
33 : required this.allClients,
34 : required this.eagerError,
35 : required this.awaitRefresh,
36 : required this.clientRefreshRate,
37 : required this.refreshType,
38 5 : }) : clients = List.from(allClients) {
39 10 : if (refreshType == RefreshType.onInit) {
40 0 : refreshClients();
41 0 : if (clientRefreshRate != null) {
42 0 : Timer.periodic(clientRefreshRate!, (_) => refreshClients());
43 : }
44 : }
45 : }
46 :
47 5 : void refreshClients() async {
48 5 : if (_isRefreshingClients) return;
49 5 : _isRefreshingClients = true;
50 15 : final futures = allClients.map((client) async {
51 : try {
52 10 : await client.getBlockNumber().timeout(const Duration(seconds: 10));
53 : return client;
54 : } catch (e, s) {
55 2 : Logger.logError(e, s: s, hint: 'Client Refresh Error');
56 : return null;
57 : }
58 : });
59 :
60 5 : final results = await Future.wait(futures);
61 :
62 15 : clients = results.whereType<EvmRpcClient>().toList();
63 :
64 10 : if (!_refreshCompleter.isCompleted) {
65 10 : _refreshCompleter.complete();
66 : }
67 5 : _isRefreshingClients = false;
68 35 : Logger.log('Selected clients: ${clients.map((e) => e.rpcUrl).toList()}');
69 : }
70 :
71 : Future<T> performTask<T>(
72 : Future<T> Function(EvmRpcClient client) task, {
73 : Duration timeout = const Duration(seconds: 30),
74 : int? maxTries,
75 : });
76 : }
77 :
78 : final class SimpleRpcManager extends RpcManager {
79 3 : SimpleRpcManager({
80 : required super.allClients,
81 : required super.awaitRefresh,
82 : required super.clientRefreshRate,
83 : required super.eagerError,
84 : required super.refreshType,
85 : });
86 :
87 3 : Future<T> performTask<T>(
88 : Future<T> Function(EvmRpcClient client) task, {
89 : Duration timeout = const Duration(seconds: 30),
90 : int? maxTries,
91 : }) async {
92 6 : if (refreshType == RefreshType.onTask &&
93 9 : _refreshCompleter.isCompleted == false) {
94 3 : refreshClients();
95 : }
96 :
97 6 : if (awaitRefresh) await refreshFuture;
98 :
99 6 : final currentClients = [...clients];
100 :
101 6 : if (clients.isEmpty) {
102 0 : throw Exception("No working clients available");
103 : }
104 :
105 3 : final errors = <String, (Object, StackTrace)>{};
106 :
107 6 : for (final client in currentClients) {
108 : try {
109 6 : final result = await task(client).timeout(timeout);
110 : return result;
111 : } catch (e, s) {
112 0 : errors[client.rpcUrl] = (e, s);
113 :
114 0 : if (eagerError) {
115 0 : throw Exception("Error performing task: $errors");
116 : }
117 :
118 0 : Logger.logError(e, s: s, hint: 'RPC Task Error');
119 : }
120 : }
121 :
122 0 : throw Exception("All clients failed to perform the task: $errors");
123 : }
124 : }
125 :
126 : final class QueuedRpcManager extends RpcManager {
127 4 : QueuedRpcManager({
128 : required super.allClients,
129 : required super.awaitRefresh,
130 : required super.clientRefreshRate,
131 : required super.eagerError,
132 : required super.refreshType,
133 : });
134 :
135 : final taskQueue = TaskQueue<dynamic, EvmRpcClient>();
136 :
137 : int currentClientIndex = 0;
138 : bool isWorking = false;
139 :
140 4 : Future<void> workOnQueue() async {
141 4 : if (isWorking) return;
142 4 : isWorking = true;
143 :
144 8 : if (refreshType == RefreshType.onTask &&
145 12 : _refreshCompleter.isCompleted == false) {
146 4 : refreshClients();
147 : }
148 :
149 8 : if (awaitRefresh) await refreshFuture;
150 :
151 8 : final currentClients = [...clients];
152 :
153 4 : final errors = <String, (Object, StackTrace)>{};
154 :
155 8 : while (taskQueue.isNotEmpty) {
156 8 : final task = taskQueue.dequeue();
157 : if (task == null) continue;
158 :
159 4 : if (currentClients.isEmpty) {
160 0 : task.completer.completeError(Exception("No working clients available"));
161 : continue;
162 : }
163 :
164 8 : final currentClient = currentClients[currentClientIndex];
165 :
166 : try {
167 8 : task.startTime ??= DateTime.now();
168 4 : if (task.isTimedOut()) throw TimeoutException("Task timed out");
169 8 : task.tries++;
170 4 : if (task.isMaxTriesReached) throw Exception("Max tries reached");
171 :
172 16 : final result = await task.task(currentClient).timeout(task.timeout);
173 8 : task.completer.complete(result);
174 4 : errors.clear(); // Clear errors if the task was successful
175 : } catch (e, s) {
176 0 : errors[currentClient.rpcUrl] = (e, s);
177 0 : if (eagerError) {
178 0 : final error = Exception("Error performing task: $errors");
179 0 : task.completer.completeError(error);
180 0 : errors.clear();
181 : continue;
182 : }
183 :
184 0 : if (task.tries == currentClients.length) {
185 : final error =
186 0 : Exception("All clients failed to perform the task: $errors");
187 0 : errors.clear();
188 0 : Logger.logError(error, hint: 'RPC Task Error');
189 0 : task.completer.completeError(error);
190 : continue;
191 : }
192 :
193 : // Switch to the next client
194 0 : currentClientIndex = (currentClientIndex + 1) % currentClients.length;
195 0 : taskQueue.enqueue(task);
196 : continue;
197 : }
198 : }
199 :
200 4 : isWorking = false;
201 : }
202 :
203 4 : Future<T> performTask<T>(
204 : Future<T> Function(EvmRpcClient client) task, {
205 : Duration timeout = const Duration(seconds: 30),
206 : int? maxTries,
207 : }) {
208 12 : final _task = Task(task, timeout, maxTries ?? clients.length);
209 8 : taskQueue.enqueue(_task);
210 :
211 4 : workOnQueue();
212 :
213 4 : return _task.future;
214 : }
215 : }
216 :
217 : class Task<T, C> {
218 : final Future<T> Function(C client) task;
219 : final Completer<T> completer = Completer();
220 :
221 : final Duration timeout;
222 : DateTime? startTime;
223 :
224 : final int maxTries;
225 : int tries = 0;
226 :
227 16 : bool get isMaxTriesReached => tries > maxTries;
228 :
229 4 : bool isTimedOut() {
230 4 : if (startTime == null) return false;
231 20 : return DateTime.now().difference(startTime!) > timeout;
232 : }
233 :
234 12 : Future<T> get future => completer.future;
235 :
236 4 : Task(this.task, this.timeout, this.maxTries);
237 : }
238 :
239 : class TaskQueue<T, C> {
240 : final List<Task<T, C>> _list = [];
241 :
242 4 : void enqueue(Task<T, C> task) {
243 8 : _list.add(task);
244 : }
245 :
246 4 : Task<T, C>? dequeue() {
247 8 : if (_list.isEmpty) {
248 : return null;
249 : }
250 8 : return _list.removeAt(0);
251 : }
252 :
253 0 : bool get isEmpty => _list.isEmpty;
254 :
255 12 : bool get isNotEmpty => _list.isNotEmpty;
256 :
257 0 : Task<T, C>? peek() {
258 0 : return _list.isNotEmpty ? _list.first : null;
259 : }
260 :
261 0 : int get length => _list.length;
262 : }
|