RxSwift-调度者

Schedulers(调度者)**是Rx**实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

  • MainScheduler: 代表主线程
  • SerialDispatchQueueScheduler: 抽象了串行DispatchQueue,执行串行任务使用此scheduler。
  • ConcurrentDispatchQueueScheduler:**抽象了并行**DispatchQueue,执行并行任务可以使用此scheduler。
  • **OperationQueueScheduler:**抽象了NSOperationQueue 常见用法
    Observable.of(1,2,3,4)
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { data in
    print(data)
    })
    .disposed(by: disposeBag)
  1. subscribe(on:):决定数据序列的构建函数在哪个 Scheduler 上运行。
  2. observe(on:):表示在哪个Scheduler上监听并执行响应处理。

subscribe(on:)流程分析

subscribe(on:)是怎么让序列构建函数在指定的Scheduler上运行的呢?我们先从方法源码入手

public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}

SubscribeOnProducer的子类,可以发现,对序列的处理,都是继承Producer,进行相应的逻辑处理,再创建相应的Sink,这样实现灵活扩展。

final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType

init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}

source保存源序列,scheduler存储任务需要在哪执行的队列。最终的逻辑我们要看sink的run方法

// SubscribeOnSink
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()

disposeEverything.disposable = cancelSchedule

let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}

cancelSchedule.setDisposable(disposeSchedule)

return disposeEverything
}
  • 调用了SubscribeOn保存的schedulerschedule方法,传入闭包回调。
  • schedule方法是ImmediateSchedulerType的协议方法,我们创建的是ConcurrentDispatchQueueScheduler,它遵循ImmediateSchedulerType协议,对应的schedule方法实现如下。
// ConcurrentDispatchQueueScheduler的schedule实现
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
// 调用了DispatchQueueConfiguration的schedule
self.configuration.schedule(state, action: action)
}

// DispatchQueueConfiguration的schedule实现
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()

// self.queue是我们传值进来的队列,异步执行
self.queue.async {
if cancel.isDisposed {
return
}
// 执行action闭包
cancel.setDisposable(action(state))
}

return cancel
}
  • 最终调用的是DispatchQueueConfiguration的schedule实现
  • 根据传值进来的队列,异步执行,在异步执行函数里面执行schedule的闭包,也就是SubscribeOnSink.run()方法中的红框部分。
  • 闭包里面执行了源序列的subscribe方法,这样就实现了subscribe在我们指定的schedule中执行。

observe(on:)流程分析

observe(on:)的思路和subscribe(on:)类似,只不过subscribe(on:)是在schedule中执行,observe(on:)是把执行的结果返回到schedule中。

// ObserveOnSink
override func onCore(_ event: Event<Element>) {
let shouldStart = self._lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)

switch self._state {
case .stopped:
self._state = .running
return true
case .running:
return false
}
}

if shouldStart {
self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
}
}

ObserveOnSink通过重写onCore(_:)方法,在方法中调用ImmediateSchedulerType协议的scheduleRecursive(_:action:)方法

// ImmediateSchedulerType
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)

recursiveScheduler.schedule(state)

return Disposables.create(with: recursiveScheduler.dispose)
}

// RecursiveImmediateScheduler
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
// self._scheduler保存的是ConcurrentDispatchQueueScheduler
let d = self._scheduler.schedule(state) { state -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}

let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}

scheduleState = .done

return self._action
}
// 此处action回调是在指定的_scheduler中执行
if let action = action {
action(state, self.schedule)
}

return Disposables.create()
}
...
}

// ConcurrentDispatchQueueScheduler
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
  • scheduleRecursive(_:action:)->RecursiveImmediateScheduler.schedule(_:)->ConcurrentDispatchQueueScheduler.schedule(_:action:)->configuration.schedule,通过此调用链,可以看出和subscribe(on:)一样,最后都是调用的configuration.schedule
  • action回调是在schedule闭包中,实现了在指定schedule进行监听。