[rxJava] Flowable 과 Observable 의 차이

rxjava 가 메이저 버전 업(1->2)을 하면서 몇 가지 변경점이 생겼다.

변경점에 대한 자세한 내용은 아래 링크를 참조하기 바란다.

Flowable 이라는 base reactive class 가 추가 되었다. Observable 과의 차이는 backpressure buffer의 기본 탑재 유무이다.

backpressure?

우리말로 번역하면 ‘등 뒤에서 떠밀리는 압박’ 정도가 될 듯 하다.

이런 상황을 가정해보자. 콘서트장을 사람들이 가득 메웠다. 콘서트장에 들어오려는 사람들은 저글링 개떼처럼 밀려드는데 나가는 사람은 별로 없다. 콘서트장 출입구를 통제하는 요원이 없다면? 콘서트장이 터지던지 안에 있던 사람들이 짜부러지던지 아무튼 대형 사고가 발생할거다.

publish / subscribe 모델에서도 이런 비극적인 시나리오가 발생할 수 있다. 생산자는 미친듯이 element 를 생산해 내는데 소비자가 처리하는 속도가 이를 따라가지 못한다면

  1. busy waiting 또는
  2. out of memory exception 이 발생할 것이다.

‘등 뒤에서 떠밀리는 압박’ 에 대한 흐름제어를 위한 버퍼가 바로 backpressure buffer 다. 버퍼가 가득 차면 어차피 소비자는 element 를 처리할 여유가 없는 상태이므로 더 이상 publish 를 하지 않는다.

기존에 없던 개념이 새로 추가된 것은 아니다. 기존 rxJava 1.xx 의 경우 Observable 에 backpressure buffer 를 직접 생성해 주면 사용이 가능하다. 허나 rxJava 개발자는 초보자들이 미처 알아채지 못하는 영역에서 기대하지 않는 동작이 일어날 가능성이 있다며 Flowable 을 추가하였다.

다음 예제코드를 보자. 생산자의 생산 속도를 소비자가 따라가지 못하는 시나리오다.
Flowable 을 사용하면 default buffer size(128) 이상 backpressure buffer 에 element 가 쌓일 경우 흐름제어를 한다.

public class example01 {

    public static void main(String... args) throws InterruptedException {

        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x->"*").collect(Collectors.joining());
        Flowable foo = Flowable.range(0, 1000_000_000)
                .map(x-> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x+tmpStr;
                });

        foo.observeOn(Schedulers.computation()).subscribe(x->{
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });

        while (true) {
            Thread.sleep(1000);
        }
    }
}
[very fast sender] i'm fast. very fast.
sending id: main 0**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 1**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 2**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 3**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 4**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 5**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 6**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 7**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 8**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 9**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 10**************************************************

... 중략 ...

[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 0*************************************************
receiving id: RxComputationThreadPool-1 1*************************************************
receiving id: RxComputationThreadPool-1 2*************************************************
[very fast sender] i'm fast. very fast.
sending id: main 117**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 118**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 119**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 120**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 121**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 122**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 123**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 124**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 125**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 126**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 127**************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 3*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 4*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 5*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 6*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 7*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 8*************************************************
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 9*************************************************

반면, 같은 시나리오를 Observable 을 backpressure buffer 생성 없이 사용하면 OutOfMemoryException 이 발생한다.

public class example02 {

    public static void main(String... args) throws InterruptedException {

        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x->"*").collect(Collectors.joining());
        Observable foo = Observable.range(0, 1000_000_000)
                .map(x-> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x+tmpStr;
                });

        foo.observeOn(Schedulers.computation()).subscribe(x->{
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });

        while (true) {
            Thread.sleep(1000);
        }
    }
}
[very fast sender] i'm fast. very fast.
sending id: main 0**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 1**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 2**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 3**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 4**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 5**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 6**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 7**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 8**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 9**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 10**************************************************
[very fast sender] i'm fast. very fast.

...중략...

sending id: main 198**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 199**************************************************
[very fast sender] i'm fast. very fast.
sending id: main 200**************************************************
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOfRange(Arrays.java:3664)
	at java.lang.String.<init>(String.java:207)
	at java.lang.StringBuilder.toString(StringBuilder.java:407)
	at example02.lambda$main$1(example02.java:24)
	at example02$$Lambda$6/123961122.apply(Unknown Source)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
	at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10700)
	at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.Observable.subscribe(Observable.java:10700)
	at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
	at io.reactivex.Observable.subscribe(Observable.java:10700)
	at io.reactivex.Observable.subscribe(Observable.java:10686)
	at io.reactivex.Observable.subscribe(Observable.java:10589)
	at example02.main(example02.java:27)
[very busy receiver] i'm busy. very busy.
receiving id: RxComputationThreadPool-1 5*************************************************

참고로, Flowable 은 FlowableCreate 라는 builder 에서 생성되며, 특별한 설정이 없을 경우 buffer size 는 최소 16, 기본 128 로 설정한다.

//FlowableCreate.java line:44

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
//Flowable.java line:61
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
    }

python trivia

개요

python 이해도를 높이기 위한 목적으로,

잘 만들어진 library 소스코드(tensorflow)를 보다가 특기할 만한 짧은 내용들을 간략히 소개한다.

 

__future__

compiler 지시자로 동작한다.

하위 호환성이 없는 기능을 미리 사용하기 위함이다.

링크 : PEP 236 – Back to the __future__

링크 : future statement

 

__all__

init.py 내에서 작성하는 variable 이다.

package 내의 공개 가능한 submodule 의 범위를 한정하기 위함이다.

복잡한 프로젝트의 구조와 별개로 사용자 인터페이스를 단순하게 하고 싶을 때 사용할 수도 있다.


# tensorflow\__init__.py

# tensorflow.python 패키지 하위의 submodule 들을 사용자가 접근할 때

# tensorflow.xxx 으로 사용 가능하도록 한다.

from tensorflow.python import *

링크 : importing * from a package

 

pylint

정적분석 도구다.

PEP8 스타일 가이드를 따르도록 도와준다.

pylint: 로 시작하는 주석으로  message 를 제어할 수 있다.

# pylint: disable=wildcard-import
from tensorflow.python import *
# pylint: enable=wildcard-import

링크 : PEP8 style guilde for python code

 

SWIG

python – native code wrapper 다.

같은 목적의 도구로 boost::python 가 있다.

둘 다 써보지 않아서 잘은 모르겠지만

SWIG 는 별도의 인터페이스 file 을 작성하여야 하는 번거로움이 있는 대신

python / tcl / perl / java /c# 등 다양한 타겟으로 인터페이스가 가능하다.

boost 빠인 나로선 boost::python 쪽에 더 마음이 간다.

java synchronization internal

java의 동기화는 크게 두 가지로 분류

  • 암묵적인 동기화(synchronized keyword)
  • 명시적인 동기화(concurrent.locks.Lock)

이 두가지는 구현 방식이 다르다.

  • synchronized는 jvm의 monitorenter / monitorexit 인스트럭션을 호출
  • Lock 은 hotspot에서 바로 native intrinsic function 을 호출

이 글에선 synchronized 의 구현을 살펴본다.

jvm-hotspot 소스코드는 openjdk8 을 참조하였으므로 다른 구현체(oracle, android 등)에서는 상황이 다를 수도 있지만 실제로 그럴 것 같지는 않다.

synchronized keyword

알다시피 synchronized keyword는 두 가지 경우에 쓰인다.

  • synchronized method
  • synchronized block

두 경우 모두 lock의 획득/반환은 jvm bytecode 중 monitorenter/monitorexit 인스트럭션으로
수행한다.

글에 착오가 있어 수정한다.

  1. synchronized block은 컴파일러에 의해 monitorenter / monitorexit 인스트럭션으로 변환이 되는 반면,
  2. synchronized method 는 bytecodeInterpreter가 method를 수행하는 시점에 동기화 여부를 판별하지 monitorenter/monitorexit 인스트럭션으로 컴파일되지는 않는다.

두 경우 모두 InterpreterRuntime::monitorenter() InterpreterRuntime::monitorexit() 메쏘드를 호출하기 때문에 최종적으로 lock을 획득하는 방식은 같지만 메커니즘은 다르다. hotspot 소스코드를 주의깊게 보지 않아 이런 혼동이 왔다. 이 글을 통해 잘못된 정보가 전달될 수 있으므로 앞으로는 조심하겠다.

jvm spec

synchronized method

synchronized method 에서 주의깊게 살펴 볼 내용은 biased lock 이라는 성능 향상을 위한 기법이다. document에 의하면 un-contented thread(=경쟁상태가 아닌 thread)의 성능 향상 효과가 있다고 하며 실제로 그렇다.

biased lock

biased lock이란 대부분의 java 쓰레드는 동기화를 위한 object가 1개인 경우가 많은데에서 착안,

object header 에 thread ID 와 biased 여부를 확인하는 flag 를 두어

