Skip to content

学习大纲

image.png

是否需要学习RxJava

个人看法,编程语言首选kotlin,函数式编程中Flow可以完美的替代掉RxJava来做异步处理和线程切换,所以新项目不要使用RxJava,但是在学习的过程中还是会中一些项目中使用了RxJava,如果想要研究这些项目那么还是需要对RxJava有一定的了解,因此建议是能看懂就可以,以及简单的使用,太深的研究高级使用技巧没有必要。 学习一个技术,或者在一个项目中使用一个技术,要考量:

  1. 同事的掌握程度
  2. 框架的兼容性与维护性
  3. 有没有更好的替代方案
  4. 能不能解决一些实际问题

image.png 首先RxJava的出现是解决一个难题,就是异步线程的切换繁琐,使用RxJava中的操作符能够简化代码。 image.png

RxJava是什么

一个处理异步事件和响应式编程的依赖库image.png

RxJava的优势以及使用场景

image.png

RxJava的三个概念

主要的设计模式:观察者模式 image.png

引入依赖

image.png

groovy
implementation "io.reactivex.rxjava2:rxandroid:$rootProject.ext.rxAndroid"
implementation "io.reactivex.rxjava2:rxjava:$rootProject.ext.rxjava"

操作符

创建操作符

高版本的AS无法运行Java main项目,配置如下即可: https://blog.csdn.net/HXWANHC/article/details/107673318image.pngimage.png

Create操作符的基本使用

java
package com.luchuan.project.smartpen.android;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

/**
 * Package:com.luchuan.project.smartpen.android
 * Author:starr
 * Time:2023/10/5  18:36
 * Description:
 */
public class CreateDemo {

    public static void main(String[] args) {
        CreateDemo demo = new CreateDemo();
        demo.test1();
    }

    public void test1() {
        //Observable是被观察者
        Observable.create(new ObservableOnSubscribe<Object>() {
            //subscribe 是一个通知的方法,用于建立观察者和被观察者之间的订阅关系
            //emitter是一个发射器,负责发射一个个事件
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                System.out.println("subscribe...");
                //事件产生的地方
                emitter.onNext("1");
                emitter.onNext(2);
                emitter.onNext("3");
                emitter.onComplete();
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe...");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("onNext..." + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError..." + e.toString());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete...");
            }
        });
    }
}

在被观察者中通过emitter发送事件和结束事件,最终都会回调观察者的对应的方法,从而实现通知。注意观察者的onSubscribe方法默认会被调用,表示两者的订阅关系建立。 image.png

Consumer-一种特殊的观察者

Consumer只需要实现一个方法,可以根据业务需要传入正常事件的消费者和异常事件的消费者。

java
 public void test1() {
        //Observable是被观察者
        Disposable d = Observable.create(new ObservableOnSubscribe<Object>() {
            //subscribe 是一个通知的方法,用于建立观察者和被观察者之间的订阅关系
            //emitter是一个发射器,负责发射一个个事件
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                System.out.println("subscribe...");
                //事件产生的地方
                emitter.onNext("1");
                emitter.onNext(2);
                emitter.onNext("3");
                emitter.onError(new Throwable("手动构造的异常"));
                emitter.onComplete();
            }
            //正常的事件消费者
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("accept..." + o);
            }
            //异常事件的消费者
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("accept..." + throwable.toString());
            }
        });
    }

image.png

Just操作符

使用Just操作符相当简洁,每一个T就是一个事件。最多可以一次传入10个事件 image.png

java
 Observable.just("a", "b", "c")
                .subscribe(observer);

image.png

from系列操作符

其中fromArray()就是just操作符的plus版,可以传入无限多个事件,所以使用时可以使用fromArray操作符平替just操作符 image.png fromCallable操作符,通过call回调发射事件 image.png

转换操作符

image.png

map操作符

java
//1.被观察者首先通过just创建一个事件
//2.通过map中的apply方法,将事件转换为新的事件
//3.最后通知给观察者
Observable.just("a")
        .map(new Function<String, Object>() {
            @Override
            public Object apply(String s) throws Exception {
                return "b";
            }
        }).subscribe(observer);

