Alice 4: Future 的操作符

操作符是 Future 的关键,好的操作符设计能让异步任务的调度如虎添翼,比如链式调用:

let future = asyncTask()

future
    .flatMap {
        anotherAsyncTask()
    }
    .validate {
        $0.isValid
    }
    .timeout(3)
    .wait()

我为 Future 实现了很多操作符。

其中有标配的 mapflatMapwhenAnywhenAll 等,几乎所有 future & promise 库都会提供这些实现。有常用的 waittimeoutvalidatereduce 等,这是一些常用扩展,如果没有内置,用户大概率要在未来某天自己实现。还有一些个人趣味的 flatmuteasAnyasVoid,我的 API 哲学的关键字是友好,所以即使是两步变一步,只要该操作符足够友好,我也会加进来,:-D。

map & tryMap & mapError

map 系列的作用是 transform。其中 map 用来 transform successtryMap 用来 try transform successmapError 用来 transform error

getUser()
    .map { 
        $0.name
    }
    .whenComplete {
        //  
    }

它的实现就在合适的地方 transform 一下。

public func map<U>(_ body: @escaping (Success) -> U) -> Future<U, Failure> {
    let p = Promise<U, Failure>()
    self.whenComplete { r in
        switch r {
        case .success(let s):
            p.succeed(body(s))
        case .failure(let e):
            p.fail(e)
        }
    }
    return p.future
}

public func mapError<E>(_ body: @escaping (Error) -> E) -> Future<Success, E> {
    let p = Promise<Success, E>()
    self.whenComplete { r in
        switch r {
        case .success(let s):
            p.succeed(s)
        case .failure(let e):
            p.fail(body(e))
        }
    }
    return p.future
}

flatMap

flatMap 也是 map,它要 body 返回一个新的 future,这是大概是最常见的异步任务组合。

getUser()
    .flatMap {
        getTweets(by: $0.id)
    }
    .whenComplete {
    }

它的实现也不复杂:

public func flatMap<U>(_ body: @escaping (Success) -> Future<U, Failure>) -> Future<U, Failure> {
    let p = Promise<U, Failure>()
    self.whenComplete { r in
        switch r {
        case .success(let s):
            body(s).pipe(to: p)
        case .failure(let e):
            p.fail(e)
        }
    }
    return p.future
}

其中 pipe 的实现只有一行,就是把结果 pipe 到 promise:

public func pipe(to promise: Promise<Success, Failure>) {
    self.whenComplete(promise.complete)
}

validate

validate 用来检查成功值。

getUser()
    .validate({
        $0.isAdmin
    }, whenFail: .notAdmin)

它的 API 设计可能与你的想象有些不同,validate 除了需要一个 (Success) -> Bool 类型的闭包外,还需要一个 () -> Failure 类型的参数,指示 whenFail 应该做什么。

它的实现是对 map 的小小改动——其实很多操作符都是这样。

self.whenComplete { r in
    switch r {
    case .success(let s):
        if body(s) {
            p.succeed(s)
        } else {
            p.fail(whenFail())
        }
    case .failure(let e):
        p.fail(e)
    }
}

wait

接下来是 wait,有时你需要等待异步任务的完成:

let data = getUser().wait()

我使用了 DispatchSemaphore

public func wait() -> Success? {
    let sema = DispatchSemaphore(value: 0)
    self.whenComplete { _ in
        sema.signal()
    }
    sema.wait()
    
    return self.inspectWithoutLock()!.value
}

注意!不要同步等待,会死锁的,想象一下 DispatchQueue.main.sync { }

whenAll* & whenAny*

whenAllwhenAny 顾名思义,一个是全部完成,一个是任一完成。

whenAll 的实现用了 Atom<Int>,当为 0 时,表示所有任务都完成啦。

public static func whenAllComplete<C: Collection>(_ thenables: C) -> Future<[Result<C.Element.Success, C.Element.Failure>], C.Element.Failure> where C.Element: Thenable {
    let p = Promise<[Result<C.Element.Success, C.Element.Failure>], C.Element.Failure>()
    
    let count = Atom(thenables.count)
    
    for t in thenables {
        t.whenComplete { _ in
            if count.sub(1) == 1 {
                p.succeed(thenables.map({ $0.inspectWithoutLock()! }))
            }
        }
    }
    
    return p.future
}

whenAny 简单,因为 Future 的完成本来就是一次性的,所以直接:

public static func whenAnyComplete<S: Sequence>(_ thenables: S) -> Future<S.Element.Success, S.Element.Failure> where S.Element: Thenable {
    let p = Promise<S.Element.Success, S.Element.Failure>()
    for t in thenables {
        t.pipe(to: p)
    }
    return p.future
}

你可能发现了,上述包括 mapflatMapwaitwhen** 在内的所有操作符都挂在了 protocol Thenable 下。

那是因为,我们需要 Thenable 定义这样的方法:

public static func whenAllComplete<T1: Thenable, T2: Thenable>(
    _ thenable1: T1,
    _ thenable2: T2
)
    -> Future<(Result<T1.Success, T1.Failure>, Result<T2.Success, T2.Failure>), T1.Failure>
    where T1.Failure == T2.Failure 

Thenable 其实是一个 Future 的抽象,我们(目前)不计划有别的 Thenable 实现。

yield

yield 是另一个必备需求,即切换线程。在后台线程里读到的图片文件要在主线程里显示:

readImage()
    .yiled(on: DispatchQueue.main)
    .whenComplete {
        self.imageView.image = $0
    }

这里我设计了 protocol Scheduler,它的定义如下:

public protocol Scheduler {
    
    func schedule(_ task: @escaping () -> Void)
}

Async 默认让 DispatchQueue OperationQueue RunLoop 遵守该协议,方便你的 yield

更多

以上便是一些示例,还有非常非常多游泳的操作符,你可以到 Alice 的 GitHub 主页 查看源码实现——

// retry
public static func retry<Success, Failure>(count: Int, task: @escaping () -> Future<Success, Failure>) -> Future<Success, Failure> {
    return self._retry(count: count, task: task)
}

static func _retry<Success, Failure>(count: Int, task: @escaping () -> Future<Success, Failure>) -> Future<Success, Failure> {
    return task().flatMapError {
        if count == 0 {
            return Future<Success, Failure>.failure($0)
        } else {
            return _retry(count: count - 1, task: task)
        }
    }
}

// reduce
public static func reduce<S: Sequence, U: Thenable>(_ thenables: S, initial: U, nextPartial: @escaping (U.Success, S.Element.Success) -> Future<U.Success, U.Failure>) -> Future<U.Success, U.Failure> where S.Element: Thenable, S.Element.Failure == U.Failure {
    return thenables.reduce(initial.toFuture()) { (fu, ft) -> Future<U.Success, U.Failure> in
        return whenAllSucceed(fu, ft).flatMap { (ut) in
            return nextPartial(ut.0, ut.1)
        }
    }
}
    
// flat
public func flat() -> Future<Success.Success, Failure> {
    return self.flatMap { $0.toFuture() }
}

// mute
public func mute() { }

// Very, very many other useful and cute operators...

这些操作符中,有一些是 alias,即别名,比如 any -> whenAnyCompletefinal -> always,在不同的地方可能不同的名字更合适,我一向不拒绝通过扩展让 api 变得更友好的方式。😉

如果你有一些好操作符的点子,欢迎 pr!如果你不喜欢哪些已有的 api,也欢迎抱怨!还记得吗?我说过,想让 Alice 在交流中的写就,在反馈中完成。


#swift #http #future #alice #alice-serial

Comments