Skip to content

[RabbitMQ] Fair dispatch(prefetch 설정)

2012/01/27

Fair dispatch

RabbitMQ  Tutorials에 위 그림과 같이 구성해서 테스트를 하였습니다.

Produser는 1초에 한번씩 1~10000까지 순차적으로 숫자를 message로 보낸다.
각 Consumer는 3초에 한번씩 queue에서 받은 message를 화면에 출력한다.

Consumer의 코드는 다음과 같습니다.

   Connection connection = factory.newConnection();
   Channel channel = connection.createChannel();
   channel.queueDeclare(QUEUE_NAME, true, false, false, null); // durable=true
// channel.basicQos(1);

   System.out.println("[*] Waiting for message..");
   QueueingConsumer consumer = new QueueingConsumer(channel);
   channel.basicConsume(QUEUE_NAME, false, consumer); //autoAck=false
   while (true) {
       QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       if (delivery.getBody().length > 0) {
           String message = new String(delivery.getBody());
           System.out.println(" [x] Received '" + message + "'");
           channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
           Thread.sleep(3 * 1000);
       }
   }

만약 위와 같이 channel.basicQos(1) 가 주석처리가 되어 있는 상황(=unlimited)인 상태에서 Consumer들이 실행된 상태에서 Produser가 0부터 message를 보내면 아래와 같이 출력이 됩니다.

Consumer1 | Consumer2
    0           1
    2           3
    4           5

하지만 이때 Produser가 15까지 메시지를 보냈었고 Consumer2를 죽이면 Consumer1은 다음과 비슷하게 동작합니다.

Consumer1
    6
    8
   10
   12
   14
   16
    7
   13

위와 같이 동작하는 이유는 prefetch 때문입니다.

Consumer에서 autoAck=false로 설정했기때문에 Consumer에서 ack를 보내주기 전까지는 queue에 남아있기 때문에 Consumer2가 죽었을때 Consumer1에서 6을 처리하고 다음 message를 queue에서 꺼내와 7을 처리할것 같지만 그렇지 않습니다. queue에 message가 들어오면 Consumer의 prefetchCount(basicQos(N))만큼의 message를 미리 보내게 됩니다.

즉, Consumer들은 위에 소스에서는 unlimited 였기 때문에 Consumer2가 죽은 시점에서 이미 Consumer1에는 (6,8,10,12,14 )가 prefetch되어 있고 Consumer2에는 (7,9,11,13,15)가 prefetch 되어 있습니다. 하지만 Consumer2가 죽는 순간 connection이 끊기고 (7,9,11,13,15)는 unacknowleged message가 되기 때문에 바로 처리 되지 않고 Consumer1에서 prefetch된 message가 다 처리하고 난 뒤에 처리가 됩니다. 하지만 여기에서 unacknowleged message는 기존 순서와 상관없이 랜덤하게 Consumer1에 전송이 됩니다.

결국 여러개의 Consumer를 사용하는 상황에서 message에 대한 순차적인 처리가 중요한 상황이라면 prefetchCount=1(channel.basicQos(1))로 설정해야 합니다.

No comments yet

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Google+ photo

Google+의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

%s에 연결하는 중

%d 블로거가 이것을 좋아합니다: