从源码深入了解 Dio 的 CancelToken
2021/12/17 22:20:53
本文主要是介绍从源码深入了解 Dio 的 CancelToken,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
上一篇讲了 Dio 的 CancelToken 的使用,本篇来从源码解析 CancelToken 是如何实现取消网络请求的。相关的内容如下:
- CancelToken 类的实现
- CancelToken 如何取消网络请求
CancelToken 类
CalcelToken类的代码并不多,我们直接复制下来一个个过一遍。
import 'dart:async'; import 'dio_error.dart'; import 'options.dart'; /// You can cancel a request by using a cancel token. /// One token can be shared with different requests. /// when a token's [cancel] method invoked, all requests /// with this token will be cancelled. class CancelToken { CancelToken() { _completer = Completer<DioError>(); } /// Whether is throw by [cancel] static bool isCancel(DioError e) { return e.type == DioErrorType.cancel; } /// If request have been canceled, save the cancel Error. DioError? _cancelError; /// If request have been canceled, save the cancel Error. DioError? get cancelError => _cancelError; late Completer<DioError> _completer; RequestOptions? requestOptions; /// whether cancelled bool get isCancelled => _cancelError != null; /// When cancelled, this future will be resolved. Future<DioError> get whenCancel => _completer.future; /// Cancel the request void cancel([dynamic reason]) { _cancelError = DioError( type: DioErrorType.cancel, error: reason, requestOptions: requestOptions ?? RequestOptions(path: ''), ); _cancelError!.stackTrace = StackTrace.current; _completer.complete(_cancelError); } }
首先看注释,我们可以了解到 CancelToken 的一个非常有用的地方,一个 CancelToken 可以和多个请求关联,取消时可以同时取消多个关联的请求。这对于我们一个页面有多个请求时非常有用。大部分的是一些属性:
_cancelError
:被取消后存储的取消错误信息,对外可以通过 get 方式可以获取。_completer
:一个Completer<DioError>
对象,Completer
本是一个抽象类,用于管理异步操作事件。构建时返回的是一个Future
对象,可以调用对应的complete
(对应正常完成) 或completeError
(对应错误处理)。该属性为私有属性,外部不可访问。requestOptions
:RequestOptions
对象,是请求的一些可选属性(比如headers
,请求参数,请求方式等等),可以为空。该属性是公共属性,说明可以在外部修改。isCancelled
:布尔值,用于标识是否取消,实际是通过_cancelError
是否为空判断的,如果不为空说明是被取消了。whenCancel
:实际就是_completer
的future
对象,可以用来处理操作的响应,这样也相当于对_completer
做了一个封装,只暴露了其future
对象。cancel
:取消方法,也就是核心方法了,这个方法构建了一个DioError
对象(用于存储取消的错误),这里如果调用时传了reason
对象,也会将reason
传递到error
参数中,然后就是requestOptions
参数,如果requestOptions
为空则构建一个空的RequestOptions
对象。同时还会将当前的堆栈信息存入到_cancelError 的 stackTrace 中,方便跟踪堆栈信息。最后是调用_completer.complete
异步方法。这个是关键方法,我们看一下这个方法做了什么事情。
Completer类
我们进入 Completer 类来看一下 complete方法做了什么事情:
/// All listeners on the future are informed about the value. void complete([FutureOr<T>? value]);
可以看到这个方法是一个抽象方法,意味着应该是由 Completer
的具体实现类来实现的。同时从注释可以看到,这个方法是会调用监听器来告知 complete
方法的 value
泛型对象。可以理解为是通知观察者处理该对象。那我们就可以猜测是在请求的时候,如果有 cancelToken
参数时,应该是给 cancelToken
增加了一个监听器。继续来看 Dio 的请求代码的实现。
Dio 的请求代码
到 Dio 的源码 dio.dart看的时候,发现全部请求其实是fetch<T>(RequestOptionsrequestOptions)
的别名,也就是实际全部的请求都是通过该方法完成的。我们看一下这个方法的源码。代码很长,如果有兴趣的可以仔细阅读一下,我们这里只找出与 cancelToken 相关的代码。
@override Future<Response<T>> fetch<T>(RequestOptions requestOptions) async { if (requestOptions.cancelToken != null) { requestOptions.cancelToken!.requestOptions = requestOptions; } if (T != dynamic && !(requestOptions.responseType == ResponseType.bytes || requestOptions.responseType == ResponseType.stream)) { if (T == String) { requestOptions.responseType = ResponseType.plain; } else { requestOptions.responseType = ResponseType.json; } } // Convert the request interceptor to a functional callback in which // we can handle the return value of interceptor callback. FutureOr Function(dynamic) _requestInterceptorWrapper( void Function( RequestOptions options, RequestInterceptorHandler handler, ) interceptor, ) { return (dynamic _state) async { var state = _state as InterceptorState; if (state.type == InterceptorResultType.next) { return listenCancelForAsyncTask( requestOptions.cancelToken, Future(() { return checkIfNeedEnqueue(interceptors.requestLock, () { var requestHandler = RequestInterceptorHandler(); interceptor(state.data, requestHandler); return requestHandler.future; }); }), ); } else { return state; } }; } // Convert the response interceptor to a functional callback in which // we can handle the return value of interceptor callback. FutureOr<dynamic> Function(dynamic) _responseInterceptorWrapper( interceptor) { return (_state) async { var state = _state as InterceptorState; if (state.type == InterceptorResultType.next || state.type == InterceptorResultType.resolveCallFollowing) { return listenCancelForAsyncTask( requestOptions.cancelToken, Future(() { return checkIfNeedEnqueue(interceptors.responseLock, () { var responseHandler = ResponseInterceptorHandler(); interceptor(state.data, responseHandler); return responseHandler.future; }); }), ); } else { return state; } }; } // Convert the error interceptor to a functional callback in which // we can handle the return value of interceptor callback. FutureOr<dynamic> Function(dynamic, StackTrace stackTrace) _errorInterceptorWrapper(interceptor) { return (err, stackTrace) { if (err is! InterceptorState) { err = InterceptorState(assureDioError( err, requestOptions, stackTrace, )); } if (err.type == InterceptorResultType.next || err.type == InterceptorResultType.rejectCallFollowing) { return listenCancelForAsyncTask( requestOptions.cancelToken, Future(() { return checkIfNeedEnqueue(interceptors.errorLock, () { var errorHandler = ErrorInterceptorHandler(); interceptor(err.data, errorHandler); return errorHandler.future; }); }), ); } else { throw err; } }; } // Build a request flow in which the processors(interceptors) // execute in FIFO order. // Start the request flow var future = Future<dynamic>(() => InterceptorState(requestOptions)); // Add request interceptors to request flow interceptors.forEach((Interceptor interceptor) { future = future.then(_requestInterceptorWrapper(interceptor.onRequest)); }); // Add dispatching callback to request flow future = future.then(_requestInterceptorWrapper(( RequestOptions reqOpt, RequestInterceptorHandler handler, ) { requestOptions = reqOpt; _dispatchRequest(reqOpt).then( (value) => handler.resolve(value, true), one rror: (e) { handler.reject(e, true); }, ); })); // Add response interceptors to request flow interceptors.forEach((Interceptor interceptor) { future = future.then(_responseInterceptorWrapper(interceptor.onResponse)); }); // Add error handlers to request flow interceptors.forEach((Interceptor interceptor) { future = future.catchError(_errorInterceptorWrapper(interceptor.onError)); }); // Normalize errors, we convert error to the DioError return future.then<Response<T>>((data) { return assureResponse<T>( data is InterceptorState ? data.data : data, requestOptions, ); }).catchError((err, stackTrace) { var isState = err is InterceptorState; if (isState) { if ((err as InterceptorState).type == InterceptorResultType.resolve) { return assureResponse<T>(err.data, requestOptions); } } throw assureDioError( isState ? err.data : err, requestOptions, stackTrace, ); }); }
首先在一开始就检查了当前请求 requestOptions
的 cancelToken
是不是为空,如果不为空,就设置 cancelToken
的 requestOptions
为当前请求的requestOptions
,相当于在 cancelToken
缓存了所有的请求参数。
接下来是拦截器的处理,包括了请求拦截器,响应拦截器和错误拦截器。分别定义了一个内置的拦截器包装方法,用于将拦截器封装为函数式回调,以便进行统一的拦截处理。这个我们跳过,关键是每个拦截器的包装方法都有一个listenCancelForAsyncTask
方法,在拦截器状态是 next
(说明还有拦截要处理)的时候,会调用该方法并返回其返回值。这个方法第一个参数就是 cancelToken
。从方法名看就是监听异步任务的取消事件,看看这个方法做了什么事情。
异步任务取消事件监听
listenCancelForAsyncTask
方法很简单,其实就是返回了一个 Future.any 对象,然后在这个 Future里,如果 cancelToken
不为空的话,在响应 cancelToken
的取消事件时执行后续的处理。Future.any
的特性是将一系列的异步函数按统一的接口组装起来,按次序执行(上一个拦截器的处理完后轮到下一个拦截器执行),以执行 onValue
(正常情况)和onError
(异常情况) 方法。
这里如果 cancelToken
不为空,就会把cancelToken
的取消事件方法放到拦截器中,然后出现异常的时候会将异常抛出。这其实相当于是前置拦截,就是说如果请求还没处理(未加入处理队列)的时候,直接使用拦截器拦截。而如果请求已经加入到了处理队列,就需要在队列调度中处理了。
static Future<T> listenCancelForAsyncTask<T>( CancelToken? cancelToken, Future<T> future) { return Future.any([ if (cancelToken != null) cancelToken.whenCancel.then((e) => throw e), future, ]); }
/// Returns the result of the first future in [futures] to complete. /// /// The returned future is completed with the result of the first /// future in [futures] to report that it is complete, /// whether it's with a value or an error. /// The results of all the other futures are discarded. /// /// If [futures] is empty, or if none of its futures complete, /// the returned future never completes. static Future<T> any<T>(Iterable<Future<T>> futures) { var completer = new Completer<T>.sync(); void onValue(T value) { if (!completer.isCompleted) completer.complete(value); } void one rror(Object error, StackTrace stack) { if (!completer.isCompleted) completer.completeError(error, stack); } for (var future in futures) { future.then(onValue, one rror: one rror); } return completer.future; }
请求调度
实际的请求调度是在 dio_mixin.dart 中的_dispatchRequest
方法完成的,该方法实际在上面的 fetch
方法中调用。这个方法有两个地方用到了 cancelToken
,一个是使用 httpClientAdapter
的 fetch
方法时传入了 cancelToken
的whenCancel
属性。在 httpClientAdapter
引用是为了在取消请求后,能够回调告知监听器请求被取消。另外就是调用了一个 checkCancelled
方法,用于检查是否要停止请求。
responseBody = await httpClientAdapter.fetch( reqOpt, stream, cancelToken?.whenCancel, ); // If the request has been cancelled, stop request and throw error. static void checkCancelled(CancelToken? cancelToken) { if (cancelToken != null && cancelToken.cancelError != null) { throw cancelToken.cancelError!; } }
从这里我们就能够大致明白基本的机制了。实际上我们调用 cancelToken
的 cancel
方法的时候,标记了 cancelToken
的错误信息 cancelError
,以让_dispatchRequest
被调度的时候来检测是否取消。在_dispatchRequest
中如果检测到cancelError
不为空,就会抛出一个 cancelError
,中止当前以及接下来的请求。
总结
从源码来看,有一堆的 Future,以及各类包装方法,阅读起来相当费劲,这也看出来 Dio 的厉害之处,让一般人想这些网络请求头都能想炸去。实际我们从源码以及调试跟踪来看,CancelToken 的机制是:
- 事前取消:如果请求的 cancelToken 不为空,就会将 cancelToken 的异步处理加入到拦截器,取消后直接在拦截器环节就把请求拦住,不会到后续的调度环节。
- 事中取消:如果请求已经加入到了调度队列,此时取消的话会抛出异常,中止请求的发出。
- 事后取消:请求已经发出了,当服务端返回结果的时候,会在响应处理环节(包括出错)拦截,中止后续的响应处理。即便是服务端返回了数据,也会被拦截,不过这个实际意义不太大,无法降低服务端的负荷,只是避免后续的数据处理过程了。
这篇关于从源码深入了解 Dio 的 CancelToken的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28pyqt 怎么打包整个项目-icode9专业技术文章分享
- 2024-09-28laravel Commands 创建带有参数的 Artisan 命令的步骤和示例-icode9专业技术文章分享
- 2024-09-28antd怎么实现渲染tiff图片-icode9专业技术文章分享
- 2024-09-28英文半角中划线和中文全角的中划线有什么区别-icode9专业技术文章分享
- 2024-09-28nvm npm 和node 他们之间有什么关系-icode9专业技术文章分享
- 2024-09-28Node Version Manager (nvm)使用教程-icode9专业技术文章分享
- 2024-09-28nvm命令太慢,是什么原因-icode9专业技术文章分享
- 2024-09-28Kotlin 如何增加、删除和修改 MutableStateFlow 中的值。-icode9专业技术文章分享
- 2024-09-28Kotlin的stateFlow.update 写法介绍-icode9专业技术文章分享
- 2024-09-28kotlin 怎么获取当前时间格式-icode9专业技术文章分享