signed

QiShunwang

“诚信为本、客户至上”

RxHttp---实现并行和串行、多串行(一)

2021/4/26 21:08:58   来源:

RxHttp实现并行和串行、多串行(一)

在开发中,一个页面往往有多个请求,有时会需要串行和并行。使用传统的方法,比较复杂。本文使用RxHttp请求框架作为演示案例。

并行

现在很多页面是这样的,上面是Banner条,Banner条下面是数据列表(假设是学生列表),这样就涉及到两个接口,一个是获取Banner条数据,另一是获取学生列表数据,这两个接口没有任何关系,所以我们可以并行去实现

//Banner 的Observable对象                                                
Observable<Banner> bannerObservable = RxHttp.get("http://...")        
    .asObject(Banner.class);                                          
                                                                      
//学生的Observable对象                                                     
Observable<List<Student>> studentObservable = RxHttp.get("http://...")
    .asList(Student.class);            
                                                                      
//这里使用RxJava组合符中的merge操作符,将两个被观察者合并为一个                                
Observable.merge(bannerObservable, studentObservable)                 
    .as(RxLife.asOnMain(this)) //感知生命周期,自动关闭请求                        
    .subscribe(o -> {                                                 
        //请求成功,回调2次,一次是Banner数据,一次Student列表                   
        if (o instanceof Banner) {                                    
            //获取到banner数据                                             
        } else if (o instanceof List) {                               
            //获取到学生列表数据                                               
        }                                                             
    }, throwable -> {                                                 
        //出现异常                                                        
    }, () -> {                                                        
        //2个请求执行完毕,开始更新UI                                             
    });   

可以看到,我们首先通过RxHttp类拿到Banner和Student的两个Observable对象,然后通过merge操作符,将两个Observable对象合并为一个,并订阅观察者,这样就能在onNext回调中拿到Banner和Student数据,并在onComplete回调中更新UI。
可是,这样就完了吗?熟悉RxJava的同学应该知道,RxJava在出现异常后并且回调到onError接口时,就会停止工作,那么如果Banner接口先出现异常,岂不是收不到Student信息了?是的,那么,我们应该如何去处理呢,其实很简单,RxJava为我们提供了异常捕获操作符,如:onErrorResumeNext和onErrorReturn,作用就是出现异常了,我们如何去补救它。在这,我们使用onErrorResumeNext操作符,代码如下

//Banner 的Observable对象                                                
Observable<Banner> bannerObservable = RxHttp.get("http://...")        
    .asObject(Banner.class)
    .onErrorResumeNext(Observable.empty()); //出现异常,发送一个空的Observable对象                                         
                                                                      
//学生的Observable对象                                                     
Observable<List<Student>> studentObservable = RxHttp.get("http://...")
    .asList(Student.class);            
                                                                      
//这里使用RxJava组合符中的merge操作符,将两个被观察者合并为一个                                
Observable.merge(bannerObservable, studentObservable)                 
    .as(RxLife.asOnMain(this)) //感知生命周期,自动关闭请求                        
    .subscribe(o -> {                                                 
        //请求成功,回调2次,一次是Banner数据,一次Student列表                   
        if (o instanceof Banner) {                                    
            //获取到banner数据                                             
        } else if (o instanceof List) {                               
            //获取到学生列表数据                                               
        }                                                             
    }, throwable -> {                                                 
        //出现异常                                                        
    }, () -> {                                                        
        //2个请求执行完毕,开始更新UI                                             
    });

上面我们只加了onErrorResumeNext(Observable.empty())这一行代码,Observable.empty()是一个不会发射任何事件的Observable对象。所以,这个时候如果Banner的Observable出现异常,就不会发射任何事件,Student 的Observable对象便可继续执行,只是在onNext回调中,就只能收到一次Student的回调(请求成功的话),并且随后执行onComplete回调更新UI,这样就能保证即使Banner接口出错了,我们依然可以正常现实学生列表数据。说的抽象一点就是保证A接口不影响B接口,但是B可以影响A接口,如果要保证A、B两个接口互不影响,分别对A、B接口处理异常即可,如果有3个、4个甚至更多的请求,可以使用Observable.mergeArray操作符。

串行

最近一个项目中,使用到了RxHttp框架请求的串行方法,需要获取token后,再登录。这时候使用了flaMap操作符进行实现。

  RxHttp.get(Url.baseUrl+Url.GET_PRIVATE_KEY)
                .setAssemblyEnabled(false)
                .asString()
                .flatMap(new Function<String , ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Throwable {
                                 //获取token后代码逻辑    
                             }

                         }
                        return  RxHttp.postJson(Url.LOGIN_APP)
                                .add("methodName", "login")
                                .setAssemblyEnabled(false)
                                .subscribeOnCurrent()
                                .asString();

                    }
                })
                .to(RxLife.toMain(this))
                .subscribe(s -> {     
              //login请求后代码逻辑
               }
                }, (OnError) error -> {
                    LogUtils.e(error.getErrorMsg());
                });

注:

RxHttp中的asXXX系列方法,内部会默认开启IO线程执行Http请求,所以我们在发送单个请求时,无需指定请求执行线程;然而在多个请求串行时,为提升效率,我们希望一个线程可以执行多个请求,故我们需要使用subscribeOnCurrent方法指定请求在当前线程执行。