Alice 4: Future 的操作符
操作符是 Future 的关键,好的操作符设计能让异步任务的调度如虎添翼,比如链式调用:
let future = asyncTask()
future
.flatMap {
anotherAsyncTask()
}
.validate {
$0.isValid
}
.timeout(3)
.wait()
我为 Future
实现了很多操作符。
其中有标配的 map
,flatMap
,whenAny
,whenAll
等,几乎所有 future & promise
库都会提供这些实现。有常用的 wait
,timeout
,validate
,reduce
等,这是一些常用扩展,如果没有内置,用户大概率要在未来某天自己实现。还有一些个人趣味的 flat
,mute
,asAny
,asVoid
,我的 API 哲学的关键字是友好,所以即使是两步变一步,只要该操作符足够友好,我也会加进来,:-D。
map & tryMap & mapError
map 系列的作用是 transform。其中 map
用来 transform success
,tryMap
用来 try
transform success
,mapError
用来 transform error
。
getUser()
.map {
$0.name
}
.whenComplete {
//
}
它的实现就在合适的地方 transform 一下。
public func map<U>(_ body: @escaping (Success) -> U) -> Future<U, Failure> {
let p = Promise<U, Failure>()
self.whenComplete { r in
switch r {
case .success(let s):
p.succeed(body(s))
case .failure(let e):
p.fail(e)
}
}
return p.future
}
public func mapError<E>(_ body: @escaping (Error) -> E) -> Future<Success, E> {
let p = Promise<Success, E>()
self.whenComplete { r in
switch r {
case .success(let s):
p.succeed(s)
case .failure(let e):
p.fail(body(e))
}
}
return p.future
}
flatMap
flatMap
也是 map,它要 body
返回一个新的 future
,这是大概是最常见的异步任务组合。
getUser()
.flatMap {
getTweets(by: $0.id)
}
.whenComplete {
}
它的实现也不复杂:
public func flatMap<U>(_ body: @escaping (Success) -> Future<U, Failure>) -> Future<U, Failure> {
let p = Promise<U, Failure>()
self.whenComplete { r in
switch r {
case .success(let s):
body(s).pipe(to: p)
case .failure(let e):
p.fail(e)
}
}
return p.future
}
其中 pipe
的实现只有一行,就是把结果 pipe 到 promise:
public func pipe(to promise: Promise<Success, Failure>) {
self.whenComplete(promise.complete)
}
validate
validate
用来检查成功值。
getUser()
.validate({
$0.isAdmin
}, whenFail: .notAdmin)
它的 API 设计可能与你的想象有些不同,validate
除了需要一个 (Success) -> Bool
类型的闭包外,还需要一个 () -> Failure
类型的参数,指示 whenFail
应该做什么。
它的实现是对 map
的小小改动——其实很多操作符都是这样。
self.whenComplete { r in
switch r {
case .success(let s):
if body(s) {
p.succeed(s)
} else {
p.fail(whenFail())
}
case .failure(let e):
p.fail(e)
}
}
wait
接下来是 wait
,有时你需要等待异步任务的完成:
let data = getUser().wait()
我使用了 DispatchSemaphore
:
public func wait() -> Success? {
let sema = DispatchSemaphore(value: 0)
self.whenComplete { _ in
sema.signal()
}
sema.wait()
return self.inspectWithoutLock()!.value
}
注意!不要同步等待,会死锁的,想象一下 DispatchQueue.main.sync { }
。
whenAll* & whenAny*
whenAll
与 whenAny
顾名思义,一个是全部完成,一个是任一完成。
whenAll
的实现用了 Atom<Int>
,当为 0 时,表示所有任务都完成啦。
public static func whenAllComplete<C: Collection>(_ thenables: C) -> Future<[Result<C.Element.Success, C.Element.Failure>], C.Element.Failure> where C.Element: Thenable {
let p = Promise<[Result<C.Element.Success, C.Element.Failure>], C.Element.Failure>()
let count = Atom(thenables.count)
for t in thenables {
t.whenComplete { _ in
if count.sub(1) == 1 {
p.succeed(thenables.map({ $0.inspectWithoutLock()! }))
}
}
}
return p.future
}
whenAny
简单,因为 Future
的完成本来就是一次性的,所以直接:
public static func whenAnyComplete<S: Sequence>(_ thenables: S) -> Future<S.Element.Success, S.Element.Failure> where S.Element: Thenable {
let p = Promise<S.Element.Success, S.Element.Failure>()
for t in thenables {
t.pipe(to: p)
}
return p.future
}
你可能发现了,上述包括 map
,flatMap
,wait
,when**
在内的所有操作符都挂在了 protocol Thenable
下。
那是因为,我们需要 Thenable
定义这样的方法:
public static func whenAllComplete<T1: Thenable, T2: Thenable>(
_ thenable1: T1,
_ thenable2: T2
)
-> Future<(Result<T1.Success, T1.Failure>, Result<T2.Success, T2.Failure>), T1.Failure>
where T1.Failure == T2.Failure
Thenable
其实是一个 Future
的抽象,我们(目前)不计划有别的 Thenable
实现。
yield
yield
是另一个必备需求,即切换线程。在后台线程里读到的图片文件要在主线程里显示:
readImage()
.yiled(on: DispatchQueue.main)
.whenComplete {
self.imageView.image = $0
}
这里我设计了 protocol Scheduler
,它的定义如下:
public protocol Scheduler {
func schedule(_ task: @escaping () -> Void)
}
Async
默认让 DispatchQueue
OperationQueue
RunLoop
遵守该协议,方便你的 yield
。
更多
以上便是一些示例,还有非常非常多游泳的操作符,你可以到 Alice 的 GitHub 主页 查看源码实现——
// retry
public static func retry<Success, Failure>(count: Int, task: @escaping () -> Future<Success, Failure>) -> Future<Success, Failure> {
return self._retry(count: count, task: task)
}
static func _retry<Success, Failure>(count: Int, task: @escaping () -> Future<Success, Failure>) -> Future<Success, Failure> {
return task().flatMapError {
if count == 0 {
return Future<Success, Failure>.failure($0)
} else {
return _retry(count: count - 1, task: task)
}
}
}
// reduce
public static func reduce<S: Sequence, U: Thenable>(_ thenables: S, initial: U, nextPartial: @escaping (U.Success, S.Element.Success) -> Future<U.Success, U.Failure>) -> Future<U.Success, U.Failure> where S.Element: Thenable, S.Element.Failure == U.Failure {
return thenables.reduce(initial.toFuture()) { (fu, ft) -> Future<U.Success, U.Failure> in
return whenAllSucceed(fu, ft).flatMap { (ut) in
return nextPartial(ut.0, ut.1)
}
}
}
// flat
public func flat() -> Future<Success.Success, Failure> {
return self.flatMap { $0.toFuture() }
}
// mute
public func mute() { }
// Very, very many other useful and cute operators...
这些操作符中,有一些是 alias
,即别名,比如 any
-> whenAnyComplete
,final
-> always
,在不同的地方可能不同的名字更合适,我一向不拒绝通过扩展让 api 变得更友好的方式。😉
如果你有一些好操作符的点子,欢迎 pr!如果你不喜欢哪些已有的 api,也欢迎抱怨!还记得吗?我说过,想让 Alice 在交流中的写就,在反馈中完成。