Skip to content

[Node.js + RabbitMQ] Node.js + socket.io + RabbitMQ 이용한 실시간 메시지 처리

2012/03/07

[Node.js + RabbitMQ] Node.js + socket.io + RabbitMQ 이용한 실시간 메시지 처리

Node.js와 RabbitMQ를 연동하기 위해 여러 모듈들이 있으며 이중에서 가장 많이 사용? 하고 활발한 모듈은 node-amqp 이거 인거 같습니다.

물론 RabbitMQ 공식 블로그에 보면 rabbitmq-nodejs-rabbitjs 에 대한 포스트가 있으며 거기에서 rabbit.js를 소개하고 있지만 결국 node-amqp를 이용해서 3가지 메세지 패턴(Publish/Subscribe, Request/Reply, PUSH/PULL) 을 구현해 놓은 것인데 각 패턴별로 고정된 옵션설정이 되어 있어서(exchange나 queue에 대한) 열심히 수정하다가 결국 막혔습니다.

대신 rabbit.js를 참고해서 나름대로 원하는 것을 만들어 보았습니다.

일단 node.js, npm, express, rabbitmq 가 설치되어 있다는 가정하에 진행해 보겠습니다. 기본적으로 express를 이용해 프로젝트를 만들면 다음과 같습니다.

-bash-3.2$ express ex3
  create : ex3
  create : ex3/package.json
  create : ex3/app.js
  create : ex3/public
  create : ex3/public/javascripts                   // app에서 필요한 javascripts 폴더
  create : ex3/public/images                        // app에서 필요한 image 폴더
  create : ex3/public/stylesheets                 // app에서 필요한 css 폴더
  create : ex3/public/stylesheets/style.css  // 기본 css
  create : ex3/routes
  create : ex3/routes/index.js                // 라우트를 위한 js (어떤 view를 보여줄지를 결정한다)
  create : ex3/views
  create : ex3/views/layout.jade                 // 기본 레이아웃(jade 들의 공통 head)
  create : ex3/views/index.jade                  // layout을 바탕으로 body 부분
  dont forget to install dependencies:    $ cd ex3 && npm install
                                           // 디펜던시한 모듈(jade 등)들을 또 설치하라는 안내 문구

기본 상태에서 실행했을때 동작 흐름은 다음과 같습니다.

클라이언트에서 요청이 오면 app.js 에서 app.get(‘/’, routes.index); 으로 인해 어떤 페이지가 보여질지 결정되며 routes.index는 routes 폴더에 index.js 파일에 exports.index로 선언된 내용을 보여주겠다는 것으로 만약 app.get(‘/’, routes.test); 라고 선언해면 routes에 test.js 파일을 찾는 것이 아니라 당연히 index.js 에 exports.test로 선언된 내용을 찾게 됩니다.

그리고 exports.index에 res.render(‘index’, { title: ‘Express’ }) 에서 index 는 /views 폴더 안에 index.jade를 렌더링 해서 response로 전달하며 { title: ‘Express’ } 은 index.jade 에 전달되어 Welcome to #{title}를 Welcome to Express로 화면에 보여줍니다.

이때 기본적으로 layout.jade가 head가 되고 index.jade가 body가 됩니다.

인제 app.js 를 다음과 같이 수정합니다.


//rabbitmq 서버에 접속(호스트, 포트, user, password, vhost 등을 지정할수 있습니다.)
var connec = amqp.createConnection({ host: 'localhost'
                                       , port: 5673
                                       , login: 'guest'
                                       //, password: 'quest'
                                       , vhost: '/'
                                         });
var io = require('socket.io');
var socketioserver = io.listen(app);
var _subscriptions = {};  //접속한 queue와 consumerTag를 저장하여 클라이언트 접속이 끊겼을때 unsubscribe 할수 있도록 합니다.

