Notes/PythonAMQP: kombu-mongodb-example.py

File kombu-mongodb-example.py, 7.4 KB (added by billb, 10 years ago)

Example of Kombu using MongoDB as Transport

Line 
1
2"""
3kombu.transport.mongodb
4=======================
5
6MongoDB transport.
7
8:copyright: (c) 2010 - 2012 by Flavio Percoco Premoli.
9:license: BSD, see LICENSE for more details.
10
11From: http://pydoc.net/Python/kombu/2.5.4/kombu.transport.mongodb/
12
13"""
14from __future__ import absolute_import
15
16from Queue import Empty
17
18import pymongo
19
20from pymongo import errors
21from anyjson import loads, dumps
22from pymongo.connection import Connection
23
24from kombu.exceptions import StdConnectionError, StdChannelError
25
26from . import virtual
27
28DEFAULT_HOST = '127.0.0.1'
29DEFAULT_PORT = 27017
30
31__author__ = """\
32Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>;\
33Scott Lyons <scottalyons@gmail.com>;\
34"""
35
36
37class Channel(virtual.Channel):
38    _client = None
39    supports_fanout = True
40    _fanout_queues = {}
41
42    def __init__(self, *vargs, **kwargs):
43        super_ = super(Channel, self)
44        super_.__init__(*vargs, **kwargs)
45
46        self._queue_cursors = {}
47        self._queue_readcounts = {}
48
49    def _new_queue(self, queue, **kwargs):
50        pass
51
52    def _get(self, queue):
53        try:
54            if queue in self._fanout_queues:
55                msg = self._queue_cursors[queue].next()
56                self._queue_readcounts[queue] += 1
57                return loads(msg['payload'])
58            else:
59                msg = self.client.command('findandmodify', 'messages',
60                    query={'queue': queue},
61                    sort={'_id': pymongo.ASCENDING}, remove=True)
62        except errors.OperationFailure, exc:
63            if 'No matching object found' in exc.args[0]:
64                raise Empty()
65            raise
66        except StopIteration:
67            raise Empty()
68
69        # as of mongo 2.0 empty results won't raise an error
70        if msg['value'] is None:
71            raise Empty()
72        return loads(msg['value']['payload'])
73
74    def _size(self, queue):
75        if queue in self._fanout_queues:
76            return (self._queue_cursors[queue].count() -
77                    self._queue_readcounts[queue])
78
79        return self.client.messages.find({'queue': queue}).count()
80
81    def _put(self, queue, message, **kwargs):
82        self.client.messages.insert({'payload': dumps(message),
83                                     'queue': queue})
84
85    def _purge(self, queue):
86        size = self._size(queue)
87        if queue in self._fanout_queues:
88            cursor = self._queue_cursors[queue]
89            cursor.rewind()
90            self._queue_cursors[queue] = cursor.skip(cursor.count())
91        else:
92            self.client.messages.remove({'queue': queue})
93        return size
94
95    def close(self):
96        super(Channel, self).close()
97        if self._client:
98            self._client.connection.end_request()
99
100    def _open(self):
101        """
102        See mongodb uri documentation:
103        http://www.mongodb.org/display/DOCS/Connections
104        """
105        conninfo = self.connection.client
106
107        dbname = None
108        hostname = None
109
110        if not conninfo.hostname:
111            conninfo.hostname = DEFAULT_HOST
112
113        for part in conninfo.hostname.split('/'):
114            if not hostname:
115                hostname = 'mongodb://' + part
116                continue
117
118            dbname = part
119            if '?' in part:
120                # In case someone is passing options
121                # to the mongodb connection. Right now
122                # it is not permitted by kombu
123                dbname, options = part.split('?')
124                hostname += '/?' + options
125
126        hostname = "%s/%s" % (hostname, dbname in [None, "/"] and "admin" \
127                                                                    or dbname)
128        if not dbname or dbname == "/":
129            dbname = "kombu_default"
130
131        # At this point we expect the hostname to be something like
132        # (considering replica set form too):
133        #
134        #   mongodb://[username:password@]host1[:port1][,host2[:port2],
135        #   ...[,hostN[:portN]]][/[?options]]
136        mongoconn = Connection(host=hostname)
137        version = mongoconn.server_info()['version']
138        if tuple(map(int, version.split('.')[:2])) < (1, 3):
139            raise NotImplementedError(
140                'Kombu requires MongoDB version 1.3+, but connected to %s' % (
141                    version, ))
142
143        database = getattr(mongoconn, dbname)
144
145        # This is done by the connection uri
146        # if conninfo.userid:
147        #     database.authenticate(conninfo.userid, conninfo.password)
148        self.db = database
149        col = database.messages
150        col.ensure_index([('queue', 1), ('_id', 1)], background=True)
151
152        if 'messages.broadcast' not in database.collection_names():
153            capsize = conninfo.transport_options.get(
154                            'capped_queue_size') or 100000
155            database.create_collection('messages.broadcast', size=capsize,
156                                                             capped=True)
157
158        self.bcast = getattr(database, 'messages.broadcast')
159        self.bcast.ensure_index([('queue', 1)])
160
161        self.routing = getattr(database, 'messages.routing')
162        self.routing.ensure_index([('queue', 1), ('exchange', 1)])
163        return database
164
165    #TODO: Store a more complete exchange metatable in the routing collection
166    def get_table(self, exchange):
167        """Get table of bindings for ``exchange``."""
168        localRoutes = frozenset(self.state.exchanges[exchange]['table'])
169        brokerRoutes = self.client.messages.routing.find({
170                            'exchange': exchange})
171
172        return localRoutes | frozenset((r['routing_key'],
173                                        r['pattern'],
174                                        r['queue']) for r in brokerRoutes)
175
176    def _put_fanout(self, exchange, message, **kwargs):
177        """Deliver fanout message."""
178        self.client.messages.broadcast.insert({'payload': dumps(message),
179                                               'queue': exchange})
180
181    def _queue_bind(self, exchange, routing_key, pattern, queue):
182        if self.typeof(exchange).type == 'fanout':
183            cursor = self.bcast.find(query={'queue': exchange},
184                                     sort=[('$natural', 1)], tailable=True)
185            # Fast forward the cursor past old events
186            self._queue_cursors[queue] = cursor.skip(cursor.count())
187            self._queue_readcounts[queue] = cursor.count()
188            self._fanout_queues[queue] = exchange
189
190        meta = {'exchange': exchange,
191                'queue': queue,
192                'routing_key': routing_key,
193                'pattern': pattern}
194        self.client.messages.routing.update(meta, meta, upsert=True)
195
196    def queue_delete(self, queue, **kwargs):
197        self.routing.remove({'queue': queue})
198        super(Channel, self).queue_delete(queue, **kwargs)
199        if queue in self._fanout_queues:
200            self._queue_cursors[queue].close()
201            self._queue_cursors.pop(queue, None)
202            self._fanout_queues.pop(queue, None)
203
204    @property
205    def client(self):
206        if self._client is None:
207            self._client = self._open()
208        return self._client
209
210
211class Transport(virtual.Transport):
212    Channel = Channel
213
214    polling_interval = 1
215    default_port = DEFAULT_PORT
216    connection_errors = (StdConnectionError, errors.ConnectionFailure)
217    channel_errors = (StdChannelError,
218                      errors.ConnectionFailure,
219                      errors.OperationFailure)
220    driver_type = 'mongodb'
221    driver_name = 'pymongo'
222
223    def driver_version(self):
224        return pymongo.version