prerequisite

rabbitmq 설치 : https://www.rabbitmq.com/download.html

python 설치(다른 언어로 대체 가능하나, 예제는 python3.5임)

rabbitmqctl 사용법

ack 확인되지 않은 msg list

rabbitmqctl list_queues name messages_ready messages_unacknowledged

 

모든 queue 정보 삭제(=reset)

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

 

매번 queue 삭제/리스타트하기 귀찮아서 python으로 짰다.

본인 환경에 맞도록 적당히 수정해서  쓰면 될 듯

pp = pprint.PrettyPrinter(indent=2, compact=True)
result = []
mqctlExec = '"C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.6.3\sbin\\rabbitmqctl" {param}'
spExec = """for line in sp.Popen(mqctlExec.format(param='{param}'), shell=True, stdout=sp.PIPE, stderr=sp.PIPE).stdout:
    result.append(line)"""
exec(spExec.format(param='stop_app'))
exec(spExec.format(param='reset'))
exec(spExec.format(param='start_app'))
pp.pprint(result) 

 

 

basic queue

기본적으로 mq로 msg 전송/수신하기 위한 최소 설정

1) queue

2) channel

3) message body

basicSender
import pika
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc')
    cnt = 0
    while(True):
        a = input()
        cnt += 1
        msg = "no {number} Hello Basic Sender".format(number=cnt)
        channel.basic_publish(exchange='', routing_key='emc', body=msg)
        print(str(cnt) + " send success >>" + msg)
    con.close()
if __name__=="__main__":
    main()
basicReceiver
import pika
def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc')
    channel.basic_consume(callback, queue='emc', no_ack=True)
    channel.start_consuming()
if __name__=="__main__":
    main()

문제점

메세지 유실

  • sender 영역 : rabbitmq 서비스가 재시작되면 메세지가 유실될 수 있다.
  • receiver 영역 : receiver가 받을 준비가 되어 있지 않으면 메세지가 유실될 수 있다.

 

durable queue

다음과 같이 message를 유실하지 않기 위한 안전장치를 마련한다.

  • queue(rabbitmq 서비스) : queue 를 durable = True 로 설정
  • message(sender 설정) : delivery mode를 persistence 로 설정
  • ack(receiver 설정) : message를 수신하였을 때, ack 를 송신하도록 설정

durable queue 주의점

만일 ack를 송신하지 않으면 message가 삭제되지 않으므로 주의한다.

 

durable = True 일 때 message가 삭제되지 않을 조건

queue의 durability 를 True로 설정하면 message 의 생명 주기는 해당 message의 delivery mode에 의하여 결정된다.

  • delivery mode 1 (=non persistent) : 서비스가 종료되면 message는 삭제된다.
  • delivery mode 2 (=persistent) : 서비스가 종료되더라도 삭제되지 않는다.
durableSender.py
import pika
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    cnt = 0
    while(True):
        a = input()
        cnt += 1
        msg = "no {number} Hello Basic Sender".format(number=cnt)
        channel.basic_publish(exchange='', routing_key='emc', body=msg,
                              properties=pika.BasicProperties(delivery_mode=2))
        print(str(cnt) + " send success >>" + msg)
    con.close()
if __name__=="__main__":
    main()
durableReceiver.py
import pika
def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    channel.basic_consume(callback, queue='emc')
    channel.start_consuming()
if __name__=="__main__":
    main()

 

load balancing

아래 그림과 같이 2대의 receiver가 message를 받아 처리하는 구조를 가정해 보자.

 

rabbitmq의 기본 balancing 전략은 round-robin 이다.

만일 receiver 중 1대가 매우 바쁘면, 해당 receiver에 전달되어야 할 message는 제때 처리되지 못한 채 queue에 적체될 것이다.

 

busyReceiver.py
import pika
def busyFunction():
    while(True):
        a = input()
        print('i am busy. very busy.')
def callback(ch, method, properties, body):
    print(body)
    busyFunction()
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    channel.basic_consume(callback, queue='emc')
    channel.start_consuming()
if __name__=="__main__":
    main()
//durableSender.py 로 message 를 10 개 발송
1 send success >>no 1 Hello Basic Sender
2 send success >>no 2 Hello Basic Sender
3 send success >>no 3 Hello Basic Sender
4 send success >>no 4 Hello Basic Sender
5 send success >>no 5 Hello Basic Sender
6 send success >>no 6 Hello Basic Sender
7 send success >>no 7 Hello Basic Sender
8 send success >>no 8 Hello Basic Sender
9 send success >>no 9 Hello Basic Sender
10 send success >>no 10 Hello Basic Sender
 
//durableReceiver.py 는 1,3,5,7,9 번째 메세지를 전달받는다.
b'no 1 Hello Basic Sender'
b'no 3 Hello Basic Sender'
b'no 5 Hello Basic Sender'
b'no 7 Hello Basic Sender'
b'no 9 Hello Basic Sender'
...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 2,4,6,8,10 번째 메세지를 전달 받는다.
b'no 2 Hello Basic Sender'
b'no 4 Hello Basic Sender'
b'no 6 Hello Basic Sender'
b'no 8 Hello Basic Sender'
b'no 10 Hello Basic Sender'
//busyReceiver.py 는 바빠서 2번째 메세지를 받은 뒤 ack 를 보내지 못했다.
b'no 2 Hello Basic Sender'
i am busy. very busy.

 

qos 의 prefetch_count 를 설정하면 ack를 받지 못할 경우, 해당 receiver 로 message 를 발송하지 않는다.

busyReceiver.py
import pika
def busyFunction():
    while(True):
        a = input()
        print('i am busy. very busy.')
def callback(ch, method, properties, body):
    print(body)
    busyFunction()
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = con.channel()
    channel.queue_declare(queue='emc', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='emc')
    channel.start_consuming()
if __name__=="__main__":
    main()
//durableSender.py 로 message 를 10 개 발송
1 send success >>no 1 Hello Basic Sender
2 send success >>no 2 Hello Basic Sender
3 send success >>no 3 Hello Basic Sender
4 send success >>no 4 Hello Basic Sender
5 send success >>no 5 Hello Basic Sender
6 send success >>no 6 Hello Basic Sender
7 send success >>no 7 Hello Basic Sender
8 send success >>no 8 Hello Basic Sender
9 send success >>no 9 Hello Basic Sender
10 send success >>no 10 Hello Basic Sender
//balancedReceiver.py 는 바빠서 1번째 메세지를 받은 뒤 ack 를 보내지 못했다.
//prefetch-count 를 1로 설정하였기 때문에, ack를 1번 받지 못한 balancedReceiver에게는 다시 message를 전달하지 않는다.
b'no 1 Hello Basic Sender'
i am busy. very busy.
 
//durableReceiver.py 는 2,3,4,5,6,7,8,9 번째 메세지를 전달받는다.
b'no 2 Hello Basic Sender'
b'no 3 Hello Basic Sender'
b'no 4 Hello Basic Sender'
b'no 5 Hello Basic Sender'
b'no 6 Hello Basic Sender'
b'no 7 Hello Basic Sender'
b'no 8 Hello Basic Sender'
b'no 9 Hello Basic Sender'
b'no 10 Hello Basic Sender'
...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 1 번째 메세지를 전달 받는다.
b'no 1 Hello Basic Sender'

 

댓글 남기기

인기 검색어

01010011에서 더 알아보기

지금 구독하여 계속 읽고 전체 아카이브에 액세스하세요.

Continue reading