rxjava : retry ()를 사용할 수 있지만 지연이 있습니까?
내 Android 앱에서 rxjava를 사용하여 네트워크 요청을 비동기 적으로 처리하고 있습니다. 이제 특정 시간이 지난 후에 만 실패한 네트워크 요청을 다시 시도하고 싶습니다.
Observable에서 retry ()를 사용하지만 특정 지연 후에 만 재 시도하는 방법이 있습니까?
Observable에게 현재 재시도 중임을 알리는 방법이 있습니까 (처음 시도하는 것과 반대)?
debounce () / throttleWithTimeout ()을 보았지만 다른 작업을 수행하는 것 같습니다.
편집하다:
나는 한 가지 방법을 찾았다 고 생각하지만 이것이 올바른 방법인지 다른 더 나은 방법인지 확인하는 데 관심이 있습니다.
내가하는 일은 다음과 같다. 내 Observable.OnSubscribe의 call () 메서드에서 Subscribers onError () 메서드를 호출하기 전에 원하는 시간 동안 스레드를 잠자기 만하면됩니다. 따라서 1000 밀리 초마다 다시 시도하려면 다음과 같이합니다.
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
이 메서드는 어쨌든 IO 스레드에서 실행되므로 UI를 차단하지 않습니다. 내가 볼 수있는 유일한 문제는 첫 번째 오류조차도 지연으로보고되므로 retry ()가 없어도 지연이 있다는 것입니다. 지연이 적용되지 않은 경우 내가 더 잘하고 싶은 한 후 오류 대신 전 (분명하지만 첫 번째 시도하기 전에) 재 시도.
retryWhen()
연산자를 사용하여 Observable에 재시도 논리를 추가 할 수 있습니다 .
다음 클래스에는 재시도 논리가 포함되어 있습니다.
RxJava 2.x
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Function<Throwable, Observable<?>>() {
@Override
public Observable<?> apply(final Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
RxJava 1.x
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
용법:
// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
.retryWhen(new RetryWithDelay(3, 2000));
Paul의 답변 에서 영감을 얻었으며 Abhijit SarkarretryWhen
가 언급 한 문제에 관심 이 없다면 rxJava2로 재가입을 무조건 지연하는 가장 간단한 방법은 다음과 같습니다.
source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
retryWhen 및 repeatWhen 에 대한 더 많은 샘플과 설명을 볼 수 있습니다 .
이것은 내가 본 Ben Christensen의 스 니펫, RetryWhen Example 및 RetryWhenTestsConditional을 기반으로 한 솔루션입니다 ( 작동 하려면 n.getThrowable()
로 변경 해야 n
했습니다). 내가 사용 evant / Gradle을-retrolambda을 안드로이드에 람다 표기법 작업을하는 것이 아니라 (그것은 매우 권장하지만) 당신은 람다를 사용할 필요가 없습니다. 지연을 위해 지수 백 오프를 구현했지만 원하는 백 오프 논리를 연결할 수 있습니다. 완전성을 위해 subscribeOn
및 observeOn
연산자를 추가했습니다 . 내가 사용하고 ReactiveX / RxAndroid을 위해 AndroidSchedulers.mainThread()
.
int ATTEMPT_COUNT = 10;
public class Tuple<X, Y> {
public final X x;
public final Y y;
public Tuple(X x, Y y) {
this.x = x;
this.y = y;
}
}
observable
.subscribeOn(Schedulers.io())
.retryWhen(
attempts -> {
return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
.flatMap(
ni -> {
if (ni.y > ATTEMPT_COUNT)
return Observable.error(ni.x);
return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
});
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
MyRequestObservable.retry를 사용하는 대신 지연에 대한 간접 지정을 처리하는 새 Observable을 반환하는 래퍼 함수 retryObservable (MyRequestObservable, retrycount, seconds)을 사용합니다.
retryObservable(restApi.getObservableStuff(), 3, 30)
.subscribe(new Action1<BonusIndividualList>(){
@Override
public void call(BonusIndividualList arg0)
{
//success!
}
},
new Action1<Throwable>(){
@Override
public void call(Throwable arg0) {
// failed after the 3 retries !
}});
// wrapper code
private static <T> Observable<T> retryObservable(
final Observable<T> requestObservable, final int nbRetry,
final long seconds) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
requestObservable.subscribe(new Action1<T>() {
@Override
public void call(T arg0) {
subscriber.onNext(arg0);
subscriber.onCompleted();
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
if (nbRetry > 0) {
Observable.just(requestObservable)
.delay(seconds, TimeUnit.SECONDS)
.observeOn(mainThread())
.subscribe(new Action1<Observable<T>>(){
@Override
public void call(Observable<T> observable){
retryObservable(observable,
nbRetry - 1, seconds)
.subscribe(subscriber);
}
});
} else {
// still fail after retries
subscriber.onError(error);
}
}
});
}
});
}
이 예제는 jxjava 2.2.2에서 작동합니다.
지체없이 재시도 :
Single.just(somePaylodData)
.map(data -> someConnection.send(data))
.retry(5)
.doOnSuccess(status -> log.info("Yay! {}", status);
지연 후 재시도 :
Single.just(somePaylodData)
.map(data -> someConnection.send(data))
.retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
.doOnSuccess(status -> log.info("Yay! {}", status)
.doOnError((Throwable error)
-> log.error("I tried five times with a 300ms break"
+ " delay in between. But it was in vain."));
someConnection.send ()가 실패하면 소스 싱글이 실패합니다. 이런 일이 발생하면 retryWhen 내부에서 관찰 가능한 오류가 오류를 내 보냅니다. 이 방출을 300ms 지연시키고 재시도 신호를 보내기 위해 다시 보냅니다. take (5)는 5 개의 오류를 수신 한 후 observable의 시그널링이 종료되도록 보장합니다. 재시도 종료를 확인하고 다섯 번째 실패 후 재 시도하지 않습니다.
retryWhen
복잡하고 버그가있는 연산자입니다. 공식 문서와 여기에서 적어도 하나의 답변은 range
연산자를 사용 하며, 재 시도가 없으면 실패합니다. ReactiveX 회원 David Karnok와의 토론 을 참조하십시오 .
나는 변경하여 kjones '대답에 개선 flatMap
에 concatMap
와 추가하여 RetryDelayStrategy
클래스를. flatMap
방출 순서를 보존하지 않습니다 concatMap
. 이는 백 오프 지연에 중요합니다. 는 RetryDelayStrategy
이름에서 알 수 있듯이,하자 사용자가 백 오프를 포함하여 생성 재시도 지연의 다양한 모드를 선택할 수 있습니다. 이 코드는 다음 테스트 케이스와 함께 내 GitHub에서 사용할 수 있습니다 .
- 첫 번째 시도에서 성공 (재시도 없음)
- 1 회 재시도 후 실패
- 3 회 재 시도를 시도했지만 2 회에 성공하므로 3 회 재 시도하지 않습니다.
- 세 번째 재시도 성공
setRandomJokes
방법을 참조하십시오 .
이제 RxJava 버전 1.0 이상에서는 zipWith를 사용하여 지연된 재 시도를 수행 할 수 있습니다.
kjones 답변에 수정 사항 추가 .
수정 됨
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int MAX_RETRIES;
private final int DELAY_DURATION;
private final int START_RETRY;
/**
* Provide number of retries and seconds to be delayed between retry.
*
* @param maxRetries Number of retries.
* @param delayDurationInSeconds Seconds to be delays in each retry.
*/
public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
MAX_RETRIES = maxRetries;
DELAY_DURATION = delayDurationInSeconds;
START_RETRY = 1;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.delay(DELAY_DURATION, TimeUnit.SECONDS)
.zipWith(Observable.range(START_RETRY, MAX_RETRIES),
new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attempt) {
return attempt;
}
});
}
}
kjones의 답변과 동일 하지만 RxJava 2.x 버전의 경우 최신 버전으로 업데이트 됨 : ( 'io.reactivex.rxjava2 : rxjava : 2.1.3')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {
private final int maxRetries;
private final long retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Flowable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Flowable.error(throwable);
}
});
}
}
용법:
// 기존 Observable에 재시도 로직을 추가합니다. // 2 초의 지연으로 최대 3 회 재 시도합니다.
observable
.retryWhen(new RetryWithDelay(3, 2000));
retryWhen 연산자에서 반환 된 Observable에 지연을 추가 할 수 있습니다.
/**
* Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
*/
@Test
public void observableOnErrorResumeNext() {
Subscription subscription = Observable.just(null)
.map(Object::toString)
.doOnError(failure -> System.out.println("Error:" + failure.getCause()))
.retryWhen(errors -> errors.doOnNext(o -> count++)
.flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
Schedulers.newThread())
.onErrorResumeNext(t -> {
System.out.println("Error after all retries:" + t.getCause());
return Observable.just("I save the world for extinction!");
})
.subscribe(s -> System.out.println(s));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}
여기에서 더 많은 예를 볼 수 있습니다. https://github.com/politrons/reactive
Kotlin 및 RxJava1 버전
class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
: Function1<Observable<out Throwable>, Observable<*>> {
private val START_RETRY: Int = 1
override fun invoke(observable: Observable<out Throwable>): Observable<*> {
return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
.zipWith(Observable.range(START_RETRY, MAX_RETRIES),
object : Function2<Throwable, Int, Int> {
override fun invoke(throwable: Throwable, attempt: Int): Int {
return attempt
}
})
}
}
(Kotlin) I little bit improved code with exponential backoff and applied defense emitting of Observable.range():
fun testOnRetryWithDelayExponentialBackoff() {
val interval = 1
val maxCount = 3
val ai = AtomicInteger(1);
val source = Observable.create<Unit> { emitter ->
val attempt = ai.getAndIncrement()
println("Subscribe ${attempt}")
if (attempt >= maxCount) {
emitter.onNext(Unit)
emitter.onComplete()
}
emitter.onError(RuntimeException("Test $attempt"))
}
// Below implementation of "retryWhen" function, remove all "println()" for real code.
val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
throwableRx.doOnNext({ println("Error: $it") })
.zipWith(Observable.range(1, maxCount)
.concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
)
.flatMap { pair ->
if (pair.second >= maxCount) {
Observable.error(pair.first)
} else {
val delay = interval * 2F.pow(pair.second)
println("retry delay: $delay")
Observable.timer(delay.toLong(), TimeUnit.SECONDS)
}
}
}
//Code to print the result in terminal.
sourceWithRetry
.doOnComplete { println("Complete") }
.doOnError({ println("Final Error: $it") })
.blockingForEach { println("$it") }
}
in the event when you need to print out the retry count, you can use the example provided in Rxjava's wiki page https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
observable.retryWhen(errors ->
// Count and increment the number of errors.
errors.map(error -> 1).scan((i, j) -> i + j)
.doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
// Limit the maximum number of retries.
.takeWhile(errorCount -> errorCount < retryCounts)
// Signal resubscribe event after some delay.
.flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
Based on kjones answer here is Kotlin version of RxJava 2.x retry with a delay as an extension. Replace Observable
to create the same extension for Flowable
.
fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
var retryCount = 0
return retryWhen { thObservable ->
thObservable.flatMap { throwable ->
if (++retryCount < maxRetries) {
Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
} else {
Observable.error(throwable)
}
}
}
}
Then just use it on observable observable.retryWithDelay(3, 1000)
Simply do it like this:
Observable.just("")
.delay(2, TimeUnit.SECONDS) //delay
.flatMap(new Func1<String, Observable<File>>() {
@Override
public Observable<File> call(String s) {
L.from(TAG).d("postAvatar=");
File file = PhotoPickUtil.getTempFile();
if (file.length() <= 0) {
throw new NullPointerException();
}
return Observable.just(file);
}
})
.retry(6)
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
postAvatar(file);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
참고URL : https://stackoverflow.com/questions/22066481/rxjava-can-i-use-retry-but-with-delay
'development' 카테고리의 다른 글
다른 변수와 비교하기 전에 문자열을 변수에 할당해야합니까? (0) | 2020.09.15 |
---|---|
iPad Safari : 링크가 연결되었을 때 빠른 깜박임 효과를 비활성화하는 방법 (0) | 2020.09.15 |
Ruby에서 한 줄에 메소드를 어떻게 정의합니까? (0) | 2020.09.15 |
Symfony2를 사용하여 Twig 템플릿에서 환경 이름 가져 오기 (0) | 2020.09.15 |
JavaScript에서 시간대 오프셋으로 날짜를 ISO 8601 형식으로 지정하는 방법은 무엇입니까? (0) | 2020.09.15 |