동일 thread 가 연속적으로 critical section에 접근하는 경우 (= resource 경쟁 상태가 아닌경우)

atomic operation 을 수행하지 않는 lock 을 제공하여 수행 속도를 향상시키는 기법이다.

요약하면, biased lock = atomic operation을 하지 않는 lock = 흉내만 내는 lock 이다.

Dice-Moir-Scherer QR lock 최초 논문

hotspot에서의 biased lock 을 설명한 오라클 블로그

openJDK wiki 중 synchronization

implementation

openJDK8에서 bytecode interpreter가 synchronized method를 해석하는 구현. 모든 invoke 된 method는 이 구문에 진입하는데, synchronized keyword가 있으면 다음의 순서로 단계적으로 lock 획득을 시도한다.

  1. lock 최초 획득 – 이때 오브젝트 헤더에 thread id 와 bias flag 를 설정한다.
  2. biased lock 획득 시도 – 오브젝트 헤더와 동일한 thread 접근 시
  3. reboke bias – biased lock 획득을 실패
    1. thin lock 시도 – aging count 증가시킨다
    2. inflate lock 시도
  4. re-bias

hotspot/share/vm/interpreter/bytecodeInterpreter.cpp line:683

//bytecode interpreter 가 method 진입
case method_entry: {
... 중략 ...
// synchronized method 이면 Lock
if (METHOD->is_synchronized()) {
... 중략 ...
// 가장 먼저 biased lock 이 가능한지 확인한다.
// revoke 가능하면 revoke
// 아니면 re-bias
// 아니면 anonymously bias
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
anticipated_bias_locking_value =
(((uintptr_t)rcvr->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);

if (anticipated_bias_locking_value == 0) {
// Already biased towards this thread, nothing to do.
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
} else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// Try to revoke bias.
markOop header = rcvr->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, rcvr->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
} else if ((anticipated_bias_locking_value & epoch_mask_in_place) != 0) {
// Try to rebias.
markOop new_header = (markOop) ( (intptr_t) rcvr->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr((void*)new_header, rcvr->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);
}
success = true;
} else {
// Try to bias towards thread in case object is anonymously biased.
markOop header = (markOop) ((uintptr_t) mark &
((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place | epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// Debugging hint.
DEBUG_ONLY(mon->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
if (Atomic::cmpxchg_ptr((void*)new_header, rcvr->mark_addr(), header) == header) {
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);
}
success = true;
}
}

// biased lock 이 실패하면 기존 방식대로 lock을 획득한다.
// Traditional lightweight locking.
if (!success) {
markOop displaced = rcvr->mark()->set_unlocked();
mon->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
if (call_vm || Atomic::cmpxchg_ptr(mon, rcvr->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
mon->lock()->set_displaced_header(NULL);
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);
}
}
}
}
THREAD->clr_do_not_unlock();

아래 코드는 monitorenter / monitorexit 구현이다. 궁극적으로 synchronized method나 synchronized block 모두 InterpreterRuntime::monitorenter() 메소드를 호출한다.
hotspot/share/vm/interpreter/interpreterRuntime.cpp line:606

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END

//%note monitor_1
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END

synchronized block

synchronized block은 해당하는 block의 시작과 끝을 java compiler가 monitorenter / monitorexit 로 변환한다.

아래 예제코드는 synchronized block 과 synchronized method 를 bytecode 로 변환하면 어떻게 되는지 보여준다.

public class SynchronizedExample {
public static void main(String[] args) {
Foo foo = new Foo();
synchronized (foo) {
++foo.foo;
}

methodFoo(foo);
}

static synchronized void methodFoo(Foo foo) {
++foo.foo;
}

static class Foo {
public int foo;
}
}

main 메쏘드. synchronized block 은 monitorenter 로 진입, monitorexit 로 끝나지만 synchronized method는 메소드를 invoke할 뿐이다.

0 new #2 <SynchronizedExample$Foo>
3 dup
4 invokespecial #3 <SynchronizedExample$Foo.<init>>
7 astore_1
8 aload_1
9 dup
10 astore_2
11 monitorenter //synchronized block 시작
12 aload_1
13 dup
14 getfield #4 <SynchronizedExample$Foo.foo>
17 iconst_1
18 iadd
19 putfield #4 <SynchronizedExample$Foo.foo>
22 aload_2
23 monitorexit  // synchronized block 끝
24 goto 32 (+8)
27 astore_3
28 aload_2
29 monitorexit
30 aload_3
31 athrow
32 aload_1
33 invokestatic #5 <SynchronizedExample.methodFoo> // method invoke
36 return

synchronized 메소드

0 aload_0
1 dup
2 getfield #5 <SynchronizedExample$Foo.foo>
5 iconst_1
6 iadd
7 putfield #5 <SynchronizedExample$Foo.foo>
10 return

결론

Lock object 를 명시적으로 사용하는 것 보다 synchronized keyword 를 사용하였을 때 유리한 경우는 다음과 같다.

  • 리소스가 공유될 필요는 있으나, 빈도가 적다.(=경쟁적이지 않다.)
  • 공유 리소스에 한 번 접근하였던 thread가 재진입하는 경우가 빈번하다.

concurrent programming #1

왜 concurrent programming 인가?

clockspeed

2000년대 초반까지 cpu의 처리속도는 2년 마다 2배씩 증가해 왔습니다. 복잡한 연산을 하고 싶으면 돈을 더 내고 비싼 cpu를 사면 해결되는 시대였죠. 그런데 어느 순간부터 cpu의 처리속도는 더 이상 빨라지지 않고 있습니다. 회로의 집적도와 전자의 이동속도에 한계가 있기 때문에 양자컴퓨터라도 나오지 않는 이상 현재로선 cpu의 수직적인 처리속도를 높일 수 있는 마땅한 방법은 없는 듯 합니다.

Cores.png

2004년 부터는 cpu의 처리속도를 높이는 대신 병렬적으로 코어의 갯수를 늘리는 시도를 하였습니다. 그리고 10여 년이 지난 현재, 손바닥만한 핸드폰에도 예외없이 cpu는 2개 이상 들어가는 것이 당연한 시대가 되었습니다. 멀티 코어 환경이 당연한 시대에 과연 그 안에서 동작하는 프로그램은 멀티 코어 환경에 적합하도록 만들어 졌을까요?

 

thread vs process

잘 아시겠지만 다수의 컴퓨팅 유닛을 사용하는 방법은

  1. multi processing
  2. multi threading

이 두 가지 방법 뿐입니다.

그렇다면 쓰레드랑 프로세스의 차이점은 무엇일까요? 찾아보면 많이들 나오니 자세한 설명은 패스하고 주요 차이점만 서술하면

쓰레드는 프로세스에 비해

  1. 생성 비용이 낮고
  2. 리소스 공유가 쉽습니다.

runtimearea2

위 그림은 jvm의 run-time data area 인데요. java 프로세스를 하나 생성하기 위해선 위 6가지 영역(정확히는 5가지, run-time constant pool은 method area에 포함) 이 모두 새로 생성되어야 합니다. 허나 쓰레드는 초록색 박스 영역만 새로 생성하고, 빨간색 박스 영역은 부모 프로세스와 공유합니다. java 를 예로 들었지만 다른 언어나 os도 상황은 대동소이합니다.

프로세스가 서로 리소스를 공유하는 수단은 1) named pipe 2) socket 3) shared memory 4) message 등이 있습니다. 이러한 프로세스 간 통신 수단은 어플리케이션 외부 커널에서 관리되기 때문에 비용이 높고 규약 외 처리(=exception)를 잘 정의하여야 합니다.

반면 쓰레드는 프로세스에 비해 리소스를 공유하는 방법이 쉽습니다.

  1. heap 에 리소스를 할당하거나
  2. 전역 변수를 설정하거나
  3. static 변수를 설정하면

해당 리소스는 쓰레드 간 공유가 가능합니다.

java의 경우, 전역변수가 없고 static keyword 는 멤버 변수에만 할당이 가능하므로 리소스가 공유되는 경우는 다음 두 가지 뿐입니다.

  • heap에 할당된 리소스 및 레퍼런스
  • static 멤버 변수 및 레퍼런스

 

thread 문제들

멀티쓰레드 프로그램은 분명 cpu 처리속도의 한계를 극복하기 위한 좋은 대안입니다만, 개발자에게 다양한 문제를 안겨주기도 합니다. 아마도 개발 초년생에게 가장 어려운 문제가 thread 문제일겁니다. thread 문제가 어려운 이유는 무엇일까요?

  1. 재현이 어렵고
  2. 현상이 그때그때 다르고
  3. 인간의 뇌가 동시에 두 가지 사고를 할 수 없기 때문

이 아닐까 생각합니다. 허나 얼핏 복잡다단해 보이는 thread 문제에도 유형이 있으니 문제의 유형들을 분류해 보도록 합시다.

thread 문제의 양상은 크게 두 가지로 구분합니다.

  1. thread safety 문제
  2. 기대하지 않는 동작

thread safety 문제는 프로그래머가 리소스 공유에 대한 이해가 부족하기 때문에 발생합니다. 공유되는 리소스는 반드시라고 해도 좋을 정도로 대부분 동기화 문제가 발생합니다. 리소스가 공유되더라도 동기화 문제가 발생하지 않는 경우는 다음 세 가지 뿐입니다. 이 세 가지는 반드시 기억해 두길 바랍니다. 여러 가지 다른 thread 문제들은 쉽게 예측이 어렵고 해결방안이 뚜렷하지 않을 수도 있으나 thread safety 문제는 명약관화하고 해결방안이 분명합니다.

  1. 공유 리소스가 immutable 하거나
  2. critical section에 대한 동기화 보장을 하였거나
  3. instruction 이 atomic 하거나

thread safety를 보장하였는데도 어플리케이션이 기대하지 않는 엉뚱한 동작을 한다면 이는 어플리케이션 동작 환경을 고려하지 않은 설계/정책의 문제입니다. 쓰레드는 태생적으로 제어가 어렵습니다. 실행중인 쓰레드가 언제 종료될지, sleep 상태의 쓰레드가 언제 다시 작업을 재개할 지 실행중에는 알 수가 없기 때문입니다. 또한 모든 thread 는 태생적으로 다른 thread들과 리소스를 점유하기 위한 경쟁관계에 놓여있기 때문에 경쟁의 과열로 인한 문제가 발생할 가능성이 항상 존재합니다.

  1. starvation
  2. dead lock / live lock 문제
  3. aba 문제

이러한 문제들은 설계와 정책으로 해결하여야 합니다. 어플리케이션의 동작환경에 따라, 사용자의 사용 패턴에 따라 해당 문제는 발생할 수도 있고 안할 수도 있습니다. 또한 어떤 문제들은 소프트웨어 설계 시점에 원천적으로 문제의 원인을 봉쇄할 수도 있습니다. 모든 분야가 마찬가지겠지만 소프트웨어 설계와 정책은 외부 환경에 대한 고려가 필수적입니다.

 

동기화 방식과 성능 병목

monitor

critical section

mutex

semaphore

spinlock

 

reentrantlock

synchronized

 

volatile

memory transaction (hardware / software)

lock-free / wait-free

concurrent programming 사전 학습

금번 스터디 주제는 concurrent programming 입니다.

멀티코어 환경에서 코어 갯수가 2배로 늘어나면 성능도 2배로 늘어나기를 기대하지만, 실제로 thread가 어떤 구조로 되어있고, 어떤 자원들이 공유되며, 해당 공유 자원을 코어가 어떻게 처리하는지에 대한 정확한 이해가 없으면 이러한 성능향상은 있을 수 없습니다. 때문에 멀티코어 환경에서 프로그램의 성능을 높이려면 concurrent 한 데이터 처리 기법에 대한 이해가 반드시 필요합니다.

스터디 시작 전 미리 알아두어야 할 사전 지식에 대하여

  • 자료 링크
  • (설명이 필요한 내용은)간단한 설명

를 이 페이지에 정리해 두었으니 스터디 시작 전에 미리 공부를 해두었으면 합니다.

사전 학습 내용은 모두 java 및 jvm 내용이지만, 다른 언어/os 도 비슷비슷합니다. 하나만 제대로 알고 있으면 나머지는 쉽게 이해가능합니다.

jvm memory 구조

jvm run-time data area

dependency hell과 빌드지옥 탈출

jvm의 메모리 구조를 잘 이해하려면 java process가 어떻게 동작하는지 알아야 합니다. 근데 아이러니하게도 java process가 어떻게 동작하는지 알려면 jvm의 메모리 구조를 이해하고 있어야 합니다. 이러한 모순적 상황에서 이해의 수준을 높이려면 메모리 구조와 process 의 동작 원리를 반복적으로 여러번 공부하는 방법 밖에는 없습니다.

runtimeArea.PNG

