[rxJava] Flowable 과 Observable 의 차이

rxjava 가 메이저 버전 업(1->2)을 하면서 몇 가지 변경점이 생겼다.

변경점에 대한 자세한 내용은 아래 링크를 참조하기 바란다.

Flowable 이라는 base reactive class 가 추가 되었다. Observable 과의 차이는 backpressure buffer의 기본 탑재 유무이다.

backpressure?

우리말로 번역하면 ‘등 뒤에서 떠밀리는 압박’ 정도가 될 듯 하다.

이런 상황을 가정해보자. 콘서트장을 사람들이 가득 메웠다. 콘서트장에 들어오려는 사람들은 저글링 개떼처럼 밀려드는데 나가는 사람은 별로 없다. 콘서트장 출입구를 통제하는 요원이 없다면? 콘서트장이 터지던지 안에 있던 사람들이 짜부러지던지 아무튼 대형 사고가 발생할거다.

publish / subscribe 모델에서도 이런 비극적인 시나리오가 발생할 수 있다. 생산자는 미친듯이 element 를 생산해 내는데 소비자가 처리하는 속도가 이를 따라가지 못한다면

  1. busy waiting 또는
  2. out of memory exception 이 발생할 것이다.

‘등 뒤에서 떠밀리는 압박’ 에 대한 흐름제어를 위한 버퍼가 바로 backpressure buffer 다. 버퍼가 가득 차면 어차피 소비자는 element 를 처리할 여유가 없는 상태이므로 더 이상 publish 를 하지 않는다.

기존에 없던 개념이 새로 추가된 것은 아니다. 기존 rxJava 1.xx 의 경우 Observable 에 backpressure buffer 를 직접 생성해 주면 사용이 가능하다. 허나 rxJava 개발자는 초보자들이 미처 알아채지 못하는 영역에서 기대하지 않는 동작이 일어날 가능성이 있다며 Flowable 을 추가하였다.

다음 예제코드를 보자. 생산자의 생산 속도를 소비자가 따라가지 못하는 시나리오다.
Flowable 을 사용하면 default buffer size(128) 이상 backpressure buffer 에 element 가 쌓일 경우 흐름제어를 한다.

public class example01 {

    public static void main(String... args) throws InterruptedException {

        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x->"*").collect(Collectors.joining());
        Flowable foo = Flowable.range(0, 1000_000_000)
                .map(x-> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x+tmpStr;
                });

        foo.observeOn(Schedulers.computation()).subscribe(x->{
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });

        while (true) {
            Thread.sleep(1000);
        }
    }
}
[very fast sender] i'm fast. very fast.
sending id: main 0**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 1**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 2**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 3**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 4**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 5**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 6**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 7**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 8**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 9**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 10**************************************************

... 중략 ...

[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 0*************************************************
receiving id: RxComputationThreadPool-1 1*************************************************
receiving id: RxComputationThreadPool-1 2*************************************************
[very fast sender] i'm fast. very fast.
sending id: main 117**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 118**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 119**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 120**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 121**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 122**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 123**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 124**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 125**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 126**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 127**************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 3*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 4*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 5*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 6*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 7*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 8*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 9*************************************************

반면, 같은 시나리오를 Observable 을 backpressure buffer 생성 없이 사용하면 OutOfMemoryException 이 발생한다.

public class example02 {

    public static void main(String... args) throws InterruptedException {

        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x->"*").collect(Collectors.joining());
        Observable foo = Observable.range(0, 1000_000_000)
                .map(x-> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x+tmpStr;
                });

        foo.observeOn(Schedulers.computation()).subscribe(x->{
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });

        while (true) {
            Thread.sleep(1000);
        }
    }
}
[very fast sender] i'm fast. very fast.
sending id: main 0**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 1**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 2**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 3**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 4**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 5**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 6**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 7**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 8**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 9**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 10**************************************************
[very fast sender] i'm fast. very fast.

...중략...

sending id: main 198**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 199**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 200**************************************************
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOfRange(Arrays.java:3664)
	at java.lang.String.<init>(String.java:207)
	at java.lang.StringBuilder.toString(StringBuilder.java:407)
	at example02.lambda$main$1(example02.java:24)
	at example02$$Lambda$6/123961122.apply(Unknown Source)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
	at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10700)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.Observable.subscribe(Observable.java:10700)
	at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
	at io.reactivex.Observable.subscribe(Observable.java:10700)
	at io.reactivex.Observable.subscribe(Observable.java:10686)
	at io.reactivex.Observable.subscribe(Observable.java:10589)
	at example02.main(example02.java:27)
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 5*************************************************

참고로, Flowable 은 FlowableCreate 라는 builder 에서 생성되며, 특별한 설정이 없을 경우 buffer size 는 최소 16, 기본 128 로 설정한다.

//FlowableCreate.java line:44

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
//Flowable.java line:61
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
    }

[펌] 임도형 님의 예외처리 가이드

