prerequisite
rabbitmq 설치 : https://www.rabbitmq.com/download.html
python 설치(다른 언어로 대체 가능하나, 예제는 python3.5임)
rabbitmqctl 사용법
ack 확인되지 않은 msg list
rabbitmqctl list_queues name messages_ready messages_unacknowledged |
모든 queue 정보 삭제(=reset)
rabbitmqctl stop_apprabbitmqctl resetrabbitmqctl start_app |
매번 queue 삭제/리스타트하기 귀찮아서 python으로 짰다.
본인 환경에 맞도록 적당히 수정해서 쓰면 될 듯
pp = pprint.PrettyPrinter(indent=2, compact=True)result = []mqctlExec = '"C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.6.3\sbin\\rabbitmqctl" {param}'spExec = """for line in sp.Popen(mqctlExec.format(param='{param}'), shell=True, stdout=sp.PIPE, stderr=sp.PIPE).stdout: result.append(line)"""exec(spExec.format(param='stop_app'))exec(spExec.format(param='reset'))exec(spExec.format(param='start_app'))pp.pprint(result) |
basic queue
기본적으로 mq로 msg 전송/수신하기 위한 최소 설정
1) queue
2) channel
3) message body
basicSender
import pikadef main(): con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = con.channel() channel.queue_declare(queue='emc') cnt = 0 while(True): a = input() cnt += 1 msg = "no {number} Hello Basic Sender".format(number=cnt) channel.basic_publish(exchange='', routing_key='emc', body=msg) print(str(cnt) + " send success >>" + msg) con.close()if __name__=="__main__": main() |
basicReceiver
import pikadef callback(ch, method, properties, body): print(body) ch.basic_ack(delivery_tag=method.delivery_tag)def main(): con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = con.channel() channel.queue_declare(queue='emc') channel.basic_consume(callback, queue='emc', no_ack=True) channel.start_consuming()if __name__=="__main__": main() |
문제점
메세지 유실
- sender 영역 : rabbitmq 서비스가 재시작되면 메세지가 유실될 수 있다.
- receiver 영역 : receiver가 받을 준비가 되어 있지 않으면 메세지가 유실될 수 있다.
durable queue
다음과 같이 message를 유실하지 않기 위한 안전장치를 마련한다.
- queue(rabbitmq 서비스) : queue 를 durable = True 로 설정
- message(sender 설정) : delivery mode를 persistence 로 설정
- ack(receiver 설정) : message를 수신하였을 때, ack 를 송신하도록 설정
durable queue 주의점
만일 ack를 송신하지 않으면 message가 삭제되지 않으므로 주의한다.
durable = True 일 때 message가 삭제되지 않을 조건
queue의 durability 를 True로 설정하면 message 의 생명 주기는 해당 message의 delivery mode에 의하여 결정된다.
- delivery mode 1 (=non persistent) : 서비스가 종료되면 message는 삭제된다.
- delivery mode 2 (=persistent) : 서비스가 종료되더라도 삭제되지 않는다.
durableSender.py
import pikadef main(): con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = con.channel() channel.queue_declare(queue='emc', durable=True) cnt = 0 while(True): a = input() cnt += 1 msg = "no {number} Hello Basic Sender".format(number=cnt) channel.basic_publish(exchange='', routing_key='emc', body=msg, properties=pika.BasicProperties(delivery_mode=2)) print(str(cnt) + " send success >>" + msg) con.close()if __name__=="__main__": main() |
durableReceiver.py
import pikadef callback(ch, method, properties, body): print(body) ch.basic_ack(delivery_tag=method.delivery_tag)def main(): con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = con.channel() channel.queue_declare(queue='emc', durable=True) channel.basic_consume(callback, queue='emc') channel.start_consuming()if __name__=="__main__": main() |
load balancing
아래 그림과 같이 2대의 receiver가 message를 받아 처리하는 구조를 가정해 보자.
rabbitmq의 기본 balancing 전략은 round-robin 이다.
만일 receiver 중 1대가 매우 바쁘면, 해당 receiver에 전달되어야 할 message는 제때 처리되지 못한 채 queue에 적체될 것이다.
busyReceiver.py
import pikadef busyFunction(): while(True): a = input() print('i am busy. very busy.')def callback(ch, method, properties, body): print(body) busyFunction() ch.basic_ack(delivery_tag=method.delivery_tag)def main(): con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = con.channel() channel.queue_declare(queue='emc', durable=True) channel.basic_consume(callback, queue='emc') channel.start_consuming()if __name__=="__main__": main() |
//durableSender.py 로 message 를 10 개 발송1 send success >>no 1 Hello Basic Sender2 send success >>no 2 Hello Basic Sender3 send success >>no 3 Hello Basic Sender4 send success >>no 4 Hello Basic Sender5 send success >>no 5 Hello Basic Sender6 send success >>no 6 Hello Basic Sender7 send success >>no 7 Hello Basic Sender8 send success >>no 8 Hello Basic Sender9 send success >>no 9 Hello Basic Sender10 send success >>no 10 Hello Basic Sender //durableReceiver.py 는 1,3,5,7,9 번째 메세지를 전달받는다.b'no 1 Hello Basic Sender'b'no 3 Hello Basic Sender'b'no 5 Hello Basic Sender'b'no 7 Hello Basic Sender'b'no 9 Hello Basic Sender'...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 2,4,6,8,10 번째 메세지를 전달 받는다.b'no 2 Hello Basic Sender'b'no 4 Hello Basic Sender'b'no 6 Hello Basic Sender'b'no 8 Hello Basic Sender'b'no 10 Hello Basic Sender'//busyReceiver.py 는 바빠서 2번째 메세지를 받은 뒤 ack 를 보내지 못했다.b'no 2 Hello Basic Sender'i am busy. very busy. |
qos 의 prefetch_count 를 설정하면 ack를 받지 못할 경우, 해당 receiver 로 message 를 발송하지 않는다.
busyReceiver.py
import pikadef busyFunction(): while(True): a = input() print('i am busy. very busy.')def callback(ch, method, properties, body): print(body) busyFunction() ch.basic_ack(delivery_tag=method.delivery_tag)def main(): con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = con.channel() channel.queue_declare(queue='emc', durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='emc') channel.start_consuming()if __name__=="__main__": main() |
//durableSender.py 로 message 를 10 개 발송1 send success >>no 1 Hello Basic Sender2 send success >>no 2 Hello Basic Sender3 send success >>no 3 Hello Basic Sender4 send success >>no 4 Hello Basic Sender5 send success >>no 5 Hello Basic Sender6 send success >>no 6 Hello Basic Sender7 send success >>no 7 Hello Basic Sender8 send success >>no 8 Hello Basic Sender9 send success >>no 9 Hello Basic Sender10 send success >>no 10 Hello Basic Sender//balancedReceiver.py 는 바빠서 1번째 메세지를 받은 뒤 ack 를 보내지 못했다.//prefetch-count 를 1로 설정하였기 때문에, ack를 1번 받지 못한 balancedReceiver에게는 다시 message를 전달하지 않는다.b'no 1 Hello Basic Sender'i am busy. very busy. //durableReceiver.py 는 2,3,4,5,6,7,8,9 번째 메세지를 전달받는다.b'no 2 Hello Basic Sender'b'no 3 Hello Basic Sender'b'no 4 Hello Basic Sender'b'no 5 Hello Basic Sender'b'no 6 Hello Basic Sender'b'no 7 Hello Basic Sender'b'no 8 Hello Basic Sender'b'no 9 Hello Basic Sender'b'no 10 Hello Basic Sender'...busyReceiver 가 오랜시간(default 1분)동안 heartbeat을 주지 못해 connection closed 되면 unacked message 인 1 번째 메세지를 전달 받는다.b'no 1 Hello Basic Sender' |





댓글 남기기