//socket.io 클라이언트가 접속할때 http://host/pub으로 연결(connection) 이벤트 발생시
socketioserver.of("/pub").on('connection', function(connection) {    
	sys.p("pub on connection");
          //rabbitmq의 exchage를 생성합니다. exchageName = node-conn-share1, exchangeType = topic
	var exchange1 = connec.exchange('node-conn-share1', {type: 'topic'});
	connection.on('disconnect', function(){ //socket.io 클라이언트의 접속이 끊어졌을때 할일을 정의
		sys.p("disconnect");
	});
          // 클라이언트에서 send()를 했을때 할일을 정의
	connection.on('message', function(msg){
                    //클라이언트에서 json.send()로 json형태로 보냅니다.
		sys.p("message : " + msg.message + ", routingKey : " + msg.routingKey );
                   //exchage에 클라이언트에서 보낸 routingKey와 message를 세팅해서 publish 합니다.
		exchange1.publish(msg.routingKey, msg.message, {contentType: 'text/plain'});
	});
});

socketioserver.of("/sub").on('connection', function(connection) {
	sys.p("sub on connection");
	var exchange1 = connec.exchange('node-conn-share1', {type: 'topic'});

	connection.on('bindQueue', function(opt){
		sys.p('bindQueue routingKey : ' + opt.routingKey);
		//queueName은 rabbitmq server가 자동으로 생성, durable=false, autoDelete=false
		var q1 = connec.queue('', { durable: false, autoDelete: false }, function(){
			//queue와 exchange를 바인딩하며 이때 exchange에서 routingKey에 해당하는 내용만 받도록 함
			q1.bind(exchange1, opt.routingKey);
            //바인딩이 완료가 되었을때 실행
			q1.on('queueBindOk', function() {
				sys.p("Queue " + q1.name + " is open");
                //queue에서 메세지를 받아 전송하도록 하며 1개씩 받아와 성공시 ack를 보내는 방식으로 한다.
				q1.subscribe({ ack: true, prefetchCount: 1},
					function (m, headers, deliveryInfo) {
						sys.p("subscribe : " + m.data.toString());
						sys.p(deliveryInfo.deliveryTag + " : "+ deliveryInfo.routingKey + " : " + m.contentType);
                        //routingKey와 msg를 json으로 묶어서 보낸다.
						var msg = {routingKey: deliveryInfo.routingKey, msg: m.data.toString()};
						connection.emit('data', msg); //클라이언트에 보냄
						q1.shift() //다음 메시지 받아오기
				}).addCallback(function(ok) {
					sys.p("subscribe callback ok : " + ok.consumerTag);
					//connection.sessionId에서 만든 queue와 consumerTag를 기억
					_subscriptions[connection.id] = {'queue': q1, 'consumerTag': ok.consumerTag};
				});
			});
		});
	});
	//클라이언트 연결이 끊겼을때
	connection.on('disconnect', function(){
		sys.p("disconnect");
		var id = connection.id;
		var sub = _subscriptions[id];
		//_subscriptions에서 지우고
		delete _subscriptions[id];
		//해당 queue에 consumerTag에 해당하는 consumer가 더이상 soubscribe 하지 않도록 명시적으로 선언
		sub.queue.unsubscribe(sub.consumerTag); 
		sub.queue.destroy(); //해당 queue를 삭제한다.
		sys.p("unsubscribe " + id + ", " + sub.queue.name + ", " + sub.consumerTag);
	});
});

app.get('/sub', routes.sub);
app.get('/pub', routes.pub);


pub/sub 클라이언트를 만들어서 위와 같이 테스트 하였습니다.
pub 에서 메시지와 routingKey를 보내고 sub에서는 자신들이 받을 routingKey를 지정합니다.
그럼 위에 그림과 같이 실시간 메시지 전송이 가능하며 응용하면 그룹채팅을 하면서 그룹채팅 사용자간에 귓속말 기능도 가능할것 같습니다.

No comments yet

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Google+ photo

Google+의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

%s에 연결하는 중

%d 블로거가 이것을 좋아합니다: