| 1 | |
|---|
| 2 | """ |
|---|
| 3 | kombu.transport.mongodb |
|---|
| 4 | ======================= |
|---|
| 5 | |
|---|
| 6 | MongoDB transport. |
|---|
| 7 | |
|---|
| 8 | :copyright: (c) 2010 - 2012 by Flavio Percoco Premoli. |
|---|
| 9 | :license: BSD, see LICENSE for more details. |
|---|
| 10 | |
|---|
| 11 | From: http://pydoc.net/Python/kombu/2.5.4/kombu.transport.mongodb/ |
|---|
| 12 | |
|---|
| 13 | """ |
|---|
| 14 | from __future__ import absolute_import |
|---|
| 15 | |
|---|
| 16 | from Queue import Empty |
|---|
| 17 | |
|---|
| 18 | import pymongo |
|---|
| 19 | |
|---|
| 20 | from pymongo import errors |
|---|
| 21 | from anyjson import loads, dumps |
|---|
| 22 | from pymongo.connection import Connection |
|---|
| 23 | |
|---|
| 24 | from kombu.exceptions import StdConnectionError, StdChannelError |
|---|
| 25 | |
|---|
| 26 | from . import virtual |
|---|
| 27 | |
|---|
| 28 | DEFAULT_HOST = '127.0.0.1' |
|---|
| 29 | DEFAULT_PORT = 27017 |
|---|
| 30 | |
|---|
| 31 | __author__ = """\ |
|---|
| 32 | Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>;\ |
|---|
| 33 | Scott Lyons <scottalyons@gmail.com>;\ |
|---|
| 34 | """ |
|---|
| 35 | |
|---|
| 36 | |
|---|
| 37 | class Channel(virtual.Channel): |
|---|
| 38 | _client = None |
|---|
| 39 | supports_fanout = True |
|---|
| 40 | _fanout_queues = {} |
|---|
| 41 | |
|---|
| 42 | def __init__(self, *vargs, **kwargs): |
|---|
| 43 | super_ = super(Channel, self) |
|---|
| 44 | super_.__init__(*vargs, **kwargs) |
|---|
| 45 | |
|---|
| 46 | self._queue_cursors = {} |
|---|
| 47 | self._queue_readcounts = {} |
|---|
| 48 | |
|---|
| 49 | def _new_queue(self, queue, **kwargs): |
|---|
| 50 | pass |
|---|
| 51 | |
|---|
| 52 | def _get(self, queue): |
|---|
| 53 | try: |
|---|
| 54 | if queue in self._fanout_queues: |
|---|
| 55 | msg = self._queue_cursors[queue].next() |
|---|
| 56 | self._queue_readcounts[queue] += 1 |
|---|
| 57 | return loads(msg['payload']) |
|---|
| 58 | else: |
|---|
| 59 | msg = self.client.command('findandmodify', 'messages', |
|---|
| 60 | query={'queue': queue}, |
|---|
| 61 | sort={'_id': pymongo.ASCENDING}, remove=True) |
|---|
| 62 | except errors.OperationFailure, exc: |
|---|
| 63 | if 'No matching object found' in exc.args[0]: |
|---|
| 64 | raise Empty() |
|---|
| 65 | raise |
|---|
| 66 | except StopIteration: |
|---|
| 67 | raise Empty() |
|---|
| 68 | |
|---|
| 69 | # as of mongo 2.0 empty results won't raise an error |
|---|
| 70 | if msg['value'] is None: |
|---|
| 71 | raise Empty() |
|---|
| 72 | return loads(msg['value']['payload']) |
|---|
| 73 | |
|---|
| 74 | def _size(self, queue): |
|---|
| 75 | if queue in self._fanout_queues: |
|---|
| 76 | return (self._queue_cursors[queue].count() - |
|---|
| 77 | self._queue_readcounts[queue]) |
|---|
| 78 | |
|---|
| 79 | return self.client.messages.find({'queue': queue}).count() |
|---|
| 80 | |
|---|
| 81 | def _put(self, queue, message, **kwargs): |
|---|
| 82 | self.client.messages.insert({'payload': dumps(message), |
|---|
| 83 | 'queue': queue}) |
|---|
| 84 | |
|---|
| 85 | def _purge(self, queue): |
|---|
| 86 | size = self._size(queue) |
|---|
| 87 | if queue in self._fanout_queues: |
|---|
| 88 | cursor = self._queue_cursors[queue] |
|---|
| 89 | cursor.rewind() |
|---|
| 90 | self._queue_cursors[queue] = cursor.skip(cursor.count()) |
|---|
| 91 | else: |
|---|
| 92 | self.client.messages.remove({'queue': queue}) |
|---|
| 93 | return size |
|---|
| 94 | |
|---|
| 95 | def close(self): |
|---|
| 96 | super(Channel, self).close() |
|---|
| 97 | if self._client: |
|---|
| 98 | self._client.connection.end_request() |
|---|
| 99 | |
|---|
| 100 | def _open(self): |
|---|
| 101 | """ |
|---|
| 102 | See mongodb uri documentation: |
|---|
| 103 | http://www.mongodb.org/display/DOCS/Connections |
|---|
| 104 | """ |
|---|
| 105 | conninfo = self.connection.client |
|---|
| 106 | |
|---|
| 107 | dbname = None |
|---|
| 108 | hostname = None |
|---|
| 109 | |
|---|
| 110 | if not conninfo.hostname: |
|---|
| 111 | conninfo.hostname = DEFAULT_HOST |
|---|
| 112 | |
|---|
| 113 | for part in conninfo.hostname.split('/'): |
|---|
| 114 | if not hostname: |
|---|
| 115 | hostname = 'mongodb://' + part |
|---|
| 116 | continue |
|---|
| 117 | |
|---|
| 118 | dbname = part |
|---|
| 119 | if '?' in part: |
|---|
| 120 | # In case someone is passing options |
|---|
| 121 | # to the mongodb connection. Right now |
|---|
| 122 | # it is not permitted by kombu |
|---|
| 123 | dbname, options = part.split('?') |
|---|
| 124 | hostname += '/?' + options |
|---|
| 125 | |
|---|
| 126 | hostname = "%s/%s" % (hostname, dbname in [None, "/"] and "admin" \ |
|---|
| 127 | or dbname) |
|---|
| 128 | if not dbname or dbname == "/": |
|---|
| 129 | dbname = "kombu_default" |
|---|
| 130 | |
|---|
| 131 | # At this point we expect the hostname to be something like |
|---|
| 132 | # (considering replica set form too): |
|---|
| 133 | # |
|---|
| 134 | # mongodb://[username:password@]host1[:port1][,host2[:port2], |
|---|
| 135 | # ...[,hostN[:portN]]][/[?options]] |
|---|
| 136 | mongoconn = Connection(host=hostname) |
|---|
| 137 | version = mongoconn.server_info()['version'] |
|---|
| 138 | if tuple(map(int, version.split('.')[:2])) < (1, 3): |
|---|
| 139 | raise NotImplementedError( |
|---|
| 140 | 'Kombu requires MongoDB version 1.3+, but connected to %s' % ( |
|---|
| 141 | version, )) |
|---|
| 142 | |
|---|
| 143 | database = getattr(mongoconn, dbname) |
|---|
| 144 | |
|---|
| 145 | # This is done by the connection uri |
|---|
| 146 | # if conninfo.userid: |
|---|
| 147 | # database.authenticate(conninfo.userid, conninfo.password) |
|---|
| 148 | self.db = database |
|---|
| 149 | col = database.messages |
|---|
| 150 | col.ensure_index([('queue', 1), ('_id', 1)], background=True) |
|---|
| 151 | |
|---|
| 152 | if 'messages.broadcast' not in database.collection_names(): |
|---|
| 153 | capsize = conninfo.transport_options.get( |
|---|
| 154 | 'capped_queue_size') or 100000 |
|---|
| 155 | database.create_collection('messages.broadcast', size=capsize, |
|---|
| 156 | capped=True) |
|---|
| 157 | |
|---|
| 158 | self.bcast = getattr(database, 'messages.broadcast') |
|---|
| 159 | self.bcast.ensure_index([('queue', 1)]) |
|---|
| 160 | |
|---|
| 161 | self.routing = getattr(database, 'messages.routing') |
|---|
| 162 | self.routing.ensure_index([('queue', 1), ('exchange', 1)]) |
|---|
| 163 | return database |
|---|
| 164 | |
|---|
| 165 | #TODO: Store a more complete exchange metatable in the routing collection |
|---|
| 166 | def get_table(self, exchange): |
|---|
| 167 | """Get table of bindings for ``exchange``.""" |
|---|
| 168 | localRoutes = frozenset(self.state.exchanges[exchange]['table']) |
|---|
| 169 | brokerRoutes = self.client.messages.routing.find({ |
|---|
| 170 | 'exchange': exchange}) |
|---|
| 171 | |
|---|
| 172 | return localRoutes | frozenset((r['routing_key'], |
|---|
| 173 | r['pattern'], |
|---|
| 174 | r['queue']) for r in brokerRoutes) |
|---|
| 175 | |
|---|
| 176 | def _put_fanout(self, exchange, message, **kwargs): |
|---|
| 177 | """Deliver fanout message.""" |
|---|
| 178 | self.client.messages.broadcast.insert({'payload': dumps(message), |
|---|
| 179 | 'queue': exchange}) |
|---|
| 180 | |
|---|
| 181 | def _queue_bind(self, exchange, routing_key, pattern, queue): |
|---|
| 182 | if self.typeof(exchange).type == 'fanout': |
|---|
| 183 | cursor = self.bcast.find(query={'queue': exchange}, |
|---|
| 184 | sort=[('$natural', 1)], tailable=True) |
|---|
| 185 | # Fast forward the cursor past old events |
|---|
| 186 | self._queue_cursors[queue] = cursor.skip(cursor.count()) |
|---|
| 187 | self._queue_readcounts[queue] = cursor.count() |
|---|
| 188 | self._fanout_queues[queue] = exchange |
|---|
| 189 | |
|---|
| 190 | meta = {'exchange': exchange, |
|---|
| 191 | 'queue': queue, |
|---|
| 192 | 'routing_key': routing_key, |
|---|
| 193 | 'pattern': pattern} |
|---|
| 194 | self.client.messages.routing.update(meta, meta, upsert=True) |
|---|
| 195 | |
|---|
| 196 | def queue_delete(self, queue, **kwargs): |
|---|
| 197 | self.routing.remove({'queue': queue}) |
|---|
| 198 | super(Channel, self).queue_delete(queue, **kwargs) |
|---|
| 199 | if queue in self._fanout_queues: |
|---|
| 200 | self._queue_cursors[queue].close() |
|---|
| 201 | self._queue_cursors.pop(queue, None) |
|---|
| 202 | self._fanout_queues.pop(queue, None) |
|---|
| 203 | |
|---|
| 204 | @property |
|---|
| 205 | def client(self): |
|---|
| 206 | if self._client is None: |
|---|
| 207 | self._client = self._open() |
|---|
| 208 | return self._client |
|---|
| 209 | |
|---|
| 210 | |
|---|
| 211 | class Transport(virtual.Transport): |
|---|
| 212 | Channel = Channel |
|---|
| 213 | |
|---|
| 214 | polling_interval = 1 |
|---|
| 215 | default_port = DEFAULT_PORT |
|---|
| 216 | connection_errors = (StdConnectionError, errors.ConnectionFailure) |
|---|
| 217 | channel_errors = (StdChannelError, |
|---|
| 218 | errors.ConnectionFailure, |
|---|
| 219 | errors.OperationFailure) |
|---|
| 220 | driver_type = 'mongodb' |
|---|
| 221 | driver_name = 'pymongo' |
|---|
| 222 | |
|---|
| 223 | def driver_version(self): |
|---|
| 224 | return pymongo.version |
|---|