prerequisite
rabbitmq 설치 : https://www.rabbitmq.com/download.html
python 설치(다른 언어로 대체 가능하나, 예제는 python3.5임)
rabbitmqctl 사용법
ack 확인되지 않은 msg list
rabbitmqctl list_queues name messages_ready messages_unacknowledged |
모든 queue 정보 삭제(=reset)
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app |
매번 queue 삭제/리스타트하기 귀찮아서 python으로 짰다.
본인 환경에 맞도록 적당히 수정해서 쓰면 될 듯
pp = pprint.PrettyPrinter(indent = 2 , compact = True ) result = [] mqctlExec = '"C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.6.3\sbin\\rabbitmqctl" {param}' spExec = """for line in sp.Popen(mqctlExec.format(param='{param}'), shell=True, stdout=sp.PIPE, stderr=sp.PIPE).stdout: result.append(line)""" exec (spExec. format (param = 'stop_app' )) exec (spExec. format (param = 'reset' )) exec (spExec. format (param = 'start_app' )) pp.pprint(result) |
basic queue
기본적으로 mq로 msg 전송/수신하기 위한 최소 설정
1) queue
2) channel
3) message body
basicSender
import pika def main(): con = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = con.channel() channel.queue_declare(queue = 'emc' ) cnt = 0 while ( True ): a = input () cnt + = 1 msg = "no {number} Hello Basic Sender" . format (number = cnt) channel.basic_publish(exchange = ' ', routing_key=' emc', body = msg) print ( str (cnt) + " send success >>" + msg) con.close() if __name__ = = "__main__" : main() |
basicReceiver
import pika def callback(ch, method, properties, body): print (body) ch.basic_ack(delivery_tag = method.delivery_tag) def main(): con = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = con.channel() channel.queue_declare(queue = 'emc' ) channel.basic_consume(callback, queue = 'emc' , no_ack = True ) channel.start_consuming() if __name__ = = "__main__" : main() |
문제점
메세지 유실
- sender 영역 : rabbitmq 서비스가 재시작되면 메세지가 유실될 수 있다.
- receiver 영역 : receiver가 받을 준비가 되어 있지 않으면 메세지가 유실될 수 있다.
durable queue
다음과 같이 message를 유실하지 않기 위한 안전장치를 마련한다.
- queue(rabbitmq 서비스) : queue 를 durable = True 로 설정
- message(sender 설정) : delivery mode를 persistence 로 설정
- ack(receiver 설정) : message를 수신하였을 때, ack 를 송신하도록 설정
durable queue 주의점
만일 ack를 송신하지 않으면 message가 삭제되지 않으므로 주의한다.
durable = True 일 때 message가 삭제되지 않을 조건
queue의 durability 를 True로 설정하면 message 의 생명 주기는 해당 message의 delivery mode에 의하여 결정된다.
- delivery mode 1 (=non persistent) : 서비스가 종료되면 message는 삭제된다.
- delivery mode 2 (=persistent) : 서비스가 종료되더라도 삭제되지 않는다.
durableSender.py
import pika def main(): con = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = con.channel() channel.queue_declare(queue = 'emc' , durable = True ) cnt = 0 while ( True ): a = input () cnt + = 1 msg = "no {number} Hello Basic Sender" . format (number = cnt) channel.basic_publish(exchange = ' ', routing_key=' emc', body = msg, properties = pika.BasicProperties(delivery_mode = 2 )) print ( str (cnt) + " send success >>" + msg) con.close() if __name__ = = "__main__" : main() |
durableReceiver.py
import pika def callback(ch, method, properties, body): print (body) ch.basic_ack(delivery_tag = method.delivery_tag) def main(): con = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = con.channel() channel.queue_declare(queue = 'emc' , durable = True ) channel.basic_consume(callback, queue = 'emc' ) channel.start_consuming() if __name__ = = "__main__" : main() |
load balancing
아래 그림과 같이 2대의 receiver가 message를 받아 처리하는 구조를 가정해 보자.
rabbitmq의 기본 balancing 전략은 round-robin 이다.
만일 receiver 중 1대가 매우 바쁘면, 해당 receiver에 전달되어야 할 message는 제때 처리되지 못한 채 queue에 적체될 것이다.
busyReceiver.py
import pika def busyFunction(): while ( True ): a = input () print ( 'i am busy. very busy.' ) def callback(ch, method, properties, body): print (body) busyFunction() ch.basic_ack(delivery_tag = method.delivery_tag) def main(): con = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = con.channel() channel.queue_declare(queue = 'emc' , durable = True ) channel.basic_consume(callback, queue = 'emc' ) channel.start_consuming() if __name__ = = "__main__" : main() |
//durableSender .py 로 message 를 10 개 발송 1 send success >>no 1 Hello Basic Sender 2 send success >>no 2 Hello Basic Sender 3 send success >>no 3 Hello Basic Sender 4 send success >>no 4 Hello Basic Sender 5 send success >>no 5 Hello Basic Sender 6 send success >>no 6 Hello Basic Sender 7 send success >>no 7 Hello Basic Sender 8 send success >>no 8 Hello Basic Sender 9 send success >>no 9 Hello Basic Sender 10 send success >>no 10 Hello Basic Sender //durableReceiver .py 는 1,3,5,7,9 번째 메세지를 전달받는다. b 'no 1 Hello Basic Sender' b 'no 3 Hello Basic Sender' b 'no 5 Hello Basic Sender' b 'no 7 Hello Basic Sender' b 'no 9 Hello Basic Sender' ...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 2,4,6,8,10 번째 메세지를 전달 받는다. b 'no 2 Hello Basic Sender' b 'no 4 Hello Basic Sender' b 'no 6 Hello Basic Sender' b 'no 8 Hello Basic Sender' b 'no 10 Hello Basic Sender' //busyReceiver .py 는 바빠서 2번째 메세지를 받은 뒤 ack 를 보내지 못했다. b 'no 2 Hello Basic Sender' i am busy. very busy. |
qos 의 prefetch_count 를 설정하면 ack를 받지 못할 경우, 해당 receiver 로 message 를 발송하지 않는다.
busyReceiver.py
import pika def busyFunction(): while ( True ): a = input () print ( 'i am busy. very busy.' ) def callback(ch, method, properties, body): print (body) busyFunction() ch.basic_ack(delivery_tag = method.delivery_tag) def main(): con = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = con.channel() channel.queue_declare(queue = 'emc' , durable = True ) channel.basic_qos(prefetch_count = 1 ) channel.basic_consume(callback, queue = 'emc' ) channel.start_consuming() if __name__ = = "__main__" : main() |
//durableSender .py 로 message 를 10 개 발송 1 send success >>no 1 Hello Basic Sender 2 send success >>no 2 Hello Basic Sender 3 send success >>no 3 Hello Basic Sender 4 send success >>no 4 Hello Basic Sender 5 send success >>no 5 Hello Basic Sender 6 send success >>no 6 Hello Basic Sender 7 send success >>no 7 Hello Basic Sender 8 send success >>no 8 Hello Basic Sender 9 send success >>no 9 Hello Basic Sender 10 send success >>no 10 Hello Basic Sender //balancedReceiver .py 는 바빠서 1번째 메세지를 받은 뒤 ack 를 보내지 못했다. //prefetch-count 를 1로 설정하였기 때문에, ack를 1번 받지 못한 balancedReceiver에게는 다시 message를 전달하지 않는다. b 'no 1 Hello Basic Sender' i am busy. very busy. //durableReceiver .py 는 2,3,4,5,6,7,8,9 번째 메세지를 전달받는다. b 'no 2 Hello Basic Sender' b 'no 3 Hello Basic Sender' b 'no 4 Hello Basic Sender' b 'no 5 Hello Basic Sender' b 'no 6 Hello Basic Sender' b 'no 7 Hello Basic Sender' b 'no 8 Hello Basic Sender' b 'no 9 Hello Basic Sender' b 'no 10 Hello Basic Sender' ...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 1 번째 메세지를 전달 받는다. b 'no 1 Hello Basic Sender' |