Skip to content

[RabbitMQ] Publish/Subscribe 패턴

2012/01/31

Publish/Subscribe

RabbitMQ 홈페이지에 3번째 Tutorial은 Publish/Subscribe 패턴에 대한 구현방법 입니다.

지난 tutorial까지는 한개의 message가 한개의 queue에 정확히 전달되기만 하면 된다는 가정하에 한개의 work queue를 만들었습니다.  그리고 연결된 여러개의 Consumer가 있다면 round-robin 방식을 순차적으로 한개의 message가 하나의 Consumer에 전달되어 처리되었습니다.

하지만 Publish/Subscribe 패턴은 완전히 다른 방식입니다. 한개의 Produser에서 한개의 message를 발행하지만 여러 Consumer에서 해당 message를 받는(구독)하는 방식입니다.

이러한 방식이 필요한 이유는 하나의 Consumer에서 하나의 message를 받아 두가지 이상의 다른 작업을 실행 할 수도 있지만 완전히 다른 작업이면서 동시 처리가 필요한 경우가 있기 때문입니다. 간단한 예로 logging system에서 message를 받아 DB에 저장을 하면서 실시간 모니터링도 하는 경우 입니다.

Exchanges

Publish/Subscribe 패턴의 동작방식은 다음과 같습니다.

 Produser에서 message를 발생해서 누군가(exchange)에게 넘겨주고 누군가는 그 message를 기다리고 있는 여러 Consumer들이 바라보고 있는 각 queue들에 message를 전달해 주게 됩니다. 즉, produser는 오직 message를 exchange에게만 전달해 주기만 하면 되며 실제로 어떤 consumer가 받을 지에 대해서는 신경쓰지 않아도 됩니다. 대신 exchange는 produser에게 message를 받고 이를 consumer에게 전달하기 위해 queue에 넣는 것만 신경쓰면 됩니다. 그래서 exchange는 message를 받았을때 정확히 어떤 일을 해야하는지 알아야 하며 그러한 룰은 exchange의 type에 의해 정의됩니다.

exchange의 type에는 {direct, topic, headers, fanout}이 있습니다. 그리고 이번 포스트에서는 logging system의 예를 통해 fanout에 동작방식을 알아보겠습니다.

fanout 타입은 exchange가 알고 있는 모든 queue에 message를 전달하는 방식 입니다.

그래서 우선 아래와 같이 exchange를 선언해 주고,

channel.exchageDeclare("logs", "fanout"); //logs라는 이름의 exchange를 fanout 타입으로 선언

 produser에서는 아래와 같이 exchange에 message를 전달해 주면 produser의 일은 끝나게 됩니다.

channel.basicPublish("logs", "", null, message.getByte());

Temporary queues

queue에 직접 특정 이름을 지정하고  사용할 수도 있지만 이름을 지정하게 되면 producser와 consumer간에 message를 주고받을때 특정 queue의 이름을 지정해야만 합니다.

하지만 logging system 에는 특정 queue 이름이 정의되어 있고 이것들만 써야한다면 번거로울수 있습니다. exchange에 바인딩되는 모든 consumer에게 실시간으로(지난 message는 버리고) message를 받을 수 있으면 됩니다.

그렇기 때문에 consumer가 RabbitMQ Server에 접속할때 server에서 랜덤한 이름의 새로운 queue가 생성해서 exchange가 message를 넣을수 있도록 해주면 되고 접속이 끊겼을때는 queue가 바로 삭제되면 됩니다.

그래서 다음과 같이 queue를 파라메터없이 queueDeclare()로 생성하면 server에서 랜덤한 이름 만들어서 주며, non-durable, exclusive, autodelete 한 속성을 가지는 queue가 생성됩니다.

String queueName = channel.queueDeclare().getQueue();

Bindings

마지막으로 exchange와 queue를 만들었다면 둘을 바인딩하주면 됩니다.

channel.queueBind(queueName, “logs”, “”);

Test Result

다음과 같이 produser와 consumer를 구성했습니다.

Produser는 1초에 한번씩 1~10000까지 순차적으로 숫자를 message로 보낸다.
각 Consumer는 1초에 한번씩 queue에서 받은 message를 화면에 출력한다.

이때 Produser는 logs라는 exchange에 보내고 Consumer는 서버에서 중복되지 않는 랜덤한 queue의 이름을 받아서 생성하고 logs라는 exchange에 바인딩합니다.

Produser    Consumer1
1              1
2              2
3              3
                       Consumer2
4              4           4
5              5           5

위와 같이 Consumer2가 실행되고 난 이후 message부터 Consumer1과 함께 동일한 message를 받으며 추가로 Consumer를 계속 추가하더라도 소스코드 수정없이 새로운 queue가 생성되어 message를 받을 수 있게 됩니다.

No comments yet

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Google+ photo

Google+의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

%s에 연결하는 중

%d 블로거가 이것을 좋아합니다: