Line data Source code
1 : import 'dart:async';
2 :
3 : import 'package:walletkit_dart/src/common/logger.dart';
4 : import 'package:walletkit_dart/walletkit_dart.dart';
5 :
6 : sealed class ValueOrError<T, E> {
7 : final E? extra;
8 :
9 5 : const ValueOrError({this.extra});
10 :
11 5 : factory ValueOrError.value(
12 : T value, {
13 : E? extra,
14 : }) =>
15 5 : Value(value, extra: extra);
16 :
17 0 : factory ValueOrError.error(
18 : Object error, {
19 : StackTrace? stackTrace,
20 : E? extra,
21 : }) =>
22 0 : Error<T, E>(
23 : error,
24 : stackTrace: stackTrace,
25 : extra: extra,
26 : );
27 :
28 5 : R when<R>({
29 : required R Function(Value<T, E> value) value,
30 : required R Function(Error<T, E> error) error,
31 : }) {
32 : return switch (this) {
33 10 : Value<T, E> v => value(v),
34 0 : Error<T, E> e => error(e),
35 : };
36 : }
37 : }
38 :
39 : final class Value<T, E> extends ValueOrError<T, E> {
40 : final T value;
41 :
42 5 : const Value(this.value, {super.extra});
43 : }
44 :
45 : final class Error<T, E> extends ValueOrError<T, E> {
46 : final Object error;
47 : final StackTrace? stackTrace;
48 : final List<Error<dynamic, E>>? children;
49 :
50 0 : const Error(
51 : this.error, {
52 : this.stackTrace,
53 : this.children,
54 : super.extra,
55 : });
56 :
57 0 : @override
58 : String toString() {
59 0 : return 'Error{error: $error, extra: $extra,stackTrace: $stackTrace, children: $children}';
60 : }
61 : }
62 :
63 : extension ValueOrErrorExtension<T> on Future<T> {
64 5 : Future<ValueOrError<T, E>> toValueOrError<E>({
65 : E? extra,
66 : }) {
67 5 : return then(
68 10 : (value) => ValueOrError<T, E>.value(value),
69 5 : ).catchError(
70 0 : (e, s) => Error<T, E>(
71 : e,
72 : stackTrace: s,
73 : extra: extra,
74 : ),
75 : );
76 : }
77 : }
78 :
79 : enum RefreshType { onInit, onTask }
80 :
81 : abstract class RpcManager {
82 : /// All clients available to the manager
83 : final List<EvmRpcClient> allClients;
84 :
85 : final RefreshType refreshType;
86 :
87 : final bool eagerError;
88 :
89 : /// Whether to wait for the clients to be refreshed before performing a task
90 : final bool awaitRefresh;
91 :
92 : /// Clients which successfully returned the current block number
93 : List<EvmRpcClient> clients;
94 :
95 : /// The rate at which the clients are refreshed. Only applicable if [refreshType] is [RefreshType.onInit]
96 : final Duration? clientRefreshRate;
97 :
98 : final Completer<void> _refreshCompleter = Completer();
99 :
100 : /// Future that completes when the clients are refreshed at least once
101 15 : Future<void> get refreshFuture => _refreshCompleter.future;
102 :
103 : bool _isRefreshingClients = false;
104 :
105 5 : RpcManager({
106 : required this.allClients,
107 : required this.eagerError,
108 : required this.awaitRefresh,
109 : required this.clientRefreshRate,
110 : required this.refreshType,
111 5 : }) : clients = List.from(allClients) {
112 10 : if (refreshType == RefreshType.onInit) {
113 0 : refreshClients();
114 0 : if (clientRefreshRate != null) {
115 0 : Timer.periodic(clientRefreshRate!, (_) => refreshClients());
116 : }
117 : }
118 : }
119 :
120 5 : Future refreshClients() async {
121 5 : if (_isRefreshingClients) return;
122 5 : _isRefreshingClients = true;
123 15 : final futures = allClients.map((client) async {
124 : try {
125 10 : await client.getBlockNumber().timeout(const Duration(seconds: 10));
126 : return client;
127 : } catch (e, s) {
128 2 : Logger.logError(e, s: s, hint: 'Client Refresh Error');
129 : return null;
130 : }
131 : });
132 :
133 5 : final results = await Future.wait(futures);
134 :
135 15 : clients = results.whereType<EvmRpcClient>().toList();
136 :
137 10 : if (!_refreshCompleter.isCompleted) {
138 10 : _refreshCompleter.complete();
139 : }
140 5 : _isRefreshingClients = false;
141 35 : Logger.log('Selected clients: ${clients.map((e) => e.rpcUrl).toList()}');
142 : }
143 :
144 : ///
145 : /// Perform a single task using one of the available clients
146 : /// Retries the task on the next client if the current client fails
147 : ///
148 : Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
149 : Future<T> Function(EvmRpcClient client) task, {
150 : Duration timeout = const Duration(seconds: 30),
151 : int? maxTries,
152 : });
153 :
154 : ///
155 : /// Perform a single task on all available clients
156 : ///
157 : Future<R> performTaskForClients<T, R>(
158 : Future<T> Function(EvmRpcClient client) task, {
159 : required R Function(
160 : List<ValueOrError<T, EvmRpcClient>> results,
161 : ) consilidate,
162 : Duration timeout = const Duration(seconds: 30),
163 : int maxTriesPerClient = 2,
164 : int minClients = 2,
165 : int? maxClients,
166 : bool enforceParallel = false,
167 : });
168 :
169 4 : Future<ValueOrError<T, EvmRpcClient>> performTaskForClient<T>(
170 : Future<T> Function(EvmRpcClient client) task, {
171 : required EvmRpcClient client,
172 : Duration timeout = const Duration(seconds: 30),
173 : int maxTries = 2,
174 : }) async {
175 4 : if (maxTries == 1) {
176 12 : return task(client).timeout(timeout).toValueOrError(extra: client);
177 : }
178 0 : final errors = <Error<T, EvmRpcClient>>[];
179 :
180 0 : for (int i = 0; i < maxTries; i++) {
181 : final result =
182 0 : await task(client).timeout(timeout).toValueOrError(extra: client);
183 :
184 : switch (result) {
185 0 : case Value<T, EvmRpcClient> value:
186 : return value;
187 0 : case Error<T, EvmRpcClient> error:
188 0 : errors.add(error);
189 0 : Logger.logError(
190 0 : error.error,
191 0 : s: error.stackTrace,
192 : hint: 'RPC Task Error',
193 : );
194 : }
195 : }
196 0 : return Error(
197 0 : "Failed to perform the task after $maxTries tries",
198 : stackTrace: null,
199 : children: errors,
200 : );
201 : }
202 : }
203 :
204 : final class SimpleRpcManager extends RpcManager {
205 5 : SimpleRpcManager({
206 : required super.allClients,
207 : required super.awaitRefresh,
208 : required super.clientRefreshRate,
209 : required super.eagerError,
210 : required super.refreshType,
211 : });
212 :
213 0 : Future<R> performTaskForClients<T, R>(
214 : Future<T> Function(EvmRpcClient client) task, {
215 : required R Function(
216 : List<ValueOrError<T, EvmRpcClient>> results,
217 : ) consilidate,
218 : Duration timeout = const Duration(seconds: 30),
219 : int maxTriesPerClient = 2,
220 : int minClients = 2,
221 : int? maxClients,
222 : bool enforceParallel = false,
223 : }) async {
224 : assert(
225 0 : maxClients == null || maxClients >= minClients,
226 : "maxClients must be greater than or equal to minClients",
227 : );
228 0 : final clientsToUse = [...clients].take(maxClients ?? clients.length);
229 :
230 0 : if (clientsToUse.length < minClients) {
231 0 : throw Exception("Not enough clients available");
232 : }
233 :
234 0 : final results = await Future.wait(
235 0 : [
236 0 : for (final client in clientsToUse)
237 0 : performTaskForClient(
238 : task,
239 : client: client,
240 : timeout: timeout,
241 : maxTries: maxTriesPerClient,
242 : ),
243 : ],
244 : );
245 :
246 0 : return consilidate(results);
247 : }
248 :
249 3 : Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
250 : Future<T> Function(EvmRpcClient client) task, {
251 : Duration timeout = const Duration(seconds: 30),
252 : int? maxTries,
253 : }) async {
254 6 : if (refreshType == RefreshType.onTask &&
255 9 : _refreshCompleter.isCompleted == false) {
256 3 : refreshClients();
257 : }
258 :
259 6 : if (awaitRefresh) await refreshFuture;
260 :
261 6 : final currentClients = [...clients];
262 :
263 6 : if (clients.isEmpty) {
264 0 : throw Exception("No working clients available");
265 : }
266 :
267 3 : final errors = <Error<T, EvmRpcClient>>[];
268 :
269 6 : for (final client in currentClients) {
270 : final result =
271 9 : await task(client).timeout(timeout).toValueOrError(extra: client);
272 :
273 : switch (result) {
274 3 : case Value<T, EvmRpcClient> value:
275 : return value;
276 0 : case Error<T, EvmRpcClient> error:
277 0 : errors.add(error);
278 0 : Logger.logError(
279 0 : error.error,
280 0 : s: error.stackTrace,
281 : hint: 'RPC Task Error',
282 : );
283 0 : if (eagerError) {
284 : return error;
285 : }
286 : break;
287 : }
288 : }
289 :
290 0 : return Error(
291 0 : "All clients failed to perform the task: $errors",
292 : children: errors,
293 : );
294 : }
295 : }
296 :
297 : final class QueuedRpcManager extends SimpleRpcManager {
298 4 : QueuedRpcManager({
299 : required super.allClients,
300 : required super.awaitRefresh,
301 : required super.clientRefreshRate,
302 : required super.eagerError,
303 : required super.refreshType,
304 : });
305 :
306 : final taskQueue = TaskQueue<dynamic, EvmRpcClient>();
307 :
308 : int currentClientIndex = 0;
309 : bool isWorking = false;
310 :
311 4 : Future<void> workOnQueue() async {
312 4 : if (isWorking) return;
313 4 : isWorking = true;
314 :
315 8 : if (refreshType == RefreshType.onTask &&
316 12 : _refreshCompleter.isCompleted == false) {
317 4 : refreshClients();
318 : }
319 :
320 8 : if (awaitRefresh) await refreshFuture;
321 :
322 8 : final currentClients = [...clients];
323 :
324 4 : final errors = <Error<dynamic, EvmRpcClient>>[];
325 :
326 8 : while (taskQueue.isNotEmpty) {
327 8 : final task = taskQueue.dequeue();
328 : if (task == null) continue;
329 :
330 4 : if (currentClients.isEmpty) {
331 0 : task.complete(
332 0 : Error(
333 : "No working clients available",
334 : children: errors,
335 : ),
336 : );
337 : continue;
338 : }
339 :
340 12 : final currentClient = task.client ?? currentClients[currentClientIndex];
341 :
342 8 : task.startTime ??= DateTime.now();
343 4 : if (task.isTimedOut()) {
344 0 : task.complete(
345 0 : Error(
346 0 : "Task timed out after ${task.timeout}",
347 : children: errors,
348 : ),
349 : );
350 : continue;
351 : }
352 8 : task.tries++;
353 :
354 4 : final result = await performTaskForClient(
355 4 : task.task,
356 : client: currentClient,
357 4 : timeout: task.timeout,
358 : maxTries: 1,
359 : );
360 :
361 4 : if (result is Value<dynamic, EvmRpcClient>) {
362 4 : task.complete(result);
363 4 : errors.clear();
364 : continue;
365 : }
366 :
367 0 : if (result is Error<dynamic, EvmRpcClient>) {
368 0 : errors.add(result);
369 0 : if (task.tries >= task.maxTries) {
370 0 : task.complete(
371 0 : Error(
372 0 : "Failed to perform the task: $errors after ${task.tries} tries",
373 : children: errors,
374 : ),
375 : );
376 0 : errors.clear();
377 : continue;
378 : }
379 :
380 0 : if (task.tries == currentClients.length) {
381 0 : task.complete(
382 0 : Error(
383 0 : "All clients failed to perform the task: $errors",
384 : children: errors,
385 : ),
386 : );
387 0 : errors.clear();
388 : continue;
389 : }
390 :
391 : // Switch to the next client
392 0 : if (task.client == null) {
393 0 : currentClientIndex = (currentClientIndex + 1) % currentClients.length;
394 : }
395 0 : taskQueue.enqueueFront(task);
396 : }
397 : }
398 :
399 4 : isWorking = false;
400 : }
401 :
402 4 : Future<ValueOrError<T, EvmRpcClient>> performTask<T>(
403 : Future<T> Function(EvmRpcClient client) task, {
404 : Duration timeout = const Duration(seconds: 30),
405 : int? maxTries,
406 : }) {
407 12 : final _task = Task(task, timeout, maxTries ?? clients.length);
408 8 : taskQueue.enqueue(_task);
409 :
410 4 : workOnQueue();
411 :
412 4 : return _task.future;
413 : }
414 :
415 0 : @override
416 : Future<R> performTaskForClients<T, R>(
417 : Future<T> Function(EvmRpcClient client) task, {
418 : required R Function(
419 : List<ValueOrError<T, EvmRpcClient>> results,
420 : ) consilidate,
421 : Duration timeout = const Duration(seconds: 30),
422 : int maxTriesPerClient = 2,
423 : int minClients = 2,
424 : int? maxClients,
425 : bool enforceParallel = false,
426 : }) async {
427 : /// If [enforceParallel] is true, perform the task on all clients at once => SimpleRpcManager Implementation
428 : if (enforceParallel) {
429 0 : return super.performTaskForClients(
430 : task,
431 : consilidate: consilidate,
432 : timeout: timeout,
433 : maxTriesPerClient: maxTriesPerClient,
434 : minClients: minClients,
435 : maxClients: maxClients,
436 : );
437 : }
438 : assert(
439 0 : maxClients == null || maxClients >= minClients,
440 : "maxClients must be greater than or equal to minClients",
441 : );
442 :
443 0 : final clientsToUse = clients.take(maxClients ?? clients.length);
444 :
445 0 : if (clientsToUse.length < minClients) {
446 0 : throw Exception("Not enough clients available");
447 : }
448 :
449 0 : final tasks = [
450 0 : for (final client in clientsToUse)
451 0 : Task(
452 : task,
453 : timeout,
454 : maxTriesPerClient,
455 : client: client,
456 : ),
457 : ];
458 :
459 0 : taskQueue.enqueueAll(tasks);
460 :
461 0 : workOnQueue();
462 :
463 0 : final results = await Future.wait(
464 0 : [
465 0 : for (final task in tasks) task.future,
466 : ],
467 0 : ).timeout(timeout);
468 :
469 0 : return consilidate(results);
470 : }
471 : }
472 :
473 : class Task<T, C> {
474 : final Future<T> Function(C client) task;
475 : final Completer<ValueOrError<T, C>> _completer = Completer();
476 :
477 : final Duration timeout;
478 : DateTime? startTime;
479 :
480 : final int maxTries;
481 : int tries = 0;
482 :
483 : final C? client;
484 :
485 4 : bool isTimedOut() {
486 4 : if (startTime == null) return false;
487 20 : return DateTime.now().difference(startTime!) > timeout;
488 : }
489 :
490 12 : Future<ValueOrError<T, C>> get future => _completer.future;
491 :
492 4 : void complete(ValueOrError<dynamic, C> result) {
493 8 : _completer.complete(
494 4 : result.when(
495 16 : value: (value) => Value<T, C>(value.value as T, extra: value.extra),
496 0 : error: (error) => Error<T, C>(
497 0 : error.error,
498 0 : stackTrace: error.stackTrace,
499 0 : extra: error.extra,
500 0 : children: error.children,
501 : ),
502 : ),
503 : );
504 : }
505 :
506 4 : Task(this.task, this.timeout, this.maxTries, {this.client});
507 : }
508 :
509 : class TaskQueue<T, C> {
510 : final List<Task<T, C>> _list = [];
511 :
512 4 : void enqueue(Task<T, C> task) {
513 8 : _list.add(task);
514 : }
515 :
516 0 : void enqueueAll(Iterable<Task<T, C>> tasks) {
517 0 : _list.addAll(tasks);
518 : }
519 :
520 0 : void enqueueFront(Task<T, C> task) {
521 0 : _list.insert(0, task);
522 : }
523 :
524 4 : Task<T, C>? dequeue() {
525 8 : if (_list.isEmpty) {
526 : return null;
527 : }
528 8 : return _list.removeAt(0);
529 : }
530 :
531 0 : bool get isEmpty => _list.isEmpty;
532 :
533 12 : bool get isNotEmpty => _list.isNotEmpty;
534 :
535 0 : Task<T, C>? peek() {
536 0 : return _list.isNotEmpty ? _list.first : null;
537 : }
538 :
539 0 : int get length => _list.length;
540 : }
|