4년 전 현재 직장입사 후 약 한 달 정도 함께 근무하였던 임도형 님의 예외처리에 대한 명 ppt 2개를 소개한다.

첫번째 ppt 는 java 예외 처리의 원칙에 대한 내용이고

두번째 ppt 는 언제 어디에 예외를 남길지에 대한 내용이다.

 

요약하면

  • 발생하는 예외를 뭉개지 마라. IDE로부터 자동생성된 e.printStackTrace() 만 남기는건 아무 처리도 안하는 것 만 못하다.
  • caller / callee 의 경계에서 예외를 처리하면 로그의 중복/누락을 방지할 수 있다.
  • 해당 상황을 인지할 수 있는 충분한 정보를 남겨라.

 

부연하자면

Exception 이란 정의되지 않은 동작을 처리하는 규약이다. 개발자가 정의한 io 의 범주를 벗어날지라도 우리는 이를 처리하는 규약을 정의할 수 있다.

java 에는 크게 두 가지 Exception 이 존재한다.

  • CheckedException
  • UncheckedException(=RuntimeException)

CheckedException 의 경우, 처리의 기준이 명백하고 프로그램은 복구가 가능하다. 처리에 크게 고민할 필요가 없다.

문제는 RuntimeException이다. RuntimeException이 발생하였다는 것은 이 프로그램이 복구 불가능한 상태에 진입하였다는 의미이다. 아래 소개된 임도형 님의 ppt 는 RuntimeException 을 어떻게 잘 처리할지에 대한 내용이다. Exception 처리같은 경우 충분한 경험이 없다면 매끄럽게 해결하기가 어려운데, 이 ppt는 실천적인 가이드를 제시해주는 매우 훌륭한 자료이므로 java 개발자라면 반드시 한번은 읽어볼 필요가 있다.

 

 

python으로 minecraft 를 해보자 #1

아이들 코딩 교육을 어떻게 하는게 재미있을까 이것 저것 알아보다 결국 minecraft 로 결정

minecraft로 여러가지 재미있는 일을 할 수 있다(..고한다.) 그 중에 하나가 malmo project 인데, 개인이 쉽게 구성하기 어려운 unsupervised learning 환경을 minecraft 로 제공하는 프로젝트다.

이 글에선 malmo 는 다루지 않고 minecraft – python 인터페이스를 어떻게 구성하는지만 정리한다.

prerequisite

python 은 설치되어 있어야 한다.

python 실행환경이 minecraft 에서 접근 가능해야 한다.

 

minecraft 설치

설치 : https://minecraft.net/ko-kr/
주의할 점: windows 10 edition 은 설치하지 말 것. python 인터페이스를 구성할 수 없음
현재 설치할 수 있는 가장 최신 버전은 1.11.2 이다.

minecraft forge 설치

minecraft mod 를 설치할 수 있도록 도와주는 tool
처음 설치했던 minecraft와 같은 버전으로 설치한다.
설치 : https://files.minecraftforge.net/

raspberry jam mod 설치

본래는 raspberrypi 에서 minecraft 를 동작시키기 위한 mod.
허나 이 mod 를 설치하면 python 인터페이스를 바로 구성할 수 있다.
설치 : https://github.com/arpruss/raspberryjammod/releases

  1. 위 링크에서 mods.zip 을 받은 후 %APPDATA%\.minecraft\mods 디렉토리 안에 압축 해제한다.
  2. 위 링크에서 python-scripts.zip 을 받은 후 %APPDATA%\.minecraft 디렉토리 안에 압축 해제한다.

확인

minecraft 를 실행한다.

mc1

실행환경이 forge 로 되어있는지 확인한다.

mc2.PNG

%APPDATA%.minecraft\mcpipy 디렉토리 안에 포함된 예제 코드 중 아무거나 minecraft 안에서 실행해 본다.

실행 방법은 /py [python filename] 이다.

mc3.PNG

tensorflow 윈도우 환경 빌드

readme 만 잘 읽고 따라하면 특별할 것은 없다.

근데 정말 잘(!) 읽어야 하므로 주의사항을 기록해 둔다.

 

build tool

bezel 과 cmake 중 하나를 선택할 수 있다.

bezel 이 더 좋아보이긴 하지만 윈도우 환경을 제대로 지원하지 않으므로 mingw 를 설치하여야 한다. 그러니 왠만하면 cmake 로 빌드하자.

 

prerequisite

swig

python 3.5

  • numpy 1.11.0 이상이 설치되어 있어야 한다.

visual studio 2015

(중요) cmake 3.5~3.6

  • 현재 cmake 버전이 3.8rc 까지 나왔는데 이걸로 빌드하면 nvcc 쪽에서 공백 처리 문제 때문에 빌드 에러 난다.
  • 이 문서에서는 이 팁이 가장 중요하다. cmake 버전 확인!
  • 관련 issue

nvidia cuda toolkit 8.0

nvidia cudnn 5.1

 

여기까지만 구성하면 나머지는 readme 의 설명대로 따르면 된다.

cmake 버전만 조심하자.