Subject 在 RxSwift 中是一种特殊类型的 Observable 对象,因为它同时也遵循 Observer 协议,所以当使用的时候它既可以作为 Observable 对象发射元素(emit element),也可以作为 Observer 接收事件(on event)。RxSwift 中主要有以下四种 Subject:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • Variable

这篇我们先来探讨 PublishSubject。

PublishSubject

let disposeBag = DisposeBag()
let ps = PublishSubject<String>()
ps.subscribe { (event) in
    print(event.element ?? "nothing")
}.disposed(by: disposeBag)
        
ps.onNext("hello world")

正如 PublishSubject 这个名字暗示的,它作为 Observer 时接收到一个事件:onNext("hello world"),同时它将这个事件发射出去(publish)。我们来调试看看上面这段代码是怎么运行的。

略过 disposeBag 和 ps 的创建,看看 subscribe 方法:

extension ObservableType {
    
    ......
    
    public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }
    
    ......
    
}

也就是说在 subscribe 方法内部,它创建了一个匿名的 AnonymousObserver 对象 observer,并将它作为参数继续调用 self.asObservable().subscribe 方法。

public func asObservable() -> Observable<E> {
    return self
}

正如我们知道的,PublishSubject 也是一个 Observable 对象,asObservable 方法直接返回了自己。接着看 subscribe 方法:

public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    _lock.lock()
    let subscription = _synchronized_subscribe(observer)
    _lock.unlock()
    return subscription
}

func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
    
    ......
        
    let key = _observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}

直接看方法关键的一句:let key = _observers.insert(observer.on)。_observers 是一个 Bag<(Event<Element>) -> ()> 结构体类型对象,看一下它的 insert 方法:

mutating func insert(_ element: T) -> BagKey {
    let key = _nextKey

    _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)

    if _key0 == nil {
        _key0 = key
        _value0 = element
        return key
    }

    ......
    
}

由于我们是第一次调用 ps 的 subscribe 方法,因此对这个 Bag 对象来说,_key0 为 nil,因此方法直接设置 _key0_value0 的值并返回。注意到这里是将一个方法(funtion)设置为 _value0。由于在实际使用中一个 Observable 对象大多数情况下仅有一个 subscriber,因此这里用 _key0_value0 来提高代码的执行效率。当然,如果 subscriber 数量多于一个,就会用到字典来存储,这里暂且不表。

subscribe 方法主要就是这么运行的。再来看 .onNext 方法:

extension ObserverType {
    public func onNext(_ element: E) {
        on(.next(element))
    }
}

public func on(_ event: Event<Element>) {
    #if DEBUG
        _synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { _synchronizationTracker.unregister() }
    #endif
    dispatch(_synchronized_on(event), event)
}

func _synchronized_on(_ event: Event<E>) -> Observers {
    _lock.lock(); defer { _lock.unlock() }
    switch event {
    case .next(_):
        if _isDisposed || _stopped {
            return Observers()
        }
            
        return _observers
    
    ......
    
}
    
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
    if bag._onlyFastPath {
        bag._value0?(event)
        return
    }
    
    ......
    
}

这里的代码很简单:根据 .next 事件获取到 _observers,也就是一个 Bag 结构体对象,在 dispatch 方法里,执行 _value0 方法。

分析到这里,对于 PublishSubject 的运行原理可以说是很清晰了,因此也知道了为什么 PublishSubject 必须得先 subscribe 才能接收到 onNext 事件。