jvm 내에서 프로세스의 흐름을 기술하기 위한 저장 영역을 run-time data area 라고 합니다. 어떤 process가 실행되어 종료될 때 까지 사용되는 모든 데이터는 이 영역에 기록됩니다.

  • program counter register : 프로그램 카운터(pc)는 현재 동작중인 프로세스/또는 쓰레드가 sleep 상태에 진입하여 CPU 점유권을 잃어버렸다가 다시 되찾을 때 현재 이 쓰레드가 어디까지 진행되었는지에 대한 기록을 남겨놓기 위한 저장공간입니다. pc register에는 현재 진행중인 instruction의 주소값이 저장됩니다.
  • jvm stack : 프로그램 내에서 method가 호출되면 그 method 를 수행하기 위해 필요한 저장공간을 제공합니다. 또한 method의 호출은 계층적으로 호출순서가 발생하므로 이 순서를 보장하여 처리하기 위해 stack 을 사용합니다. stack에 저장되는 element의 단위는 frame 이라 합니다.
  • native method stack : native method stack은 조금 특별한 영역으로, jvm의 깍두기와도 같은 예외 케이스입니다. 외부에서 실행되는 native process와의 인터페이스(jni) 를 호출하기 위한 공간입니다.
  • method area : method area는 프로그램을 실행하기 위한 메타정보(class구조, method, constructor 등등)을 저장하는 영역입니다. 특히 static 으로 선언된 변수가 이 영역에 저장된다는 사실을 기억해 두기 바랍니다.
  • run time constant pool : run time constant pool 은 method area 내부에 존재하는 영역으로, constant,  class 및 interface의 symbolic reference table 정보 등을 저장하고 있습니다.
  • heap : run time 에 동적으로 할당되는 데이터가 저장되는 영역입니다. heap에 할당된 데이터는 gc의 대상입니다.

지금부터 설명하는 내용이 중요합니다. run-time data area 내에서 어떤 영역은 프로세스 내의 모든 thread 들이 공유하는 반면, 어떤 영역은 각 thread 마다 독립적으로 생성됩니다. 여기서 thread 간에 공유되는 리소스 영역이 중요합니다. 극단적으로 말하면 모든 쓰레드 문제는 동기화 문제이며, 모든 동기화 문제는 공유 리소스로부터 발생합니다.

run-time data area 에서 다음 영역은 process 내 모든 thread가 공유합니다.

  • method area
  • run-time constant pool
  • heap

다음 영역은 thread 1개 마다 독립적으로 1개씩 생성되며, 데이터는 공유되지 않습니다.

  • program counter register
  • stack
  • native method stack

runtimeArea2.PNG

java process

process 를 간결하고 정확하게 정의하자면, “program in execution”(=실행중인 프로그램)입니다.

process 가 실행되는 순서는 다음과 같습니다.

  1. jvm startup
  2. load class
  3. link class
  4. init class
  5. create class

java process의 동작 원리를 자세하게 설명하는 것은 본 문서의 목적을 넘어서므로 무얼 보고 공부해야 하는지 링크만 제공합니다.

java program execution

 

windows 환경에서 python/oracle 연결

Prerequisite

윈도우 환경에서 python / oracle 연결하려면 사전에 준비할 것들이 좀 된다.

  1. python 버전에 맞는 VC Common tools
  2. Oracle Instant Client Package SDK

 

Python version 에 맞는 VC++ 및 VC Common tools 설치

python 버전에 따라 vc++ 버전이 다르다.

아래 표를 보고 해당하는 버전의

1) VC++ 재배포가능 패키지(https://www.microsoft.com/ko-kr/download/details.aspx?id=48145) 또는

2) Visual Studio(https://www.visualstudio.com/ko-kr/downloads/download-visual-studio-vs.aspx)

를 설치한다.

VC++ version
Python version
14 3.5
10 3.3, 3.4
9 2.6, 2.7, 3.0, 3.1, 3.2

 

프로그램 언어 -> Visual C++ -> Visual C++ Common Tools 를 설치한다.

 

Oracle Instant Client Package SDK

Instant Client Package – SDK(http://www.oracle.com/technetwork/topics/winx64soft-089540.html) 를 다운로드한다.

다운로드한 sdk의 경로를 시스템 PATH 환경변수에 등록한다.

 

oracle python interface 설치

상기 tool 들을 설치하면 cx_Oracle 패키지가 설치된다.

pip install cx_Oracle

 

Locale 맞추기

한글을 정상적으로 입출력 하려면 system locale과 db locale 을 일치시켜야 한다.

locale 일치는 반드시 db connection 이전에 완료되어야 한다.

1) 현행 DB locale 확인

locale
cur.execute("select userenv('LANGUAGE') from dual")

2) system locale 환경변수 등록

locale
os.putenv('NLS_LANG', 'AMERICAN_AMERICA.AL32UTF8')

rabbitmq reference

prerequisite

rabbitmq 설치 : https://www.rabbitmq.com/download.html

python 설치(다른 언어로 대체 가능하나, 예제는 python3.5임)

