Building mobile apps with Sencha Touch and Phonegap

It’s very likely that you’ve heard of node.js, the event-based JavaScript powered server – and to go hand in hand with it, SocketIO: the cross-browser websockets implementation that plugs right into it.

If not, I’d recommend you go look that up, then come back here so we can talk…

Ready? Good.

What makes SocketIO so appealing is its client-side implementation: it is completely cross-browser compatible and makes it simple to read from and write directly to web clients from your server. That lets you build the real-time app you’ve always wanted to make without implementing something weird like COMET, Twisted, Orbited or STOMP.

The only drawback is that in order to use SocketIO you need to use Node.js, which means you have to use a JS stack server-side… and that just doesn’t feel right.

Lucky for us, there’s a Python event-based server very similar to node.js, Tornado, that can be used instead. Even better, it has a SocketIO plugin, TornadIO, that speaks the same language as the SocketIO client-side library. So now you can implement SocketIO with a Python stack.

With this server implementation you can develop a SocketIO based site that runs entirely in Python, which is great.

Unfortunately, if you’re trying to add real-time features to your existing site, the last thing you want is to re-write everything in another stack (that would just be silly) or be forced to use the Tornado server to build your whole app. Ideally you want to be able to just bolt it on.

That’s where RabbitMQ comes in. RabbitMQ is a message broker that implements AMQP: you can set it up to pipe information throughout your application and transport messages, commands or data to different clients and functions in your app. Here RabbitMQ serves as a backbone: it lets you pipe events out of your traditional website implementation, holds onto those messages and pipes them into your Tornado server.

Python’s AMQP client is called Pika, so if, like me, you are using Django, you can bridge the two using Django’s signals.
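To make the Django bridge a little more concrete, here is a rough sketch of a receiver with the same signature Django passes to post_save handlers. Everything named here is an assumption for illustration: the `publish` callable stands in for whatever Pika publisher you use, `Invitation` and its `recipient_id` field are hypothetical, and the `user_<id>` queue naming is just one possible scheme.

```python
import json


def make_notification_receiver(publish):
    """Return a post_save-style receiver that forwards new objects to a queue.

    `publish(queue_name, body)` is a hypothetical callable; in a real app it
    would wrap a Pika publisher.
    """
    def on_saved(sender, instance, created, **kwargs):
        # Only announce newly created objects, not every later edit.
        if not created:
            return
        body = json.dumps({"text": "New %s for you" % sender.__name__.lower()})
        publish("user_%d" % instance.recipient_id, body)
    return on_saved


# Stand-in model so the sketch runs without Django installed.
class Invitation(object):
    def __init__(self, recipient_id):
        self.recipient_id = recipient_id


sent = []
receiver = make_notification_receiver(lambda queue, body: sent.append((queue, body)))
receiver(sender=Invitation, instance=Invitation(42), created=True)
receiver(sender=Invitation, instance=Invitation(42), created=False)  # ignored
```

In a Django project you would hook this up with something like `post_save.connect(receiver, sender=Invitation)` rather than calling it by hand.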

In order to get SocketIO on your front-end to talk to TornadIO and in turn receive messages from your webapp, you need to pipe those messages via RabbitMQ into a TornadIO listener that uses Pika to receive them.

The traditional example for one of these apps would be a chat server, but that could easily be done with Tornado alone and would have few tie-ins with your webapp. So for this how-to we’ll use the example of notifying your users when something happens that is specific to their UserID (like an email, or a one-on-one chat invitation).
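As a sketch of what such a per-user notification could look like on the wire, the snippet below builds a queue key from a user ID and a JSON body with a `text` field, which is the field the front-end code in this post reads. The `user_<id>` naming and the extra `kind` field are assumptions for illustration, not part of any library.

```python
import json
import re

# Keys end up in a URL path and as a queue name, so keep them to word characters.
KEY_PATTERN = re.compile(r"^\w+$")


def queue_key_for(user_id):
    """Build the per-user key (hypothetical naming scheme: 'user_<id>')."""
    key = "user_%d" % int(user_id)
    if not KEY_PATTERN.match(key):
        raise ValueError("unsafe queue key: %r" % key)
    return key


def build_notification(text, kind="notice"):
    """Serialise a notification; the page only needs the 'text' field."""
    return json.dumps({"text": text, "kind": kind})


key = queue_key_for(42)
body = build_notification("You have a new chat invitation", kind="chat_invite")
```

Keeping the key restricted to word characters means the same string can be used as the SocketIO resource suffix and as the RabbitMQ queue name.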

Setting up your front end:

To get things going, let’s set up your front end. Here all we need to do is link to the SocketIO library on the SocketIO CDN and implement a few functions when the page loads that will initiate your connection to your TornadIO server.

<!-- Include the SocketIO script -->
<script type="text/javascript" src="http://cdn.socket.io/stable/socket.io.js"></script>
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.0/jquery.min.js"></script>

<!-- This library is a little better at handling malformed JSON -->
<script type="text/javascript" src="{{ STATIC_URL }}js/json2.js"></script>

<script type="text/javascript">// <![CDATA[
// For backward compatibility you need a flash transport layer
WEB_SOCKET_SWF_LOCATION = 'http://cdn.socket.io/stable/WebSocketMain.swf';

// This will set up our listener
function setupConnection(key) {
   var liveServer = 'localhost';
   var s = new io.Socket(liveServer,
                           {
                            port: 8081,
                            rememberTransport: false,
                            resource: 'messaging/'+key
                            // 'messaging' is a channel resource and 'key' can be anything
                           });
   s.connect();

   // Do this stuff when the connection is ready, we just append to a UL
   s.addEvent('connect', function(data) {
       $('#statuses').append('<li>--> Connected!</li>');
   });

   // Do stuff when you receive something from the server
   // We assume it's JSON data here, this isn't necessarily pretty!
   s.addEvent('message', function(data) {
       try {
           var obj = JSON.parse(data);
           $('#statuses').prepend('<li>' + obj.text + '</li>');
       }
       catch(err) {
           $('#statuses').prepend('<li>FAIL!</li>');
       }
   });
}
// ---- END CONNECTOR CODE ----

$(document).ready(function() {

   $('#start-connection').click(function() {
       $('#statuses').append('<li>--> Starting connection</li>');
       setupConnection('channelname');
   });
});
// ]]></script>

The code above is slightly adapted boilerplate from the SocketIO documentation. First, we set up a connection to the SocketIO server, connect to the server and finally set up some event handlers to fire when things happen. In our case those handlers simply populate an un-ordered list so we can capture the messages on screen.

Tornado and TornadIO

First of all you will need to get your hands on Tornado, TornadIO and Pika – all of which can be installed via easy_install, so I won’t go into it here.

To get the server side part working, we need to set up Tornado with a TornadIO implementation, and have TornadIO listen to RabbitMQ to bridge the queue down to the browser-level elements.

The code we will be working with is this:

import os.path as op

import tornado.web
import tornadio
import tornadio.router
import tornadio.server
from logbook import Logger, StderrHandler

# Assumed setup (not shown in the original listing): logger, handler and ROOT
log = Logger('socket_server')
handler = StderrHandler()
ROOT = op.normpath(op.dirname(__file__))

# Instantiate the TornadIO router (MyConnection must be defined before this line)
MyRouter = tornadio.get_router(MyConnection, resource='messaging', extra_re=r'\w+', extra_sep='/')

# When the script starts, start the event loop!
if __name__ == "__main__":
    # Port definitions for SocketIO, make sure these match your front end
    sio_port = 8081
    fpp = 844

    # Configure the webapp
    application = tornado.web.Application(
        # Declare your routes here, you can also display HTML files in this list
        [MyRouter.route()],
        # We support everything in this case
        enabled_protocols=['websocket',
                           'flashsocket',
                           'xhr-multipart',
                           'htmlfile',
                           'xhr-polling'],
        flash_policy_port=fpp,
        # Set up a ROOT variable that points to where the Flashpolicy XML is
        # You can find this in the TornadIO git
        flash_policy_file=op.join(ROOT, 'flashpolicy.xml'),
        socket_io_port=sio_port
    )

    # We're using logbook for logging all errors here, you don't have to
    with handler.threadbound():
        log.info("Starting SocketIO Server...")
        tornadio.server.SocketServer(application)

The code above, when put into a file called socket_server.py, will start your SocketIO server. One interesting part to note is that MyConnection hasn’t been declared yet, so running this will fail at the moment. MyConnection is a class that will manage the various events that happen to the TornadIO connection.
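To see what the `resource` and `extra_re` arguments are for, here is a small stand-alone sketch of how a URL such as `/messaging/channelname/...` could be split into the resource and the per-connection key. The pattern below is my own illustration of the idea and is not TornadIO's actual route; `build_route_pattern` is a hypothetical helper. Note that the extra part needs the regex `\w+`, with the backslash; a bare `w+` would only match the letter w.

```python
import re


def build_route_pattern(resource, extra_re, extra_sep="/"):
    """Illustrative route: /<resource><sep><extra>[/anything-else]."""
    return re.compile(
        r"^/?(?P<resource>%s)%s(?P<extra>%s)(?:/.*)?$"
        % (re.escape(resource), re.escape(extra_sep), extra_re)
    )


pattern = build_route_pattern("messaging", r"\w+")
match = pattern.match("/messaging/channelname/websocket")
extra = match.group("extra")

# Without the backslash the key 'channelname' can never match.
broken = build_route_pattern("messaging", "w+").match("/messaging/channelname")
```

The captured `extra` value is what later shows up in the connection's `on_open` as `kwargs['extra']`, and it is what this post uses as the queue name.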

The TornadIO setup is pretty straightforward and is actually very well documented at the TornadIO git repository. All the real action happens in MyConnection, which is documented below:

class MyConnection(tornadio.SocketConnection):
    """TornadIO Socket Connection"""
    def __init__(self, protocol, io_loop, heartbeat_interval):
        # We override init so we can apply data to the connection
        self.this_id = None
        self.running_thread = None
        tornadio.SocketConnection.__init__(self, protocol, io_loop, heartbeat_interval)

    def on_open(self, *args, **kwargs):
        # When the connection opens, get the data passed through the connection
        # via the router - this way each connection can be unique.

        self.this_id = str(kwargs['extra'])

        # Our implementation of RabbitMQ is threaded, this is because
        # in order to pipe data from the server down, we need an open line to the
        # RabbitMQ server.

        a = thisRabbit(self)
        self.running_thread = a
        a.start()

    def on_message(self, message):
        # Unused as this is a server->client bridge, you could
        # make the connection two-way and talk back via the client into another queue
        pass

    def on_close(self):
        # This is important, when the connection closes, kill the RabbitMQ listener.
        if self.running_thread:
            self.running_thread.kill_me()

This class is instantiated whenever a SocketIO connection is made. In the traditional chat server implementation, each connection is stored in a list on the class so that clients can talk to each other. We can’t do that here: we are running a server-to-client bridge rather than inter-client communication, and each connection needs to keep its own line open to RabbitMQ.

It is because of this bridging aspect that we instantiate our RabbitMQ listener as a thread and store it in a variable (this should really be in a list), so that we can kill it when the connection drops and save some memory on your server.
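If you did want to keep the listeners in one place rather than in a per-connection variable, a small registry could look something like the sketch below. It uses only the standard library; `ListenerRegistry` and `fake_listener` are hypothetical names, and the fake listener simply waits on an event where the real one would run the RabbitMQ consumer loop.

```python
import threading


class ListenerRegistry(object):
    """Track one stoppable worker thread per connection id."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = {}

    def start(self, conn_id, target):
        stop_event = threading.Event()
        thread = threading.Thread(target=target, args=(stop_event,))
        thread.daemon = True
        with self._lock:
            self._listeners[conn_id] = (thread, stop_event)
        thread.start()

    def stop(self, conn_id, timeout=2.0):
        """Signal the listener to stop; return True once its thread has ended."""
        with self._lock:
            entry = self._listeners.pop(conn_id, None)
        if entry is None:
            return False
        thread, stop_event = entry
        stop_event.set()
        thread.join(timeout)
        return not thread.is_alive()

    def active(self):
        with self._lock:
            return sorted(self._listeners)


def fake_listener(stop_event):
    # Stand-in for the RabbitMQ consumer loop: idle until told to stop.
    stop_event.wait()


registry = ListenerRegistry()
registry.start("alice", fake_listener)
registry.start("bob", fake_listener)
stopped = registry.stop("alice")
remaining = registry.active()
registry.stop("bob")
```

The connection's close handler would then call `registry.stop(key)` instead of holding its own reference to the thread.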

Setting up your RabbitMQ listener

This part of the code defines the thisRabbit() class. This class will listen to your RabbitMQ queue and keep the connection open, piping data back down to the client via TornadIO when needed:

import threading
import time

import pika

# Assumed setting (not shown in the original listing): where RabbitMQ lives
RABBIT_SERVER = 'localhost'


class thisRabbit(threading.Thread):
    def __init__(self, t_client):
        threading.Thread.__init__(self)
        self.t_client = t_client
        self.key = t_client.this_id
        self.connection = None
        self.last_active = time.time()
        self.started = time.time()
        # Not named _stop: that name clashes with Thread internals on Python 3
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def on_connected(self, connection):
        print("Connected...")
        self.last_active = time.time()
        connection.channel(self.on_channel_open)
        self.connection = connection

    def on_channel_open(self, new_channel):
        print("Channel open")
        self.last_active = time.time()
        self.channel = new_channel
        #try:
        #    self.channel.queue_delete(queue=self.key)
        #except:
        #    print("Couldn't delete queue")

        print("Declaring queue")
        self.channel.queue_declare(queue=self.key, durable=True, exclusive=False,
                                   auto_delete=False, callback=self.on_queue_declared)
        print("Queue declare done")

    def on_queue_declared(self, frame):
        self.last_active = time.time()
        print("Trying to listen...")
        self.channel.basic_consume(self.handle_delivery, queue=self.key)
        print("Listening to: %s" % self.key)
        print(' [*] Waiting for messages. To exit press CTRL+C')

    def kill_me(self, clear=False):
        print("Killing")
        print("Active threads: %d" % threading.active_count())
        if clear:
            self.channel.queue_delete(queue=self.key)

        self.connection.close()
        self.stop()

    def handle_delivery(self, channel, method, header, body):
        # Pika passes the method frame before the header frame
        self.last_active = time.time()
        print(" [x] Received %r" % (body,))
        if body == "die":
            self.kill_me()
        else:
            self.t_client.send(body)

    def startCall(self):
        self.last_active = time.time()
        connection = pika.SelectConnection(pika.ConnectionParameters(
            host=RABBIT_SERVER), self.on_connected)

        print("Starting connection")
        connection.ioloop.start()

    def run(self):
        print("Running new thread: %s" % self.name)
        self.startCall()

This is a lot of boring code, but to understand it, think of the methods as a waterfall of callbacks, each one fired as the previous event completes. So: open the connection -> on_connected opens a channel -> on_channel_open declares the queue -> on_queue_declared starts consuming -> handle_delivery runs for every message that arrives.
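The waterfall is easier to see with the network taken away. The sketch below replaces Pika with two tiny fakes (`FakeConnection` and `FakeChannel`, both hypothetical stand-ins that call their callbacks immediately) and records the order in which things happen. The listener's method names mirror the ones used in this post.

```python
class FakeChannel(object):
    """Stand-in for a Pika channel that fires callbacks straight away."""

    def __init__(self, log):
        self.log = log

    def queue_declare(self, queue, callback):
        self.log.append("queue_declare:%s" % queue)
        callback(None)  # Pika would pass a frame here

    def basic_consume(self, consumer_callback, queue):
        self.log.append("basic_consume:%s" % queue)


class FakeConnection(object):
    """Stand-in for a Pika connection."""

    def __init__(self, on_open, log):
        self._on_open = on_open
        self.log = log

    def channel(self, on_open_callback):
        self.log.append("channel")
        on_open_callback(FakeChannel(self.log))

    def start(self):
        self.log.append("connect")
        self._on_open(self)


class Listener(object):
    def __init__(self, key):
        self.key = key
        self.channel = None

    def on_connected(self, connection):
        connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        self.channel = channel
        channel.queue_declare(queue=self.key, callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_consume(self.handle_delivery, queue=self.key)

    def handle_delivery(self, channel, method, header, body):
        pass


events = []
listener = Listener("channelname")
FakeConnection(listener.on_connected, events).start()
```

After this runs, `events` holds the four steps in order: connect, channel, queue declare, consume.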

The one to look at here is handle_delivery(), which does something with each message received from the queue. In our case we check whether the message is a command to kill the connection; otherwise we use the t_client property (the TornadIO connection class) to send it down the wire to the client.

As the thread is tied to the connection, this is a server -> individual client call. Strictly speaking, you could instead have a single queue carry the messages for every client and mediate them here with your own protocol in the queue messages, although a queue for each client is more efficient if per-client delivery is really what you want to set up.
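For the single shared queue variant, the "arbitrary protocol" could be as simple as a JSON envelope that names its recipient. The sketch below is one possible shape, under my own assumptions: the `to` and `payload` field names and the `SharedQueueRouter` class are hypothetical, and each registered `send` callable stands in for a TornadIO connection's send method.

```python
import json


class SharedQueueRouter(object):
    """Route envelopes from one shared queue to the right connected client."""

    def __init__(self):
        self._clients = {}

    def register(self, key, send):
        self._clients[key] = send

    def unregister(self, key):
        self._clients.pop(key, None)

    def handle_delivery(self, body):
        """Return True if the message reached a connected client."""
        try:
            envelope = json.loads(body)
            key = envelope["to"]
            payload = envelope["payload"]
        except (ValueError, KeyError, TypeError):
            return False  # not a well-formed envelope
        send = self._clients.get(key)
        if send is None:
            return False  # recipient is not connected right now
        send(json.dumps(payload))
        return True


alice_inbox, bob_inbox = [], []
router = SharedQueueRouter()
router.register("alice", alice_inbox.append)
router.register("bob", bob_inbox.append)

delivered = router.handle_delivery(json.dumps({"to": "bob", "payload": {"text": "Hi Bob"}}))
unknown = router.handle_delivery(json.dumps({"to": "carol", "payload": {"text": "Hi"}}))
malformed = router.handle_delivery("not json")
```

The trade-off is that a message for a client who is not connected has to be dropped or requeued by hand, whereas a queue per client lets RabbitMQ hold it for you.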

Sending a Signal

Sending a message to RabbitMQ is quite simple; the class below should make it easy to send a message:

import pika

# Assumed setting (not shown in the original listing): where RabbitMQ lives
RABBIT_SERVER = 'localhost'


class RabbitProducer:
    def __init__(self, key):
        self.key = key
        self.connection = None
        self.channel = None
        self.message = None

    # Called when our connection to RabbitMQ is closed
    def on_closed(self, frame):
        # connection.ioloop is blocking, this will stop and exit the app
        self.connection.ioloop.stop()

    # Called when we have connected to RabbitMQ
    def on_connected(self, connection):
        # Create a channel on our connection passing the on_channel_open callback
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    # Called when our channel is open
    def on_channel_open(self, channel_):
        # Our usable channel has been passed to us, assign it for future use
        self.channel = channel_
        # Declare a queue
        self.channel.queue_declare(queue=self.key, durable=True,
                                   exclusive=False, auto_delete=False,
                                   callback=self.on_queue_declared)

    # Called when our queue is declared.
    def on_queue_declared(self, frame):
        # Add a callback so we can stop the ioloop
        self.connection.add_on_close_callback(self.on_closed)
        self.send_message()

    def startCall(self):
        # Create our connection parameters and connect to RabbitMQ
        parameters = pika.ConnectionParameters(RABBIT_SERVER)
        connection = pika.SelectConnection(parameters, self.on_connected)

        # Start our IO/Event loop
        connection.ioloop.start()

    def send_message(self):
        # Send a message
        self.channel.basic_publish(exchange='',
                                   routing_key=self.key,
                                   body=self.message,
                                   properties=pika.BasicProperties(
                                       content_type="text/plain",
                                       delivery_mode=1))

        # Close our connection
        self.connection.close()


new_rabbit = RabbitProducer('channelname')
new_rabbit.message = "Hi there!"
new_rabbit.startCall()

Testing the implementation

When you use the RabbitProducer, you open a connection, pass a message to the queue and quit. There are better ways of doing this and it is a little quick and dirty, but for one-off pulses it should suffice.
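One of those better ways is to keep a channel open and reuse it for every message. The sketch below shows the idea with the channel passed in from outside, so it only relies on the channel having `queue_declare` and `basic_publish` methods. `NotificationPublisher` is a hypothetical name, and `RecordingChannel` is a fake used here so the example runs without a broker.

```python
import json


class NotificationPublisher(object):
    """Publish many messages over one already-open channel."""

    def __init__(self, channel):
        self.channel = channel
        self._declared = set()

    def publish(self, key, text):
        # Declare each queue only the first time we see its key.
        if key not in self._declared:
            self.channel.queue_declare(queue=key, durable=True,
                                       exclusive=False, auto_delete=False)
            self._declared.add(key)
        self.channel.basic_publish(exchange="", routing_key=key,
                                   body=json.dumps({"text": text}))


class RecordingChannel(object):
    """Fake channel that records calls instead of talking to RabbitMQ."""

    def __init__(self):
        self.declared = []
        self.published = []

    def queue_declare(self, queue, durable, exclusive, auto_delete):
        self.declared.append(queue)

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((routing_key, body))


channel = RecordingChannel()
publisher = NotificationPublisher(channel)
publisher.publish("channelname", "Hi there!")
publisher.publish("channelname", "Hi again!")
```

In a real app the channel would come from Pika, for example from a blocking connection opened once at startup; that part is left out here because it depends on how your web process is run.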

Some thoughts

What makes this so interesting is that RabbitMQ enables you to build streams of messages that can be scaled and attached to whatever data consumer you need, be that another Tornado service or messages streamed directly to your users. This implementation offers a scalable and robust way to manage data streams without having to use NodeJS alongside SocketIO.

This implementation also lets you decouple your real-time data from the rest of your site. By using RabbitMQ as a mediator, you can connect as many services together as needed and make them event based or publishers into the queue, without having to change platform or be tied to one (RabbitMQ has a wide set of client libraries).

Good luck!

M.