可见发送的事件是a,但是经过map转换符转换为b,最终观察者订阅得到的是b map操作符其实是将发射的事件做转换,最终又返回一个被观察者,将事件发射出去,因此最终观察者得到的是经过map转换后得到的事件。 image.png

flatMap操作符

这个操作符的场景例如,嵌套调用网络请求时,例如先调用一个注册请求,基于注册请求处理得到结果之后,再执行登录请求。 flatMap操作符和map操作符的区别就是flatMap最终是创建了一个新的被观察者去发射事件。

java
public void test2() {
    //1.flatMap负责基于事件产生新的事件
    //2.flatMap的返回值是一个ObservableSource类型,它是Observable的基类
    //所以返回值还是一个观察者
    //
    Observable.just("register")
        .flatMap(new Function<String, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(String s) throws Exception {
                System.out.println(s + "success");
                return Observable.just("request login");
            }
        }).subscribe(observer);
}

image.png

map和flatMap的区别例子

map相当于酒瓶新酒,不能改变类型,flatMap可以完全改变数据类型; 传统的写法,需要拿到学生列表中的课程列表中的每一项,需要两层for循环 image.png 使用map操作符,虽然可以返回学生列表的课程列表,但是在观察者中还是需要去遍历得到每一项具体的课程 image.png 但是使用flatMap就可以完全省略掉自己写遍历的操作,通过apply,我们返回一个新的被观察者,他发送的事件是每一项课程信息,因此在观察者中的accept中拿到的就是每一项课程信息。由此可以看出两者操作符的区别。 image.png

concatMap

concatMap和flatMap的区别就是保证在多线程的场景下能保证事件的发射顺序。有序的发射事件

java
public void test3() {
    Observable.just("1", "2", "3")
        .concatMap(new Function<String, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(String s) throws Exception {
                return Observable.just(s + "");
            }
        }).subscribe(observer);
    }

image.png

buffer操作符

将事件缓存为指定的个数一次性发送给订阅者在(观察者) image.png

组合操作符

image.png

concat操作符

组合多个被观察者,最终返回一个被观察者,所以可以看到最终观察者也只订阅了一个被观察者。

java
//1.将两个被观察者的所有事件组合成为一个被观察者
Observable.concat(Observable.just(1),
        Observable.just(2))
        .subscribe(observer);

image.pngimage.png

功能操作符

image.png

subscribeOn操作符

发送事件默认就是在主线程

java
Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        //创建事件发射事件
        System.out.println("subscribe.." + Thread.currentThread());

        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onComplete();
    }
}).subscribe(observer);

image.png 现在使用subscribeOn操作符切换线程 我们使用调度器指定线程,io和newThread都是指定子线程 image.png 如下就将事件切换到了子线程执行

java
Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        //创建事件发射事件
        System.out.println("subscribe.." + Thread.currentThread());

        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onComplete();
    }
	//主要来决定我执行subscribe方法所处的线程,也就是产生事件发射事件所在的线程
}).subscribeOn(Schedulers.io())
        .subscribe(observer);

可以看到此时的观察者回调的函数都是在子线程中执行的。 image.png

observeOn操作符

如果需要在执行完事件之后,需要切回主线程,需要使用observeOn操作符。

java
Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        //创建事件发射事件
        System.out.println("subscribe.." + Thread.currentThread());

        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onComplete();
    }
    //主要来决定我执行subscribe方法所处的线程,也就是产生事件发射事件所在的线程
}).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())//切回主线程
    //下游的事件都会在主线程中执行
    .map(new Function<Object, Object>() {
        @Override
        public Object apply(Object o) throws Exception {
            return "b";
        }
    }).observeOn(Schedulers.io())//再切换回子线程
    //下游的事件又会切回子线程执行
    .subscribe(observer);

doOnNext操作符

在next方法执行之前执行 image.png

过滤操作符

image.png

filter操作符

满足filter条件的事件不会被过滤(即test方法返回true时),会往下流 image.png