Skip to content

非常好的一个课程: 43-RxLifecycle手写实现_哔哩哔哩_bilibiliimage.png

image.png

观察者模式

主要有两个角色,观察者和被观察者, 被观察者会有一个持有观察者的引用的列表,通过遍历这个列表去依次调用观察者的方法,通知观察者做出响应。例如UI控件的点击事件也可以看成是一种观察者模式。 image.png

RxJava中的观察者是一种变种的观察者模式,将被观察者和事件解耦,被观察者是ObservableSource,但是事件是Emitter,最终由它的实现类CreateEmitter通知观察者列表,通过将被观察者和事件解耦,由事件去驱动观察者做出响应。 CreateEmitter持有一个观察者列表,它发送事件时,可以直接调用观察者来对事件做出处理 image.png

  • ObservableSource:被观察者的顶层接口,其中提供一个订阅方法subscribe(),通过这个方法建立被观察者和观察者间的联系,相当于标准观察模式中的addObserver()方法。
  • Observer:观察者接口,提供处理事件的回调方法,以及建立订阅关系时候会默认调用的方法onSubscribe()
  • Observable:被观察者抽象类,虽然并不是具体的被观察者,但是是RxJava框架的入口。继承顶层ObservableSource接口并实现其中的订阅方法subscribe(),但是这个方法并不是先具体和哪一个观察者建立订阅,而是提供一个抽象的方法subscribeActual(),供开发者去实现,让开发指定具体的观察者。
  • Emitter:事件发射器的接口,提供发送事件的方法。被观察者和观察者虽然建立订阅关系了,但是并没有事件,那么事件就有Emitter定义,并且通过一个接口和被观察者建立联系
  • ObservableOnSubscribe:被观察者和事件建立建立联系的接口
  • ObservableXXXX:具体的被观察者实现类,持有ObservableOnSubscribe接口的引用。在subscribeActual方法中调用观察者的订阅方法,并创建具体的发射器实例,最后事件发射器实例和被观察者绑定
  • XXXEmitter:事件发射器的具体实现,持有观察者的引用

image.png

手写观察者模式

ObservableSource

java
public interface ObservableSource<T> {
    /**
     * 相当于标准的观察者模式中的addObserver方法
     * @param observer
     */
    void subscribe(Observer observer);
}

Observer

java
public interface Observer<T> {

    //建立订阅关系时候会默认调用的方法
    void onSubscribe();

    //使用泛型,因为事件的类型是不确定的
    void onNext(T t);

    void onComplete();

    void onError(Throwable throwable);
}

Observable

java
public abstract class Observable<T> implements ObservableSource<T>{

    /**
     * 实现顶层的订阅方法建立订阅关系
     * 但是并不解决一下问题:
     * 具体和哪一个观察者建立订阅?
     * 怎么建立订阅?
     * 而是将这些交给开发人员去做,提供一个抽象方法,保证扩展性
     * @param observer
     */
    @Override
    public void subscribe(Observer observer) {

        //注意这里并没有指定具体和谁建立订阅,而是调用了一个抽象方法
        subscribeActual(observer);
    }

    /**
     * 提供一个抽象方法,供开发者去实现
     * @param observer
     */
    protected abstract void subscribeActual(Observer observer);

    /**
     * create操作符
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source){
        //返回一个具体的被观察者
        return new ObservableCreate<>(source);
    }
}

Emitter

java
public interface Emitter<T> {

    void onNext(T t);

    void onComplete();

    void onError(Throwable throwable);
}

手写create操作符

具体被观察者create操作符

java
public class ObservableCreate<T> extends Observable<T> {

    //持有ObservableOnSubscribe的引用,并构造传入
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer observer) {
        //建立订阅的时候调用
        observer.onSubscribe();
        //创建具体的发射器实例,并闯入具体的事件T,和具体的观察者
        CreateEmitter<T> emitter=new CreateEmitter<T>(observer);
        //最好还需要将事件和被观察者绑定在一起
        source.subscribe(emitter);
    }

    /**
     * 具体的事件发射器,实现Emitter接口
     *
     * @param <T>
     */
    static class CreateEmitter<T> implements Emitter<T> {
        //持有观察者的引用,在Emitter接口的方法中调用观察者的对应回调接口
        Observer<T> observer;
        boolean done;

        public CreateEmitter(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if(done)return;
            observer.onNext(t);
        }

        @Override
        public void onComplete() {
            if (done)return;
            observer.onComplete();
            done=true;
        }

        @Override
        public void onError(Throwable throwable) {
            if (done)return;
            observer.onError(throwable);
        }
    }
}

Observable

java
public abstract class Observable<T> implements ObservableSource<T>{

    ···