rabbitmqctl 사용법

ack 확인되지 않은 msg list

rabbitmqctl list_queues name messages_ready messages_unacknowledged

 

모든 queue 정보 삭제(=reset)

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

 

매번 queue 삭제/리스타트하기 귀찮아서 python으로 짰다.

본인 환경에 맞도록 적당히 수정해서  쓰면 될 듯

pp = pprint.PrettyPrinter(indent=2, compact=True)
result = []
mqctlExec = '"C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.6.3\sbin\\rabbitmqctl" {param}'
spExec = """for line in sp.Popen(mqctlExec.format(param='{param}'), shell=True, stdout=sp.PIPE, stderr=sp.PIPE).stdout:
    result.append(line)"""
exec(spExec.format(param='stop_app'))
exec(spExec.format(param='reset'))
exec(spExec.format(param='start_app'))
pp.pprint(result) 

 

 

basic queue

기본적으로 mq로 msg 전송/수신하기 위한 최소 설정

1) queue

2) channel

3) message body

basicSender
import pika
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc')
    cnt = 0
    while(True):
        a = input()
        cnt += 1
        msg = "no {number} Hello Basic Sender".format(number=cnt)
        channel.basic_publish(exchange='', routing_key='emc', body=msg)
        print(str(cnt) + " send success >>" + msg)
    con.close()
if __name__=="__main__":
    main()
basicReceiver
import pika
def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc')
    channel.basic_consume(callback, queue='emc', no_ack=True)
    channel.start_consuming()
if __name__=="__main__":
    main()

문제점

메세지 유실

  • sender 영역 : rabbitmq 서비스가 재시작되면 메세지가 유실될 수 있다.
  • receiver 영역 : receiver가 받을 준비가 되어 있지 않으면 메세지가 유실될 수 있다.

 

durable queue

다음과 같이 message를 유실하지 않기 위한 안전장치를 마련한다.

  • queue(rabbitmq 서비스) : queue 를 durable = True 로 설정
  • message(sender 설정) : delivery mode를 persistence 로 설정
  • ack(receiver 설정) : message를 수신하였을 때, ack 를 송신하도록 설정

durable queue 주의점

만일 ack를 송신하지 않으면 message가 삭제되지 않으므로 주의한다.

 

durable = True 일 때 message가 삭제되지 않을 조건

queue의 durability 를 True로 설정하면 message 의 생명 주기는 해당 message의 delivery mode에 의하여 결정된다.

  • delivery mode 1 (=non persistent) : 서비스가 종료되면 message는 삭제된다.
  • delivery mode 2 (=persistent) : 서비스가 종료되더라도 삭제되지 않는다.
durableSender.py
import pika
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    cnt = 0
    while(True):
        a = input()
        cnt += 1
        msg = "no {number} Hello Basic Sender".format(number=cnt)
        channel.basic_publish(exchange='', routing_key='emc', body=msg,
                              properties=pika.BasicProperties(delivery_mode=2))
        print(str(cnt) + " send success >>" + msg)
    con.close()
if __name__=="__main__":
    main()
durableReceiver.py
import pika
def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    channel.basic_consume(callback, queue='emc')
    channel.start_consuming()
if __name__=="__main__":
    main()

 

load balancing

아래 그림과 같이 2대의 receiver가 message를 받아 처리하는 구조를 가정해 보자.

 

rabbitmq의 기본 balancing 전략은 round-robin 이다.

만일 receiver 중 1대가 매우 바쁘면, 해당 receiver에 전달되어야 할 message는 제때 처리되지 못한 채 queue에 적체될 것이다.

 

busyReceiver.py
import pika
def busyFunction():
    while(True):
        a = input()
        print('i am busy. very busy.')
def callback(ch, method, properties, body):
    print(body)
    busyFunction()
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    channel.basic_consume(callback, queue='emc')
    channel.start_consuming()
if __name__=="__main__":
    main()
//durableSender.py 로 message 를 10 개 발송
1 send success >>no 1 Hello Basic Sender
2 send success >>no 2 Hello Basic Sender
3 send success >>no 3 Hello Basic Sender
4 send success >>no 4 Hello Basic Sender
5 send success >>no 5 Hello Basic Sender
6 send success >>no 6 Hello Basic Sender
7 send success >>no 7 Hello Basic Sender
8 send success >>no 8 Hello Basic Sender
9 send success >>no 9 Hello Basic Sender
10 send success >>no 10 Hello Basic Sender
 
//durableReceiver.py 는 1,3,5,7,9 번째 메세지를 전달받는다.
b'no 1 Hello Basic Sender'
b'no 3 Hello Basic Sender'
b'no 5 Hello Basic Sender'
b'no 7 Hello Basic Sender'
b'no 9 Hello Basic Sender'
...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 2,4,6,8,10 번째 메세지를 전달 받는다.
b'no 2 Hello Basic Sender'
b'no 4 Hello Basic Sender'
b'no 6 Hello Basic Sender'
b'no 8 Hello Basic Sender'
b'no 10 Hello Basic Sender'
//busyReceiver.py 는 바빠서 2번째 메세지를 받은 뒤 ack 를 보내지 못했다.
b'no 2 Hello Basic Sender'
i am busy. very busy.

 

qos 의 prefetch_count 를 설정하면 ack를 받지 못할 경우, 해당 receiver 로 message 를 발송하지 않는다.

busyReceiver.py
import pika
def busyFunction():
    while(True):
        a = input()
        print('i am busy. very busy.')
def callback(ch, method, properties, body):
    print(body)
    busyFunction()
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='emc')
    channel.start_consuming()
if __name__=="__main__":
    main()
//durableSender.py 로 message 를 10 개 발송
1 send success >>no 1 Hello Basic Sender
2 send success >>no 2 Hello Basic Sender
3 send success >>no 3 Hello Basic Sender
4 send success >>no 4 Hello Basic Sender
5 send success >>no 5 Hello Basic Sender
6 send success >>no 6 Hello Basic Sender
7 send success >>no 7 Hello Basic Sender
8 send success >>no 8 Hello Basic Sender
9 send success >>no 9 Hello Basic Sender
10 send success >>no 10 Hello Basic Sender
//balancedReceiver.py 는 바빠서 1번째 메세지를 받은 뒤 ack 를 보내지 못했다.
//prefetch-count 를 1로 설정하였기 때문에, ack를 1번 받지 못한 balancedReceiver에게는 다시 message를 전달하지 않는다.
b'no 1 Hello Basic Sender'
i am busy. very busy.
 
//durableReceiver.py 는 2,3,4,5,6,7,8,9 번째 메세지를 전달받는다.
b'no 2 Hello Basic Sender'
b'no 3 Hello Basic Sender'
b'no 4 Hello Basic Sender'
b'no 5 Hello Basic Sender'
b'no 6 Hello Basic Sender'
b'no 7 Hello Basic Sender'
b'no 8 Hello Basic Sender'
b'no 9 Hello Basic Sender'
b'no 10 Hello Basic Sender'
...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 1 번째 메세지를 전달 받는다.
b'no 1 Hello Basic Sender'

 

spring boot

참고 문서

 

spring-boot

spring boot life-cycle

Cosysto Gimbh 라는 사람이 life-cycle에 대한 flow 를 아주 잘 그려놓았다.

 


 

실제 구현체 이름은 매우 길고 복잡하다. (e.g. ConfigurationWarningsApplicationContextInitializer)

spring application 내부에서 실제로 어떤 일이 일어나는지에 집중하도록 긴 이름은 짧게 축약하였다.

 

ApplicationContext 생성(=SpringApplication.run())

  • Listener 생성 및 시작
  • Environment 준비
  • Context 생성
  • Context 준비
    • Environment 세팅
    • 후처리(post process)
    • Initializer 초기화(apply initializer)
  • Context 로드
    • Source 로드
    • BeanDefinitionLoader 에 Source , Context 탑재
    • BeanDefinitionLoader 로드
  • 종료
    • Context Refresh

 

ApplicationContext 는 SpringApplication의 몸통에 해당하는 실제 instance의 추상화이다.

SpringApplication

SpringApplication 의 Entry point 는 반드시 특정 package를 지정하고, 그 하위에 위치시켜야 한다. (=Default package path에 entry point 시작 금지)

그렇지 않으면 ** WARNING ** : Your ApplicationContext is unlikely to start due to a @ComponentScan of the default package.  이 발생한다.

 

@SpringBootApplication 을 사용하면 다음 Annotation 을 생략 가능하다.

Source Code


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@EnableAutoConfiguration
@ComponentScan
public @interface SpringBootApplication {
/**
* Exclude specific auto-configuration classes such that they will never be applied.
* @return the classes to exclude
*/
Class<?>[] exclude() default {};
}

 

SpringApplicationRunListener

ConfigurableEnvironment

ConfigurableApplicationContext

 

실제로 호출되는 implementation 은 AnnotationConfigEmbddedWebApplicationContext 이다.