| | 1 | require 'monitor' |
|---|
| | 2 | |
|---|
| | 3 | module ActiveRecord |
|---|
| | 4 | |
|---|
| | 5 | class Base |
|---|
| | 6 | |
|---|
| | 7 | # Determines whether or not to use a connection pool to manage the actual connections. |
|---|
| | 8 | # Is tested and applied on a class by class basis. When set to true, @@allow_concurrency is |
|---|
| | 9 | # effectively disregarded |
|---|
| | 10 | cattr_accessor :pool_connections |
|---|
| | 11 | @@pool_connections = false |
|---|
| | 12 | |
|---|
| | 13 | cattr_accessor :connection_pool |
|---|
| | 14 | @@connection_pool=nil |
|---|
| | 15 | |
|---|
| | 16 | # Initial and minimum size of the connection pool. Meaningful only when @@pool_connections is true |
|---|
| | 17 | cattr_accessor :pool_connections_min |
|---|
| | 18 | @@pool_connections_min = 0 |
|---|
| | 19 | |
|---|
| | 20 | # Maximum size of the connection pool. Meaningful only when @@pool_connections is true |
|---|
| | 21 | cattr_accessor :pool_connections_max |
|---|
| | 22 | @@pool_connections_max = 0 |
|---|
| | 23 | |
|---|
| | 24 | # Whether or not a thread will wait for a pool connection to become available, |
|---|
| | 25 | # or if ConnectionNotEstablished is thrown |
|---|
| | 26 | cattr_accessor :pool_connections_wait |
|---|
| | 27 | @@pool_connections_wait = true |
|---|
| | 28 | |
|---|
| | 29 | # Period in seconds between sweeping for unused connections in the pool |
|---|
| | 30 | cattr_accessor :pool_connections_reaper_period |
|---|
| | 31 | @@pool_connections_reaper_period = 30 |
|---|
| | 32 | |
|---|
| | 33 | # alias adapter methods to get a ConnectionPool::Wrapper when pooling is enabled |
|---|
| | 34 | class << self |
|---|
| | 35 | #alias_method :mysql_connection_raw, :mysql_connection |
|---|
| | 36 | adapter_methods = RAILS_CONNECTION_ADAPTERS.collect {|a| "#{a}_connection" } |
|---|
| | 37 | adapter_methods.each do |am| |
|---|
| | 38 | if method_defined?(am.to_sym) |
|---|
| | 39 | alias_method "#{am}_raw".to_sym, am.to_sym |
|---|
| | 40 | module_eval <<-EOM |
|---|
| | 41 | def #{am}(*args) |
|---|
| | 42 | if pool_connections |
|---|
| | 43 | if @@connection_pool.nil? |
|---|
| | 44 | @@connection_pool = ConnectionPool.new(Proc.new {#{am}_raw(*args)}) |
|---|
| | 45 | @@connection_pool.startup |
|---|
| | 46 | end |
|---|
| | 47 | return @@connection_pool.wrapper |
|---|
| | 48 | else |
|---|
| | 49 | return #{am}_raw(*args) |
|---|
| | 50 | end |
|---|
| | 51 | end |
|---|
| | 52 | EOM |
|---|
| | 53 | end |
|---|
| | 54 | end |
|---|
| | 55 | |
|---|
| | 56 | alias_method :remove_connection_raw, :remove_connection |
|---|
| | 57 | def remove_connection(klass=self) |
|---|
| | 58 | if @@connection_pool |
|---|
| | 59 | @@connection_pool.shutdown |
|---|
| | 60 | @@connection_pool = nil |
|---|
| | 61 | end |
|---|
| | 62 | remove_connection_raw(klass) |
|---|
| | 63 | end |
|---|
| | 64 | end |
|---|
| | 65 | end |
|---|
| | 66 | |
|---|
| | 67 | class ConnectionPool |
|---|
| | 68 | include MonitorMixin |
|---|
| | 69 | |
|---|
| | 70 | def initialize(prc) |
|---|
| | 71 | super() |
|---|
| | 72 | @prc = prc |
|---|
| | 73 | end |
|---|
| | 74 | |
|---|
| | 75 | def wrapper |
|---|
| | 76 | Wrapper.new(self) |
|---|
| | 77 | end |
|---|
| | 78 | |
|---|
| | 79 | def startup |
|---|
| | 80 | @min_size,@max_size = ActiveRecord::Base.pool_connections_min,ActiveRecord::Base.pool_connections_max |
|---|
| | 81 | @available_connections = [] |
|---|
| | 82 | @acquired_connections = [] |
|---|
| | 83 | @min_size.times { @available_connections << @prc.call } |
|---|
| | 84 | @cond_var ||= new_cond |
|---|
| | 85 | @use_check = 0 |
|---|
| | 86 | @use_thread = Thread.new { reap_connections } |
|---|
| | 87 | end |
|---|
| | 88 | |
|---|
| | 89 | def shutdown |
|---|
| | 90 | synchronize do |
|---|
| | 91 | @cond_var.wait_until { @acquired_connections.size==0 } |
|---|
| | 92 | # TODO - connection_adapters should have a close method to close the physical connection |
|---|
| | 93 | #@available_connections.each do |c| |
|---|
| | 94 | # c.close |
|---|
| | 95 | #end |
|---|
| | 96 | @available_connections = nil |
|---|
| | 97 | end |
|---|
| | 98 | @use_thread.terminate |
|---|
| | 99 | end |
|---|
| | 100 | |
|---|
| | 101 | def reset |
|---|
| | 102 | shutdown |
|---|
| | 103 | startup |
|---|
| | 104 | end |
|---|
| | 105 | |
|---|
| | 106 | def current_size |
|---|
| | 107 | return @available_connections.size + @acquired_connections.size |
|---|
| | 108 | end |
|---|
| | 109 | |
|---|
| | 110 | def reap_connections |
|---|
| | 111 | Base.logger.debug("starting reaper thread") |
|---|
| | 112 | loop do |
|---|
| | 113 | sleep(ActiveRecord::Base.pool_connections_reaper_period) |
|---|
| | 114 | synchronize do |
|---|
| | 115 | Base.logger.debug("use_check is #{@use_check}") |
|---|
| | 116 | if @use_check < current_size |
|---|
| | 117 | num_to_free = current_size - @use_check |
|---|
| | 118 | max_to_free = current_size-@min_size |
|---|
| | 119 | num_to_free = max_to_free if num_to_free > max_to_free |
|---|
| | 120 | Base.logger.debug("freeing #{num_to_free} connections") |
|---|
| | 121 | @available_connections[0...num_to_free].each do |conn| |
|---|
| | 122 | conn.close |
|---|
| | 123 | @available_connections.delete(conn) |
|---|
| | 124 | end |
|---|
| | 125 | end |
|---|
| | 126 | end |
|---|
| | 127 | @use_check=0 |
|---|
| | 128 | end |
|---|
| | 129 | end |
|---|
| | 130 | |
|---|
| | 131 | def acquire_connection |
|---|
| | 132 | conn = nil |
|---|
| | 133 | synchronize do |
|---|
| | 134 | loop do |
|---|
| | 135 | if @available_connections.size > 0 |
|---|
| | 136 | conn = @available_connections.shift |
|---|
| | 137 | break |
|---|
| | 138 | elsif (@max_size > 0 and current_size < @max_size) or @max_size==0 |
|---|
| | 139 | conn = @prc.call |
|---|
| | 140 | break |
|---|
| | 141 | else |
|---|
| | 142 | if ActiveRecord::Base.pool_connections_wait |
|---|
| | 143 | @cond_var.wait |
|---|
| | 144 | else |
|---|
| | 145 | raise ActiveRecord::ConnectionNotEstablished |
|---|
| | 146 | end |
|---|
| | 147 | end |
|---|
| | 148 | end |
|---|
| | 149 | @acquired_connections << conn |
|---|
| | 150 | @use_check=@acquired_connections.size if @acquired_connections.size > @use_check |
|---|
| | 151 | Base.logger.debug("current_size=" + current_size.to_s + " max_size=" + @max_size.to_s) |
|---|
| | 152 | end |
|---|
| | 153 | conn |
|---|
| | 154 | end |
|---|
| | 155 | |
|---|
| | 156 | def release_connection(conn) |
|---|
| | 157 | synchronize do |
|---|
| | 158 | @acquired_connections.delete(conn) |
|---|
| | 159 | @available_connections << conn |
|---|
| | 160 | @cond_var.signal |
|---|
| | 161 | end |
|---|
| | 162 | end |
|---|
| | 163 | |
|---|
| | 164 | # The Wrapper class is used as a proxy for the real connection adapter. |
|---|
| | 165 | # All methods are forwarded to the underlying connections in the pool via |
|---|
| | 166 | # the borrow_connection method, which ensures connections go back to the pool |
|---|
| | 167 | # after the method is executed on the connection |
|---|
| | 168 | class Wrapper |
|---|
| | 169 | attr_reader :pool |
|---|
| | 170 | |
|---|
| | 171 | # copy definition of transaction method from database_statements.rb so that internal transaction |
|---|
| | 172 | # methods (begin_db_transaction, etc.) go through the wrapper |
|---|
| | 173 | def transaction(start_db_transaction = true) |
|---|
| | 174 | transaction_open = false |
|---|
| | 175 | begin |
|---|
| | 176 | if block_given? |
|---|
| | 177 | if start_db_transaction |
|---|
| | 178 | begin_db_transaction |
|---|
| | 179 | transaction_open = true |
|---|
| | 180 | end |
|---|
| | 181 | yield |
|---|
| | 182 | end |
|---|
| | 183 | rescue Exception => database_transaction_rollback |
|---|
| | 184 | if transaction_open |
|---|
| | 185 | transaction_open = false |
|---|
| | 186 | rollback_db_transaction |
|---|
| | 187 | end |
|---|
| | 188 | raise |
|---|
| | 189 | end |
|---|
| | 190 | ensure |
|---|
| | 191 | commit_db_transaction if transaction_open |
|---|
| | 192 | end |
|---|
| | 193 | |
|---|
| | 194 | def initialize(pool) |
|---|
| | 195 | @pool=pool |
|---|
| | 196 | end |
|---|
| | 197 | |
|---|
| | 198 | def indexes(table_name, name=nil) |
|---|
| | 199 | borrow_connection {|c| c.indexes(table_name, name) } |
|---|
| | 200 | end |
|---|
| | 201 | |
|---|
| | 202 | def tables(name=nil) |
|---|
| | 203 | borrow_connection {|c| c.tables(name) } |
|---|
| | 204 | end |
|---|
| | 205 | |
|---|
| | 206 | # Provides a real connection to a block and returns the value of the block. |
|---|
| | 207 | # Primary purpose is the enforce proper use of pooled connections |
|---|
| | 208 | def borrow_connection(&block) |
|---|
| | 209 | result = nil |
|---|
| | 210 | if block_given? |
|---|
| | 211 | if Thread.current['activerecord_connection'].nil? |
|---|
| | 212 | begin |
|---|
| | 213 | Thread.current['activerecord_connection'] = @pool.acquire_connection |
|---|
| | 214 | result = yield Thread.current['activerecord_connection'] |
|---|
| | 215 | ensure |
|---|
| | 216 | if !Thread.current['activerecord_connection'].nil? |
|---|
| | 217 | @pool.release_connection(Thread.current['activerecord_connection']) |
|---|
| | 218 | Thread.current['activerecord_connection'] = nil |
|---|
| | 219 | end |
|---|
| | 220 | end |
|---|
| | 221 | else |
|---|
| | 222 | result = yield Thread.current['activerecord_connection'] |
|---|
| | 223 | end |
|---|
| | 224 | end |
|---|
| | 225 | result |
|---|
| | 226 | end |
|---|
| | 227 | alias_method :borrow, :borrow_connection |
|---|
| | 228 | |
|---|
| | 229 | def begin_db_transaction |
|---|
| | 230 | if Thread.current['activerecord_connection'].nil? |
|---|
| | 231 | Thread.current['activerecord_connection'] = @pool.acquire_connection |
|---|
| | 232 | Thread.current['activerecord_connection'].begin_db_transaction |
|---|
| | 233 | end |
|---|
| | 234 | end |
|---|
| | 235 | |
|---|
| | 236 | def commit_db_transaction |
|---|
| | 237 | if !Thread.current['activerecord_connection'].nil? |
|---|
| | 238 | Thread.current['activerecord_connection'].commit_db_transaction |
|---|
| | 239 | @pool.release_connection(Thread.current['activerecord_connection']) |
|---|
| | 240 | Thread.current['activerecord_connection'] = nil |
|---|
| | 241 | end |
|---|
| | 242 | end |
|---|
| | 243 | |
|---|
| | 244 | def rollback_db_transaction |
|---|
| | 245 | if !Thread.current['activerecord_connection'].nil? |
|---|
| | 246 | Thread.current['activerecord_connection'].rollback_db_transaction |
|---|
| | 247 | @pool.release_connection(Thread.current['activerecord_connection']) |
|---|
| | 248 | Thread.current['activerecord_connection'] = nil |
|---|
| | 249 | end |
|---|
| | 250 | end |
|---|
| | 251 | |
|---|
| | 252 | def method_missing(method, *args, &proc) |
|---|
| | 253 | borrow_connection do |conn| |
|---|
| | 254 | conn.send(method, *args, &proc) |
|---|
| | 255 | end |
|---|
| | 256 | end |
|---|
| | 257 | |
|---|
| | 258 | # For testability more than anything |
|---|
| | 259 | def pool_size |
|---|
| | 260 | return @pool.current_size |
|---|
| | 261 | end |
|---|
| | 262 | |
|---|
| | 263 | def shutdown_pool |
|---|
| | 264 | @pool.shutdown |
|---|
| | 265 | end |
|---|
| | 266 | end |
|---|
| | 267 | |
|---|
| | 268 | end |
|---|
| | 269 | end |