⏰ Combine调度器
一些调度器,使Combine更易于测试和更加versatile。
动机
Combine框架提供了Scheduler
协议,这是一个强大的抽象,用于描述如何以及何时执行工作单元。它统一了许多不同的执行工作的方式,如DispatchQueue
、RunLoop
和OperationQueue
。
但是,一旦在你的响应式代码中使用这些调度器,你就会立即使发布者变为异步,因此更难测试,迫使你使用expectations并等待时间流逝,以便你的发布者执行。
这个库提供了新的调度器,允许你将任何异步发布者转换为同步发布者,以便于测试和调试。
了解更多
这个库是在Point-Free的多个剧集中设计的,这是一个由Brandon Williams和Stephen Celis主持的探索函数式编程和Swift的视频系列。
你可以在这里观看所有剧集。
AnyScheduler
AnyScheduler
为Scheduler
协议提供了一个类型擦除的包装器,这在对多种类型的调度器进行泛型操作时非常有用,而无需在代码中引入泛型。Combine框架提供了许多类型擦除的包装器,如AnySubscriber
、AnyPublisher
和AnyCancellable
,但出于某种原因没有提供AnyScheduler
。
当你想从外部自定义某些代码中使用的调度器,但又不想引入泛型来使其可定制时,这种类型非常有用。例如,假设你有一个ObservableObject
视图模型,当调用某个方法时执行API请求:
class EpisodeViewModel: ObservableObject {
@Published var episode: Episode?
let apiClient: ApiClient
init(apiClient: ApiClient) {
self.apiClient = apiClient
}
func reloadButtonTapped() {
self.apiClient.fetchEpisode()
.receive(on: DispatchQueue.main)
.assign(to: &self.$episode)
}
}
注意,我们在reloadButtonTapped
方法中使用DispatchQueue.main
,因为fetchEpisode
端点很可能在后台线程上传递其输出(就像URLSession
的情况一样)。
这段代码看起来很简单,但.receive(on: DispatchQueue.main)
的存在使得这段代码更难测试,因为你必须使用XCTest
expectations来显式地等待一小段时间让队列执行。这可能导致测试不稳定,并使测试套件的执行时间比必要的更长。
解决这个测试问题的一种方法是使用"immediate"调度器而不是DispatchQueue.main
,这将导致fetchEpisode
尽快传递其输出,而不会发生线程跳转。为了实现这一点,我们需要将调度器注入到我们的视图模型中,以便从外部控制它:
class EpisodeViewModel<S: Scheduler>: ObservableObject {
@Published var episode: Episode?
let apiClient: ApiClient
let scheduler: S
init(apiClient: ApiClient, scheduler: S) {
self.apiClient = apiClient
self.scheduler = scheduler
}
func reloadButtonTapped() {
self.apiClient.fetchEpisode()
.receive(on: self.scheduler)
.assign(to: &self.$episode)
}
}
现在我们可以在生产环境中使用DispatchQueue.main
初始化这个视图模型,在测试中使用DispatchQueue.immediate
初始化它。听起来是个胜利!
然而,在我们的视图模型中引入这个泛型是相当重量级的,因为它向外界大声宣布这个类型使用了调度器,更糟糕的是,它将影响任何接触这个视图模型并且也想要可测试的代码。例如,任何使用这个视图模型的视图如果也想控制调度器,就需要引入泛型,这在我们想要编写快照测试时会很有用。
我们可以使用AnyScheduler
来允许替换不同的调度器,而不是引入泛型。它允许我们在调度器方面有一定的泛型性,但实际上并不引入泛型。
我们可以说我们只想要一个其关联类型与DispatchQueue
匹配的调度器,而不是在视图模型中保持一个泛型调度器:
class EpisodeViewModel: ObservableObject {
@Published var episode: Episode?
let apiClient: ApiClient
let scheduler: AnySchedulerOf<DispatchQueue>
init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) {
self.apiClient = apiClient
self.scheduler = scheduler
}
func reloadButtonTapped() {
self.apiClient.fetchEpisode()
.receive(on: self.scheduler)
.assign(to: &self.$episode)
}
}
然后,在生产环境中,我们可以创建一个使用实时DispatchQueue
的视图模型,但我们只需要先擦除其类型:
let viewModel = EpisodeViewModel(
apiClient: ...,
scheduler: DispatchQueue.main.eraseToAnyScheduler()
)
对于常见的调度器,如DispatchQueue
、OperationQueue
和RunLoop
,AnyScheduler
上甚至有一个静态辅助方法,进一步简化了这一点:
let viewModel = EpisodeViewModel(
apiClient: ...,
scheduler: .main
)
然后在测试中我们可以使用immediate调度器:
let viewModel = EpisodeViewModel(
apiClient: ...,
scheduler: .immediate
)
因此,总的来说,AnyScheduler
非常适合允许控制在类、函数等中使用的调度器,而无需引入泛型,这可以帮助简化代码并减少实现细节的泄露。
TestScheduler
一个可以以确定性方式控制其当前时间和执行的调度器。这个调度器对于测试时间流如何影响使用异步操作符的发布者非常有用,例如debounce
、throttle
、delay
、timeout
、receive(on:)
、subscribe(on:)
等。
例如,考虑以下race
操作符,它并行运行两个futures,但只发出第一个完成的:
func race<Output, Failure: Error>(
_ first: Future<Output, Failure>,
_ second: Future<Output, Failure>
) -> AnyPublisher<Output, Failure> {
first
.merge(with: second)
.prefix(1)
.eraseToAnyPublisher()
}
尽管这个发布者相当简单,我们可能仍然想为它编写一些测试。
为此,我们可以创建一个测试调度器,并创建两个futures,一个在一秒后发出,一个在两秒后发出:
let scheduler = DispatchQueue.test
let first = Future<Int, Never> { callback in
scheduler.schedule(after: scheduler.now.advanced(by: 1)) { callback(.success(1)) }
}
let second = Future<Int, Never> { callback in
scheduler.schedule(after: scheduler.now.advanced(by: 2)) { callback(.success(2)) }
}
然后我们可以race这些futures并将它们的发出值收集到一个数组中:
var output: [Int] = []
let cancellable = race(first, second).sink { output.append($0) }
然后我们可以确定性地在调度器中向前移动时间,看看发布者如何发出。我们可以从向前移动一秒开始:
scheduler.advance(by: 1)
XCTAssertEqual(output, [1])
这证明我们从发布者那里得到了第一次发出,因为已经过了一秒钟。如果我们再向前推进一秒,我们可以证明我们不会得到更多的发出:
scheduler.advance(by: 1)
XCTAssertEqual(output, [1])
这是一个非常简单的例子,展示了如何使用测试调度器控制时间流,但这种技术可以用于测试任何涉及Combine异步操作的发布者。
ImmediateScheduler
Combine框架自带一个ImmediateScheduler
类型,但它为SchedulerTimeType
和SchedulerOptions
的关联类型定义了所有新类型。这意味着你不能轻易地在实时DispatchQueue
和同步执行工作的"immediate"DispatchQueue
之间切换。唯一的方法是为使用该调度器的任何代码引入泛型,这可能会变得笨拙。
因此,这个库的ImmediateScheduler
使用与现有调度器相同的关联类型,这意味着你可以使用DispatchQueue.immediate
来获得一个看起来像调度队列但立即执行其工作的调度器。同样,你可以构造RunLoop.immediate
和OperationQueue.immediate
。
这个调度器对于编写针对使用异步操作符的发布者的测试很有用,比如receive(on:)
、subscribe(on:)
等,因为它强制发布者立即发出,而不需要使用XCTestExpectation
等待线程跳转或延迟。
这个调度器与TestScheduler
的不同之处在于你不能显式控制时间如何流过你的发布者,而是立即将时间折叠成一个点。
作为一个基本例子,假设你有一个视图模型,在按钮被点击后等待10秒钟才加载一些数据:
class HomeViewModel: ObservableObject {
@Published var episodes: [Episode]?
let apiClient: ApiClient
init(apiClient: ApiClient) {
self.apiClient = apiClient
}
func reloadButtonTapped() {
Just(())
.delay(for: .seconds(10), scheduler: DispatchQueue.main)
.flatMap { apiClient.fetchEpisodes() }
.assign(to: &self.$episodes)
}
}
为了测试这段代码,你真的需要等待10秒钟才能让发布者发出:
func testViewModel() {
let viewModel = HomeViewModel(apiClient: .mock)
viewModel.reloadButtonTapped()
_ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10)
XCTAssert(viewModel.episodes, [Episode(id: 42)])
}
另外,我们可以显式地将调度器传递到视图模型初始化器中,以便从外部控制:
class HomeViewModel: ObservableObject {
@Published var episodes: [Episode]?
let apiClient: ApiClient
let scheduler: AnySchedulerOf<DispatchQueue>
init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) {
self.apiClient = apiClient
self.scheduler = scheduler
}
func reloadButtonTapped() {
Just(())
.delay(for: .seconds(10), scheduler: self.scheduler)
.flatMap { self.apiClient.fetchEpisodes() }
.assign(to: &self.$episodes)
}
}
然后在测试中使用immediate调度器:
func testViewModel() {
let viewModel = HomeViewModel(
apiClient: .mock,
scheduler: .immediate
)
viewModel.reloadButtonTapped()
// 不再需要等待...
XCTAssert(viewModel.episodes, [Episode(id: 42)])
}
动画调度器
CombineSchedulers提供了一些辅助工具,用于在SwiftUI和UIKit中进行异步动画。
如果SwiftUI状态 例如,要在视图模型中为 API 响应添加动画效果,你可以指定接收此状态的调度器应该使用动画:
self.apiClient.fetchEpisode()
.receive(on: self.scheduler.animation())
.assign(to: &self.$episode)
如果你正在使用 Combine 为 UIKit 功能提供支持,你可以使用 .animate
方法,它与 UIView.animate
类似:
self.apiClient.fetchEpisode()
.receive(on: self.scheduler.animate(withDuration: 0.3))
.assign(to: &self.$episode)
UnimplementedScheduler
一个在使用时会导致测试失败的调度器。
这个调度器可以提供额外的确定性,确保被测试的代码路径不需要使用调度器。
随着视图模型变得更加复杂,只有部分逻辑可能需要调度器。在为不需要调度器的逻辑编写单元测试时,应该提供一个未实现的调度器。这直接在测试中记录了该功能不使用调度器。如果它使用了,或将来使用了,测试将会失败。
例如,以下视图模型有几个职责:
class EpisodeViewModel: ObservableObject {
@Published var episode: Episode?
let apiClient: ApiClient
let mainQueue: AnySchedulerOf<DispatchQueue>
init(apiClient: ApiClient, mainQueue: AnySchedulerOf<DispatchQueue>) {
self.apiClient = apiClient
self.mainQueue = mainQueue
}
func reloadButtonTapped() {
self.apiClient.fetchEpisode()
.receive(on: self.mainQueue)
.assign(to: &self.$episode)
}
func favoriteButtonTapped() {
self.episode?.isFavorite.toggle()
}
}
- 它允许用户点击按钮刷新一些剧集数据
- 它允许用户切换剧集是否为他们的收藏
API 客户端在后台队列上传递剧集,所以视图模型必须在主队列上接收它,然后才能修改其状态。
然而,点击收藏按钮不涉及调度。这意味着可以使用未实现的调度器编写测试:
func testFavoriteButton() {
let viewModel = EpisodeViewModel(
apiClient: .mock,
mainQueue: .unimplemented
)
viewModel.episode = .mock
viewModel.favoriteButtonTapped()
XCTAssert(viewModel.episode?.isFavorite == true)
viewModel.favoriteButtonTapped()
XCTAssert(viewModel.episode?.isFavorite == false)
}
使用 .unimplemented
,这个测试强烈声明收藏一个剧集不需要调度器来完成任务,这意味着可以合理地假设该功能很简单,不涉及任何异步操作。
将来,如果收藏一个剧集需要发送涉及调度器的 API 请求,这个测试将开始失败,这是好事!这将迫使我们解决引入的复杂性。如果我们使用任何其他调度器,它会静默地接收这些额外的工作,测试将继续通过。
UIScheduler
一个尽快在主队列上执行其工作的调度器。这个调度器的灵感来自 ReactiveSwift 项目中的等效调度器。
如果从主线程调用 UIScheduler.shared.schedule
,则工作单元将立即执行。这与 DispatchQueue.main.schedule
形成对比,后者在执行之前会产生线程跳转,因为它在底层使用 DispatchQueue.main.async
。
这个调度器对于需要在主线程上尽快执行工作,且线程跳转会产生问题的情况很有用,例如执行动画时。
并发 API
这个库为与 Combine 调度器交互提供了 async
友好的 API。
// 暂停当前任务 1 秒
try await scheduler.sleep(for: .seconds(1))
// 每 1 秒执行一次工作
for await instant in scheduler.timer(interval: .seconds(1)) {
...
}
Publishers.Timer
一个在重复间隔发射调度器当前时间的发布者。
这个发布者是 Foundation 的 Timer.publisher
的替代品,其主要区别在于它允许你为计时器使用任何调度器,而不仅仅是 RunLoop
。这很有用,因为 RunLoop
调度器在测试方面不可测试,如果你想针对使用 Timer.publisher
的发布者编写测试,你必须显式等待时间流逝才能得到发射。这可能导致脆弱的测试,并大大增加测试执行的时间。
它的使用方式与 Foundation 的计时器类似,只是你指定一个调度器而不是运行循环:
Publishers.Timer(every: .seconds(1), scheduler: DispatchQueue.main)
.autoconnect()
.sink { print("Timer", $0) }
或者,你可以在调度器上调用 timerPublisher
方法,以在该调度器上派生一个重复计时器:
DispatchQueue.main.timerPublisher(every: .seconds(1))
.autoconnect()
.sink { print("Timer", $0) }
但这个计时器最好的部分是你可以将它与 TestScheduler
一起使用,这样你编写的任何涉及计时器的 Combine 代码都变得更可测试。这显示了我们如何轻松模拟在计时器中前进 1,000 秒的想法:
let scheduler = DispatchQueue.test
var output: [Int] = []
Publishers.Timer(every: 1, scheduler: scheduler)
.autoconnect()
.sink { _ in output.append(output.count) }
.store(in: &self.cancellables)
XCTAssertEqual(output, [])
scheduler.advance(by: 1)
XCTAssertEqual(output, [0])
scheduler.advance(by: 1)
XCTAssertEqual(output, [0, 1])
scheduler.advance(by: 1_000)
XCTAssertEqual(output, Array(0...1_001))
兼容性
此库兼容 iOS 13.2 及更高版本。请注意,Combine 框架和 iOS 13.1 及更低版本中存在一些 bug,在尝试比较 DispatchQueue.SchedulerTimeType
值时会导致崩溃,而 TestScheduler
依赖于这个操作。
安装
你可以通过添加包依赖将 CombineSchedulers 添加到 Xcode 项目中。
- 从文件菜单中,选择 Swift Packages › 添加包依赖...
- 在包仓库 URL 文本字段中输入 "https://github.com/pointfreeco/combine-schedulers"
- 根据你的项目结构:
- 如果你有一个需要访问库的单一应用目标,那么直接将 CombineSchedulers 添加到你的应用程序中。
- 如果你想从多个目标使用这个库,你必须创建一个依赖于 CombineSchedulers 的共享框架,然后从你的其他目标依赖于该框架。
文档
Combine Schedulers API 的最新文档可在这里获得。
其他库
许可证
该库在 MIT 许可下发布。详情请见 LICENSE。