Project Icon

combine-schedulers

Combine框架的多功能调度器扩展库

combine-schedulers为Combine框架提供了多种实用的调度器实现,包括AnyScheduler、TestScheduler和ImmediateScheduler等。这些工具可简化异步代码的测试和调试过程,提升开发效率。该库还支持自定义动画调度、并发API和定时器发布者,为iOS和macOS应用开发提供灵活的调度解决方案。

⏰ Combine调度器

CI

一些调度器,使Combine更易于测试和更加versatile。

动机

Combine框架提供了Scheduler协议,这是一个强大的抽象,用于描述如何以及何时执行工作单元。它统一了许多不同的执行工作的方式,如DispatchQueueRunLoopOperationQueue

但是,一旦在你的响应式代码中使用这些调度器,你就会立即使发布者变为异步,因此更难测试,迫使你使用expectations并等待时间流逝,以便你的发布者执行。

这个库提供了新的调度器,允许你将任何异步发布者转换为同步发布者,以便于测试和调试。

了解更多

这个库是在Point-Free的多个剧集中设计的,这是一个由Brandon WilliamsStephen Celis主持的探索函数式编程和Swift的视频系列。

你可以在这里观看所有剧集。

视频海报图片

AnyScheduler

AnySchedulerScheduler协议提供了一个类型擦除的包装器,这在对多种类型的调度器进行泛型操作时非常有用,而无需在代码中引入泛型。Combine框架提供了许多类型擦除的包装器,如AnySubscriberAnyPublisherAnyCancellable,但出于某种原因没有提供AnyScheduler

当你想从外部自定义某些代码中使用的调度器,但又不想引入泛型来使其可定制时,这种类型非常有用。例如,假设你有一个ObservableObject视图模型,当调用某个方法时执行API请求:

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient

  init(apiClient: ApiClient) {
    self.apiClient = apiClient
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: DispatchQueue.main)
      .assign(to: &self.$episode)
  }
}

注意,我们在reloadButtonTapped方法中使用DispatchQueue.main,因为fetchEpisode端点很可能在后台线程上传递其输出(就像URLSession的情况一样)。

这段代码看起来很简单,但.receive(on: DispatchQueue.main)的存在使得这段代码更难测试,因为你必须使用XCTestexpectations来显式地等待一小段时间让队列执行。这可能导致测试不稳定,并使测试套件的执行时间比必要的更长。

解决这个测试问题的一种方法是使用"immediate"调度器而不是DispatchQueue.main,这将导致fetchEpisode尽快传递其输出,而不会发生线程跳转。为了实现这一点,我们需要将调度器注入到我们的视图模型中,以便从外部控制它:

class EpisodeViewModel<S: Scheduler>: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient
  let scheduler: S

  init(apiClient: ApiClient, scheduler: S) {
    self.apiClient = apiClient
    self.scheduler = scheduler
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: self.scheduler)
      .assign(to: &self.$episode)
  }
}

现在我们可以在生产环境中使用DispatchQueue.main初始化这个视图模型,在测试中使用DispatchQueue.immediate初始化它。听起来是个胜利!

然而,在我们的视图模型中引入这个泛型是相当重量级的,因为它向外界大声宣布这个类型使用了调度器,更糟糕的是,它将影响任何接触这个视图模型并且也想要可测试的代码。例如,任何使用这个视图模型的视图如果也想控制调度器,就需要引入泛型,这在我们想要编写快照测试时会很有用。

我们可以使用AnyScheduler来允许替换不同的调度器,而不是引入泛型。它允许我们在调度器方面有一定的泛型性,但实际上并不引入泛型。

我们可以说我们只想要一个其关联类型与DispatchQueue匹配的调度器,而不是在视图模型中保持一个泛型调度器:

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient
  let scheduler: AnySchedulerOf<DispatchQueue>

  init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) {
    self.apiClient = apiClient
    self.scheduler = scheduler
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: self.scheduler)
      .assign(to: &self.$episode)
  }
}

然后,在生产环境中,我们可以创建一个使用实时DispatchQueue的视图模型,但我们只需要先擦除其类型:

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: DispatchQueue.main.eraseToAnyScheduler()
)

对于常见的调度器,如DispatchQueueOperationQueueRunLoopAnyScheduler上甚至有一个静态辅助方法,进一步简化了这一点:

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: .main
)

然后在测试中我们可以使用immediate调度器:

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: .immediate
)

因此,总的来说,AnyScheduler非常适合允许控制在类、函数等中使用的调度器,而无需引入泛型,这可以帮助简化代码并减少实现细节的泄露。

TestScheduler

一个可以以确定性方式控制其当前时间和执行的调度器。这个调度器对于测试时间流如何影响使用异步操作符的发布者非常有用,例如debouncethrottledelaytimeoutreceive(on:)subscribe(on:)等。

例如,考虑以下race操作符,它并行运行两个futures,但只发出第一个完成的:

func race<Output, Failure: Error>(
  _ first: Future<Output, Failure>,
  _ second: Future<Output, Failure>
) -> AnyPublisher<Output, Failure> {
  first
    .merge(with: second)
    .prefix(1)
    .eraseToAnyPublisher()
}

尽管这个发布者相当简单,我们可能仍然想为它编写一些测试。

为此,我们可以创建一个测试调度器,并创建两个futures,一个在一秒后发出,一个在两秒后发出:

let scheduler = DispatchQueue.test

let first = Future<Int, Never> { callback in
  scheduler.schedule(after: scheduler.now.advanced(by: 1)) { callback(.success(1)) }
}
let second = Future<Int, Never> { callback in
  scheduler.schedule(after: scheduler.now.advanced(by: 2)) { callback(.success(2)) }
}

然后我们可以race这些futures并将它们的发出值收集到一个数组中:

var output: [Int] = []
let cancellable = race(first, second).sink { output.append($0) }

然后我们可以确定性地在调度器中向前移动时间,看看发布者如何发出。我们可以从向前移动一秒开始:

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

这证明我们从发布者那里得到了第一次发出,因为已经过了一秒钟。如果我们再向前推进一秒,我们可以证明我们不会得到更多的发出:

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

这是一个非常简单的例子,展示了如何使用测试调度器控制时间流,但这种技术可以用于测试任何涉及Combine异步操作的发布者。

ImmediateScheduler

Combine框架自带一个ImmediateScheduler类型,但它为SchedulerTimeTypeSchedulerOptions的关联类型定义了所有新类型。这意味着你不能轻易地在实时DispatchQueue和同步执行工作的"immediate"DispatchQueue之间切换。唯一的方法是为使用该调度器的任何代码引入泛型,这可能会变得笨拙。

因此,这个库的ImmediateScheduler使用与现有调度器相同的关联类型,这意味着你可以使用DispatchQueue.immediate来获得一个看起来像调度队列但立即执行其工作的调度器。同样,你可以构造RunLoop.immediateOperationQueue.immediate

这个调度器对于编写针对使用异步操作符的发布者的测试很有用,比如receive(on:)subscribe(on:)等,因为它强制发布者立即发出,而不需要使用XCTestExpectation等待线程跳转或延迟。

这个调度器与TestScheduler的不同之处在于你不能显式控制时间如何流过你的发布者,而是立即将时间折叠成一个点。

作为一个基本例子,假设你有一个视图模型,在按钮被点击后等待10秒钟才加载一些数据:

class HomeViewModel: ObservableObject {
  @Published var episodes: [Episode]?

  let apiClient: ApiClient

  init(apiClient: ApiClient) {
    self.apiClient = apiClient
  }

  func reloadButtonTapped() {
    Just(())
      .delay(for: .seconds(10), scheduler: DispatchQueue.main)
      .flatMap { apiClient.fetchEpisodes() }
      .assign(to: &self.$episodes)
  }
}

为了测试这段代码,你真的需要等待10秒钟才能让发布者发出:

func testViewModel() {
  let viewModel = HomeViewModel(apiClient: .mock)

  viewModel.reloadButtonTapped()

  _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10)

  XCTAssert(viewModel.episodes, [Episode(id: 42)])
}

另外,我们可以显式地将调度器传递到视图模型初始化器中,以便从外部控制:

class HomeViewModel: ObservableObject {
  @Published var episodes: [Episode]?

  let apiClient: ApiClient
  let scheduler: AnySchedulerOf<DispatchQueue>

  init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) {
    self.apiClient = apiClient
    self.scheduler = scheduler
  }

  func reloadButtonTapped() {
    Just(())
      .delay(for: .seconds(10), scheduler: self.scheduler)
      .flatMap { self.apiClient.fetchEpisodes() }
      .assign(to: &self.$episodes)
  }
}

然后在测试中使用immediate调度器:

func testViewModel() {
  let viewModel = HomeViewModel(
    apiClient: .mock,
    scheduler: .immediate
  )

  viewModel.reloadButtonTapped()

  // 不再需要等待...

  XCTAssert(viewModel.episodes, [Episode(id: 42)])
}

动画调度器

CombineSchedulers提供了一些辅助工具,用于在SwiftUI和UIKit中进行异步动画。

如果SwiftUI状态 例如,要在视图模型中为 API 响应添加动画效果,你可以指定接收此状态的调度器应该使用动画:

self.apiClient.fetchEpisode()
  .receive(on: self.scheduler.animation())
  .assign(to: &self.$episode)

如果你正在使用 Combine 为 UIKit 功能提供支持,你可以使用 .animate 方法,它与 UIView.animate 类似:

self.apiClient.fetchEpisode()
  .receive(on: self.scheduler.animate(withDuration: 0.3))
  .assign(to: &self.$episode)

UnimplementedScheduler

一个在使用时会导致测试失败的调度器。

这个调度器可以提供额外的确定性,确保被测试的代码路径不需要使用调度器。

随着视图模型变得更加复杂,只有部分逻辑可能需要调度器。在为不需要调度器的逻辑编写单元测试时,应该提供一个未实现的调度器。这直接在测试中记录了该功能不使用调度器。如果它使用了,或将来使用了,测试将会失败。

例如,以下视图模型有几个职责:

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient
  let mainQueue: AnySchedulerOf<DispatchQueue>

  init(apiClient: ApiClient, mainQueue: AnySchedulerOf<DispatchQueue>) {
    self.apiClient = apiClient
    self.mainQueue = mainQueue
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: self.mainQueue)
      .assign(to: &self.$episode)
  }

  func favoriteButtonTapped() {
    self.episode?.isFavorite.toggle()
  }
}
  • 它允许用户点击按钮刷新一些剧集数据
  • 它允许用户切换剧集是否为他们的收藏

API 客户端在后台队列上传递剧集,所以视图模型必须在主队列上接收它,然后才能修改其状态。

然而,点击收藏按钮不涉及调度。这意味着可以使用未实现的调度器编写测试:

func testFavoriteButton() {
  let viewModel = EpisodeViewModel(
    apiClient: .mock,
    mainQueue: .unimplemented
  )
  viewModel.episode = .mock

  viewModel.favoriteButtonTapped()
  XCTAssert(viewModel.episode?.isFavorite == true)

  viewModel.favoriteButtonTapped()
  XCTAssert(viewModel.episode?.isFavorite == false)
}

使用 .unimplemented,这个测试强烈声明收藏一个剧集不需要调度器来完成任务,这意味着可以合理地假设该功能很简单,不涉及任何异步操作。

将来,如果收藏一个剧集需要发送涉及调度器的 API 请求,这个测试将开始失败,这是好事!这将迫使我们解决引入的复杂性。如果我们使用任何其他调度器,它会静默地接收这些额外的工作,测试将继续通过。

UIScheduler

一个尽快在主队列上执行其工作的调度器。这个调度器的灵感来自 ReactiveSwift 项目中的等效调度器。

如果从主线程调用 UIScheduler.shared.schedule,则工作单元将立即执行。这与 DispatchQueue.main.schedule 形成对比,后者在执行之前会产生线程跳转,因为它在底层使用 DispatchQueue.main.async

这个调度器对于需要在主线程上尽快执行工作,且线程跳转会产生问题的情况很有用,例如执行动画时。

并发 API

这个库为与 Combine 调度器交互提供了 async 友好的 API。

// 暂停当前任务 1 秒
try await scheduler.sleep(for: .seconds(1))

// 每 1 秒执行一次工作
for await instant in scheduler.timer(interval: .seconds(1)) {
  ...
}

Publishers.Timer

一个在重复间隔发射调度器当前时间的发布者。

这个发布者是 Foundation 的 Timer.publisher 的替代品,其主要区别在于它允许你为计时器使用任何调度器,而不仅仅是 RunLoop。这很有用,因为 RunLoop 调度器在测试方面不可测试,如果你想针对使用 Timer.publisher 的发布者编写测试,你必须显式等待时间流逝才能得到发射。这可能导致脆弱的测试,并大大增加测试执行的时间。

它的使用方式与 Foundation 的计时器类似,只是你指定一个调度器而不是运行循环:

Publishers.Timer(every: .seconds(1), scheduler: DispatchQueue.main)
  .autoconnect()
  .sink { print("Timer", $0) }

或者,你可以在调度器上调用 timerPublisher 方法,以在该调度器上派生一个重复计时器:

DispatchQueue.main.timerPublisher(every: .seconds(1))
  .autoconnect()
  .sink { print("Timer", $0) }

但这个计时器最好的部分是你可以将它与 TestScheduler 一起使用,这样你编写的任何涉及计时器的 Combine 代码都变得更可测试。这显示了我们如何轻松模拟在计时器中前进 1,000 秒的想法:

let scheduler = DispatchQueue.test
var output: [Int] = []

Publishers.Timer(every: 1, scheduler: scheduler)
  .autoconnect()
  .sink { _ in output.append(output.count) }
  .store(in: &self.cancellables)

XCTAssertEqual(output, [])

scheduler.advance(by: 1)
XCTAssertEqual(output, [0])

scheduler.advance(by: 1)
XCTAssertEqual(output, [0, 1])

scheduler.advance(by: 1_000)
XCTAssertEqual(output, Array(0...1_001))

兼容性

此库兼容 iOS 13.2 及更高版本。请注意,Combine 框架和 iOS 13.1 及更低版本中存在一些 bug,在尝试比较 DispatchQueue.SchedulerTimeType 值时会导致崩溃,而 TestScheduler 依赖于这个操作。

安装

你可以通过添加包依赖将 CombineSchedulers 添加到 Xcode 项目中。

  1. 文件菜单中,选择 Swift Packages › 添加包依赖...
  2. 在包仓库 URL 文本字段中输入 "https://github.com/pointfreeco/combine-schedulers"
  3. 根据你的项目结构:
    • 如果你有一个需要访问库的单一应用目标,那么直接将 CombineSchedulers 添加到你的应用程序中。
    • 如果你想从多个目标使用这个库,你必须创建一个依赖于 CombineSchedulers 的共享框架,然后从你的其他目标依赖于该框架。

文档

Combine Schedulers API 的最新文档可在这里获得。

其他库

许可证

该库在 MIT 许可下发布。详情请见 LICENSE

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号