Ruby on Rails | Screencasts | Download | Documentation | Weblog | Community | Source

root/trunk/activesupport/lib/active_support/vendor/memcache-client-1.5.0/memcache.rb

Revision 8766, 22.1 kB (checked in by bitsweat, 4 months ago)

Bundled memcache client consistently returns server responses and checks for errors

Line 
1 # All original code copyright 2005, 2006, 2007 Bob Cottrell, Eric Hodel,
2 # The Robot Co-op.  All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 #
8 # 1. Redistributions of source code must retain the above copyright
9 #    notice, this list of conditions and the following disclaimer.
10 # 2. Redistributions in binary form must reproduce the above copyright
11 #    notice, this list of conditions and the following disclaimer in the
12 #    documentation and/or other materials provided with the distribution.
13 # 3. Neither the names of the authors nor the names of their contributors
14 #    may be used to endorse or promote products derived from this software
15 #    without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS
18 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
22 # OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
23 # OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24 # BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 # WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
26 # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
27 # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29
30 require 'socket'
31 require 'thread'
32 require 'timeout'
33 require 'rubygems'
34
35 class String
36
37   ##
38   # Uses the ITU-T polynomial in the CRC32 algorithm.
39
40   def crc32_ITU_T
41     n = length
42     r = 0xFFFFFFFF
43
44     n.times do |i|
45       r ^= self[i]
46       8.times do
47         if (r & 1) != 0 then
48           r = (r>>1) ^ 0xEDB88320
49         else
50           r >>= 1
51         end
52       end
53     end
54
55     r ^ 0xFFFFFFFF
56   end
57
58 end
59
60 ##
61 # A Ruby client library for memcached.
62 #
63 # This is intended to provide access to basic memcached functionality.  It
64 # does not attempt to be complete implementation of the entire API, but it is
65 # approaching a complete implementation.
66
67 class MemCache
68
69   ##
70   # The version of MemCache you are using.
71
72   VERSION = '1.5.0'
73
74   ##
75   # Default options for the cache object.
76
77   DEFAULT_OPTIONS = {
78     :namespace   => nil,
79     :readonly    => false,
80     :multithread => false,
81   }
82
83   ##
84   # Default memcached port.
85
86   DEFAULT_PORT = 11211
87
88   ##
89   # Default memcached server weight.
90
91   DEFAULT_WEIGHT = 1
92
93   ##
94   # The amount of time to wait for a response from a memcached server.  If a
95   # response is not completed within this time, the connection to the server
96   # will be closed and an error will be raised.
97
98   attr_accessor :request_timeout
99
100   ##
101   # The namespace for this instance
102
103   attr_reader :namespace
104
105   ##
106   # The multithread setting for this instance
107
108   attr_reader :multithread
109
110   ##
111   # The servers this client talks to.  Play at your own peril.
112
113   attr_reader :servers
114
115   ##
116   # Accepts a list of +servers+ and a list of +opts+.  +servers+ may be
117   # omitted.  See +servers=+ for acceptable server list arguments.
118   #
119   # Valid options for +opts+ are:
120   #
121   #   [:namespace]   Prepends this value to all keys added or retrieved.
122   #   [:readonly]    Raises an exeception on cache writes when true.
123   #   [:multithread] Wraps cache access in a Mutex for thread safety.
124   #
125   # Other options are ignored.
126
127   def initialize(*args)
128     servers = []
129     opts = {}
130
131     case args.length
132     when 0 then # NOP
133     when 1 then
134       arg = args.shift
135       case arg
136       when Hash   then opts = arg
137       when Array  then servers = arg
138       when String then servers = [arg]
139       else raise ArgumentError, 'first argument must be Array, Hash or String'
140       end
141     when 2 then
142       servers, opts = args
143     else
144       raise ArgumentError, "wrong number of arguments (#{args.length} for 2)"
145     end
146
147     opts = DEFAULT_OPTIONS.merge opts
148     @namespace   = opts[:namespace]
149     @readonly    = opts[:readonly]
150     @multithread = opts[:multithread]
151     @mutex       = Mutex.new if @multithread
152     @buckets     = []
153     self.servers = servers
154   end
155
156   ##
157   # Returns a string representation of the cache object.
158
159   def inspect
160     "<MemCache: %d servers, %d buckets, ns: %p, ro: %p>" %
161       [@servers.length, @buckets.length, @namespace, @readonly]
162   end
163
164   ##
165   # Returns whether there is at least one active server for the object.
166
167   def active?
168     not @servers.empty?
169   end
170
171   ##
172   # Returns whether or not the cache object was created read only.
173
174   def readonly?
175     @readonly
176   end
177
178   ##
179   # Set the servers that the requests will be distributed between.  Entries
180   # can be either strings of the form "hostname:port" or
181   # "hostname:port:weight" or MemCache::Server objects.
182
183   def servers=(servers)
184     # Create the server objects.
185     @servers = servers.collect do |server|
186       case server
187       when String
188         host, port, weight = server.split ':', 3
189         port ||= DEFAULT_PORT
190         weight ||= DEFAULT_WEIGHT
191         Server.new self, host, port, weight
192       when Server
193         if server.memcache.multithread != @multithread then
194           raise ArgumentError, "can't mix threaded and non-threaded servers"
195         end
196         server
197       else
198         raise TypeError, "cannot convert #{server.class} into MemCache::Server"
199       end
200     end
201
202     # Create an array of server buckets for weight selection of servers.
203     @buckets = []
204     @servers.each do |server|
205       server.weight.times { @buckets.push(server) }
206     end
207   end
208
209   ##
210   # Deceremets the value for +key+ by +amount+ and returns the new value.
211   # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
212   # 0.  +key+ can not be decremented below 0.
213
214   def decr(key, amount = 1)
215     server, cache_key = request_setup key
216
217     if @multithread then
218       threadsafe_cache_decr server, cache_key, amount
219     else
220       cache_decr server, cache_key, amount
221     end
222   rescue TypeError, SocketError, SystemCallError, IOError => err
223     handle_error server, err
224   end
225
226   ##
227   # Retrieves +key+ from memcache.  If +raw+ is false, the value will be
228   # unmarshalled.
229
230   def get(key, raw = false)
231     server, cache_key = request_setup key
232
233     value = if @multithread then
234               threadsafe_cache_get server, cache_key
235             else
236               cache_get server, cache_key
237             end
238
239     return nil if value.nil?
240
241     value = Marshal.load value unless raw
242
243     return value
244   rescue TypeError, SocketError, SystemCallError, IOError => err
245     handle_error server, err
246   end
247
248   ##
249   # Retrieves multiple values from memcached in parallel, if possible.
250   #
251   # The memcached protocol supports the ability to retrieve multiple
252   # keys in a single request.  Pass in an array of keys to this method
253   # and it will:
254   #
255   # 1. map the key to the appropriate memcached server
256   # 2. send a single request to each server that has one or more key values
257   #
258   # Returns a hash of values.
259   #
260   #   cache["a"] = 1
261   #   cache["b"] = 2
262   #   cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
263
264   def get_multi(*keys)
265     raise MemCacheError, 'No active servers' unless active?
266
267     keys.flatten!
268     key_count = keys.length
269     cache_keys = {}
270     server_keys = Hash.new { |h,k| h[k] = [] }
271
272     # map keys to servers
273     keys.each do |key|
274       server, cache_key = request_setup key
275       cache_keys[cache_key] = key
276       server_keys[server] << cache_key
277     end
278
279     results = {}
280
281     server_keys.each do |server, keys|
282       keys = keys.join ' '
283       values = if @multithread then
284                  threadsafe_cache_get_multi server, keys
285                else
286                  cache_get_multi server, keys
287                end
288       values.each do |key, value|
289         results[cache_keys[key]] = Marshal.load value
290       end
291     end
292
293     return results
294   rescue TypeError, SocketError, SystemCallError, IOError => err
295     handle_error server, err
296   end
297
298   ##
299   # Increments the value for +key+ by +amount+ and retruns the new value.
300   # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
301   # 0.
302
303   def incr(key, amount = 1)
304     server, cache_key = request_setup key
305
306     if @multithread then
307       threadsafe_cache_incr server, cache_key, amount
308     else
309       cache_incr server, cache_key, amount
310     end
311   rescue TypeError, SocketError, SystemCallError, IOError => err
312     handle_error server, err
313   end
314
315   ##
316   # Add +key+ to the cache with value +value+ that expires in +expiry+
317   # seconds.  If +raw+ is true, +value+ will not be Marshalled.
318   #
319   # Warning: Readers should not call this method in the event of a cache miss;
320   # see MemCache#add.
321
322   def set(key, value, expiry = 0, raw = false)
323     raise MemCacheError, "Update of readonly cache" if @readonly
324     server, cache_key = request_setup key
325     socket = server.socket
326
327     value = Marshal.dump value unless raw
328     command = "set #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
329
330     begin
331       @mutex.lock if @multithread
332       socket.write command
333       result = socket.gets
334       raise_on_error_response! result
335       result
336     rescue SocketError, SystemCallError, IOError => err
337       server.close
338       raise MemCacheError, err.message
339     ensure
340       @mutex.unlock if @multithread
341     end
342   end
343
344   ##
345   # Add +key+ to the cache with value +value+ that expires in +expiry+
346   # seconds, but only if +key+ does not already exist in the cache.
347   # If +raw+ is true, +value+ will not be Marshalled.
348   #
349   # Readers should call this method in the event of a cache miss, not
350   # MemCache#set or MemCache#[]=.
351
352   def add(key, value, expiry = 0, raw = false)
353     raise MemCacheError, "Update of readonly cache" if @readonly
354     server, cache_key = request_setup key
355     socket = server.socket
356
357     value = Marshal.dump value unless raw
358     command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
359
360     begin
361       @mutex.lock if @multithread
362       socket.write command
363       result = socket.gets
364       raise_on_error_response! result
365       result
366     rescue SocketError, SystemCallError, IOError => err
367       server.close
368       raise MemCacheError, err.message
369     ensure
370       @mutex.unlock if @multithread
371     end
372   end
373
374   ##
375   # Removes +key+ from the cache in +expiry+ seconds.
376
377   def delete(key, expiry = 0)
378     @mutex.lock if @multithread
379
380     raise MemCacheError, "No active servers" unless active?
381     cache_key = make_cache_key key
382     server = get_server_for_key cache_key
383
384     sock = server.socket
385     raise MemCacheError, "No connection to server" if sock.nil?
386
387     begin
388       sock.write "delete #{cache_key} #{expiry}\r\n"
389       result = sock.gets
390       raise_on_error_response! result
391       result
392     rescue SocketError, SystemCallError, IOError => err
393       server.close
394       raise MemCacheError, err.message
395     end
396   ensure
397     @mutex.unlock if @multithread
398   end
399
400   ##
401   # Flush the cache from all memcache servers.
402
403   def flush_all
404     raise MemCacheError, 'No active servers' unless active?
405     raise MemCacheError, "Update of readonly cache" if @readonly
406     begin
407       @mutex.lock if @multithread
408       @servers.each do |server|
409         begin
410           sock = server.socket
411           raise MemCacheError, "No connection to server" if sock.nil?
412           sock.write "flush_all\r\n"
413           result = sock.gets
414           raise_on_error_response! result
415           result
416         rescue SocketError, SystemCallError, IOError => err
417           server.close
418           raise MemCacheError, err.message
419         end
420       end
421     ensure
422       @mutex.unlock if @multithread
423     end
424   end
425
426   ##
427   # Reset the connection to all memcache servers.  This should be called if
428   # there is a problem with a cache lookup that might have left the connection
429   # in a corrupted state.
430
431   def reset
432     @servers.each { |server| server.close }
433   end
434
435   ##
436   # Returns statistics for each memcached server.  An explanation of the
437   # statistics can be found in the memcached docs:
438   #
439   # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
440   #
441   # Example:
442   #
443   #   >> pp CACHE.stats
444   #   {"localhost:11211"=>
445   #     {"bytes"=>4718,
446   #      "pid"=>20188,
447   #      "connection_structures"=>4,
448   #      "time"=>1162278121,
449   #      "pointer_size"=>32,
450   #      "limit_maxbytes"=>67108864,
451   #      "cmd_get"=>14532,
452   #      "version"=>"1.2.0",
453   #      "bytes_written"=>432583,
454   #      "cmd_set"=>32,
455   #      "get_misses"=>0,
456   #      "total_connections"=>19,
457   #      "curr_connections"=>3,
458   #      "curr_items"=>4,
459   #      "uptime"=>1557,
460   #      "get_hits"=>14532,
461   #      "total_items"=>32,
462   #      "rusage_system"=>0.313952,
463   #      "rusage_user"=>0.119981,
464   #      "bytes_read"=>190619}}
465   #   => nil
466
467   def stats
468     raise MemCacheError, "No active servers" unless active?
469     server_stats = {}
470
471     @servers.each do |server|
472       sock = server.socket
473       raise MemCacheError, "No connection to server" if sock.nil?
474
475       value = nil
476       begin
477         sock.write "stats\r\n"
478         stats = {}
479         while line = sock.gets do
480           raise_on_error_response! line
481           break if line == "END\r\n"
482           if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then
483             name, value = $1, $2
484             stats[name] = case name
485                           when 'version'
486                             value
487                           when 'rusage_user', 'rusage_system' then
488                             seconds, microseconds = value.split(/:/, 2)
489                             microseconds ||= 0
490                             Float(seconds) + (Float(microseconds) / 1_000_000)
491                           else
492                             if value =~ /\A\d+\Z/ then
493                               value.to_i
494                             else
495                               value
496                             end
497                           end
498           end
499         end
500         server_stats["#{server.host}:#{server.port}"] = stats
501       rescue SocketError, SystemCallError, IOError => err
502         server.close
503         raise MemCacheError, err.message
504       end
505     end
506
507     server_stats
508   end
509
510   ##
511   # Shortcut to get a value from the cache.
512
513   alias [] get
514
515   ##
516   # Shortcut to save a value in the cache.  This method does not set an
517   # expiration on the entry.  Use set to specify an explicit expiry.
518
519   def []=(key, value)
520     set key, value
521   end
522
523   protected
524
525   ##
526   # Create a key for the cache, incorporating the namespace qualifier if
527   # requested.
528
529   def make_cache_key(key)
530     if namespace.nil? then
531       key
532     else
533       "#{@namespace}:#{key}"
534     end
535   end
536
537   ##
538   # Pick a server to handle the request based on a hash of the key.
539
540   def get_server_for_key(key)
541     raise ArgumentError, "illegal character in key #{key.inspect}" if
542       key =~ /\s/
543     raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
544     raise MemCacheError, "No servers available" if @servers.empty?
545     return @servers.first if @servers.length == 1
546
547     hkey = hash_for key
548
549     20.times do |try|
550       server = @buckets[hkey % @buckets.nitems]
551       return server if server.alive?
552       hkey += hash_for "#{try}#{key}"
553     end
554
555     raise MemCacheError, "No servers available"
556   end
557
558   ##
559   # Returns an interoperable hash value for +key+.  (I think, docs are
560   # sketchy for down servers).
561
562   def hash_for(key)
563     (key.crc32_ITU_T >> 16) & 0x7fff
564   end
565
566   ##
567   # Performs a raw decr for +cache_key+ from +server+.  Returns nil if not
568   # found.
569
570   def cache_decr(server, cache_key, amount)
571     socket = server.socket
572     socket.write "decr #{cache_key} #{amount}\r\n"
573     text = socket.gets
574     raise_on_error_response! text
575     return nil if text == "NOT_FOUND\r\n"
576     return text.to_i
577   end
578
579   ##
580   # Fetches the raw data for +cache_key+ from +server+.  Returns nil on cache
581   # miss.
582
583   def cache_get(server, cache_key)
584     socket = server.socket
585     socket.write "get #{cache_key}\r\n"
586     keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
587
588     if keyline.nil? then
589       server.close
590       raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
591     end
592
593     raise_on_error_response! keyline
594     return nil if keyline == "END\r\n"
595
596     unless keyline =~ /(\d+)\r/ then
597       server.close
598       raise MemCacheError, "unexpected response #{keyline.inspect}"
599     end
600     value = socket.read $1.to_i
601     socket.read 2 # "\r\n"
602     socket.gets   # "END\r\n"
603     return value
604   end
605
606   ##
607   # Fetches +cache_keys+ from +server+ using a multi-get.
608
609   def cache_get_multi(server, cache_keys)
610     values = {}
611     socket = server.socket
612     socket.write "get #{cache_keys}\r\n"
613
614     while keyline = socket.gets do
615       return values if keyline == "END\r\n"
616       raise_on_error_response! keyline
617
618       unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then
619         server.close
620         raise MemCacheError, "unexpected response #{keyline.inspect}"
621       end
622
623       key, data_length = $1, $3
624       values[$1] = socket.read data_length.to_i
625       socket.read(2) # "\r\n"
626     end
627
628     server.close
629     raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
630   end
631
632   ##
633   # Performs a raw incr for +cache_key+ from +server+.  Returns nil if not
634   # found.
635
636   def cache_incr(server, cache_key, amount)
637     socket = server.socket
638     socket.write "incr #{cache_key} #{amount}\r\n"
639     text = socket.gets
640     raise_on_error_response! text
641     return nil if text == "NOT_FOUND\r\n"
642     return text.to_i
643   end
644
645   ##
646   # Handles +error+ from +server+.
647
648   def handle_error(server, error)
649     server.close if server
650     new_error = MemCacheError.new error.message
651     new_error.set_backtrace error.backtrace
652     raise new_error
653   end
654
655   ##
656   # Performs setup for making a request with +key+ from memcached.  Returns
657   # the server to fetch the key from and the complete key to use.
658
659   def request_setup(key)
660     raise MemCacheError, 'No active servers' unless active?
661     cache_key = make_cache_key key
662     server = get_server_for_key cache_key
663     raise MemCacheError, 'No connection to server' if server.socket.nil?
664     return server, cache_key
665   end
666
667   def threadsafe_cache_decr(server, cache_key, amount) # :nodoc:
668     @mutex.lock
669     cache_decr server, cache_key, amount
670   ensure
671     @mutex.unlock
672   end
673
674   def threadsafe_cache_get(server, cache_key) # :nodoc:
675     @mutex.lock
676     cache_get server, cache_key
677   ensure
678     @mutex.unlock
679   end
680
681   def threadsafe_cache_get_multi(socket