    /**
     * create操作符
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source){
        //返回一个具体的被观察者
        return new ObservableCreate<>(source);
    }
}

具体使用

java
public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(Emitter<Object> emitter) {
            emitter.onNext("111");
            emitter.onNext("222");
            emitter.onNext("333");
        }
    }).subscribe(new Observer() {
        @Override
        public void onSubscribe() {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("onNext"+o.toString());
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError");
        }
    });
}

装饰器模式

通过继承实现扩展带来的问题,继承扩展功能会导致子类的数量剧增,并且上层父类的功能变化,会对所有的子类产生影响,有时候这并不是好事。 image.png 通过组合代替继承去扩展功能,这就是装饰者模式。 例如有一个炒粉父类,现在需要扩展功能得到鸡蛋炒粉,使用继承我们创建了一个鸡蛋炒粉的子类,如果需要在鸡蛋炒粉的功能上扩展,我们又要创建一个子类火腿鸡蛋炒粉,这样可能导致一条很深或者很大的继承树。 使用装饰器模式可以解决这个问题,其中具体构件角色和抽象装饰角色拥有共同的父类,并且抽象装饰角色持有共同父类的引用,基于注入的构件去派生子类扩展功,这样继承树的结构就只有两层。 image.png

手写装饰器模式

框架基于create操作符,扩展了很多其他的操作符。并且这些操作符都是一个Observable的子类,所以很容易就理清楚结构 抽象构件角色:Observable 抽象装饰者角色:AbstractObservableWithUpStream 具体构件角色:ObservableCreate 具体装饰者角色:ObservableMap

手写map操作符

抽象装饰者角色

其中source是需要被装饰的对象

java
public abstract class AbstractObservableWithUpStream<T, U> extends Observable<U> {
    //本身也是一个抽象类所以无需实现Observable下的subscribeActual方法
    //并且是去扩展功能,可能会出现类型转换,所以需要指定两类泛型
    //持有被观察者的引用
    protected final ObservableSource source;

    protected AbstractObservableWithUpStream(ObservableSource source) {
        this.source = source;
    }
}

具体构件角色

java
public class ObservableMap<T, U> extends AbstractObservableWithUpStream<T, U> {

    //用于实现类型转换的接口
    Function<T,U> function;

    //这里的source就是需要被装饰的对象
    protected ObservableMap(ObservableSource source,Function<T,U> function) {
        super(source);
        this.function=function;
    }

    //具体的被观察者map,·不再像create一样继承Observable,而是继承抽象装饰类
    //因此Observable相当于标准装饰器模式中的抽象构件
    //而ObservableCreate是具体的构建,接下来的其他被观察者都是基于Create扩展功能得到的
    @Override

    protected void subscribeActual(Observer observer) {
        //并不能直接调用observer,因为需要对事件做一些处理
        //被装饰的对象调用扩展的功能,MapObserver就是在实现扩展功能
        source.subscribe(new MapObserver(observer,function));

    }

    static class MapObserver<T,U> implements Observer<T>{

        //将下游的观察者以及用来做类型转换的接口构造注入
        final Observer<U> downStream;//注意这里的泛型,因为需要发射的是转换后的事件,所以泛型是U
        final Function<T,U> mapper;

        MapObserver(Observer<U> downStream, Function<T, U> mapper) {
            this.downStream = downStream;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe() {
            //这里不再需要去实现,因为create操作符(被装饰者)完成了订阅
            //但是还是需要主动调用
            downStream.onSubscribe();
        }

        @Override
        public void onNext(T t) {
            //map的核心操作
            //1.首先对时间做转换
            U u=mapper.apply(t);
            //2.然后由下游的观察者去发射事件
            downStream.onNext(u);
        }

        @Override
        public void onComplete() {
            downStream.onComplete();
        }

        @Override
        public void onError(Throwable throwable) {
            downStream.onError(throwable);
        }
    }
}

/**
 * 用于类型转换的接口,将T类型的事件转换为R类型
 * @param <T>
 * @param <R>
 */
public interface Function<T, R> {
    R apply(T t);
}

手写flatMap操作符

具体装饰者

同样是继承抽象装饰者**AbstractObservableWithUpStream**,然后将将需要装饰的对象注入**ObservableSource<T> source**和类型转换的接口实例,不同的是转换接口需要做的是将类型为T的事件转换为一个观察者类型,最后在onNext方法中通过function接口将事件T转换为**ObservableSource<U>**类型,然后再由这个新的观察者去调用onNext方法去发射事件u

java
public class ObservableFlatMap<T,U> extends AbstractObservableWithUpStream<T,U>{

    //这里不同的是,flatMap是将类型为T的事件转换为一个观察者类型
    //所以第二个泛型是ObservableSource<U>
    Function<T,ObservableSource<U>> function;

    protected ObservableFlatMap(ObservableSource<T> source,Function<T,ObservableSource<U>> function) {
        super(source);
        this.function=function;
    }

    //同样的这个source就是上游的被观察者,需要被装饰的对象
    @Override
    protected void subscribeActual(Observer<U> observer) {
        source.subscribe(new MergeObserver(function,observer));
    }


    static class MergeObserver<T,U> implements Observer<T>{

        final Function<T,ObservableSource<U>> mapper;
        final Observer<U> downStream;

        MergeObserver(Function<T, ObservableSource<U>> mapper, Observer<U> downStream) {
            this.mapper = mapper;
            this.downStream = downStream;
        }


        @Override
        public void onSubscribe() {
            downStream.onSubscribe();
        }

        //核心在这里,首先通过mapper将事件t转换为一个被观察者
        //然后由这个,新的被观察者去发射事件
        @Override
        public void onNext(T t) {
            ObservableSource<U> observable=mapper.apply(t);
            observable.subscribe(new Observer<U>() {
                @Override
                public void onSubscribe() {

                }

                @Override
                public void onNext(U u) {
                    downStream.onNext(u);
                }

                @Override
                public void onComplete() {

                }

                @Override
                public void onError(Throwable throwable) {

                }
            });
        }

        @Override
        public void onComplete() {
            downStream.onComplete();
        }

        @Override
        public void onError(Throwable throwable) {
            downStream.onError(throwable);
        }
    }
}