Ruby on Rails | Screencasts | Download | Documentation | Weblog | Community | Source

Ticket #2162: active_record_connection_pooling4.diff

File active_record_connection_pooling4.diff, 18.1 kB (added by greg@lapcominc.com, 3 years ago)
  • activerecord/test/fixtures/db_definitions/mysql.sql

    old new  
    188188CREATE TABLE `fk_test_has_fk` ( 
    189189  `id`    INTEGER NOT NULL PRIMARY KEY, 
    190190  `fk_id` INTEGER NOT NULL, 
    191  
     191  INDEX `fk_ind` (`fk_id`), 
    192192  FOREIGN KEY (`fk_id`) REFERENCES `fk_test_has_pk`(`id`) 
    193193) TYPE=InnoDB; 
    194194 
  • activerecord/test/pool_connections_test.rb

    old new  
     1require 'abstract_unit' 
     2require 'fixtures/topic' 
     3 
     4class PoolConnectionsTest < Test::Unit::TestCase 
     5  self.use_transactional_fixtures = false 
     6   
     7  fixtures :topics 
     8     
     9  def setup 
     10    super 
     11    ActiveRecord::Base.pool_connections = true 
     12    ActiveRecord::Base.pool_connections_min = 2 
     13    ActiveRecord::Base.pool_connections_max = 5     
     14    ActiveRecord::Base.establish_connection(ActiveRecord::Base.remove_connection)     
     15  end 
     16     
     17  def test_with_pooling_on 
     18    # assert that pool was created by establish_connection 
     19    assert_not_nil(ActiveRecord::Base.connection.pool) 
     20     
     21    # assert connection pooling works propertly with subclasses 
     22    assert_not_nil(Topic.connection.pool) 
     23    assert_equal(Topic.connection, ActiveRecord::Base.connection) 
     24     
     25    # Test that pool_connections_min connections initially established 
     26    assert_equal(2, ActiveRecord::Base.connection.pool_size) 
     27     
     28    # Test that additional connections are added to pool as needed, and threads wait 
     29    # when all connections are out 
     30    ActiveRecord::Base.pool_connections_wait = true 
     31    ActiveRecord::Base.connection.borrow do 
     32      Thread.new do 
     33        Topic.find :first 
     34        # should be 2 connections because we created 2 at startup (pool_connections_min=2) 
     35        assert_equal(2, ActiveRecord::Base.connection.pool_size) 
     36         
     37        # borrow connection then start new thread while holding connection, each new thread 
     38        # should increase size of pool as the connections are all in use 
     39        ActiveRecord::Base.connection.borrow do 
     40          Thread.new do 
     41            Topic.transaction do 
     42              assert_equal(3, ActiveRecord::Base.connection.pool_size) 
     43              ActiveRecord::Base.connection.borrow do 
     44                Thread.new do 
     45                  Topic.find :first 
     46                  assert_equal(4, ActiveRecord::Base.connection.pool_size)                 
     47                  ActiveRecord::Base.connection.borrow do 
     48                    Thread.new do 
     49                      Topic.find :first 
     50                       
     51                      # We should have 5 in there now which is what we set pool_connections_max to 
     52                      assert_equal(5, ActiveRecord::Base.connection.pool_size)                     
     53                      5.times do 
     54                        # Make these 5 threads share the one connection that is available in the pool 
     55                        # instead of creating more connections when an empty pool is found 
     56                        Thread.new do 
     57                          assert_not_nil(Topic.find(:first)) 
     58                          assert_not_nil(Topic.find(:first)) 
     59                          assert_equal(5, ActiveRecord::Base.connection.pool_size) 
     60                        end.join 
     61                      end                                                                 
     62                    end.join 
     63                  end                                 
     64                end.join 
     65              end 
     66            end 
     67          end.join 
     68        end           
     69      end.join       
     70    end 
     71  end 
     72   
     73  def test_no_wait     
     74    # Try with pool_connections_wait set to false 
     75    ActiveRecord::Base.pool_connections_wait = false 
     76     
     77    # borrow connection then start new thread while holding connection, each new thread 
     78    # should increase size of pool as the connections are all in use 
     79    ActiveRecord::Base.connection.borrow do 
     80      Thread.new do 
     81        ActiveRecord::Base.connection.borrow do 
     82          Thread.new do 
     83            ActiveRecord::Base.connection.borrow do 
     84              Thread.new do 
     85                ActiveRecord::Base.connection.borrow do 
     86                  Thread.new do 
     87                    ActiveRecord::Base.connection.borrow do 
     88                      Thread.new do 
     89                        # No more left in pool (5 are in use by threads) and no more can 
     90                        # be created, should throw ConnectionNotEstablished 
     91                        assert_raise(ActiveRecord::ConnectionNotEstablished) {Topic.find :first} 
     92                        assert_equal(5, ActiveRecord::Base.connection.pool_size) 
     93                      end.join 
     94                    end 
     95                  end.join 
     96                end 
     97              end.join 
     98            end 
     99          end.join 
     100        end 
     101      end.join 
     102    end             
     103    assert_equal(5, ActiveRecord::Base.connection.pool_size)                 
     104  end 
     105   
     106  def test_reaper 
     107    # Test that connections are released properly 
     108     
     109    # set reaper period to 3 seconds (default is 30) so we don't take forever to run the test 
     110    ActiveRecord::Base.pool_connections_reaper_period = 3 
     111    ActiveRecord::Base.connection.pool.reset 
     112    ActiveRecord::Base.connection.borrow do 
     113      Thread.new do 
     114        ActiveRecord::Base.connection.borrow do 
     115          Thread.new do 
     116            ActiveRecord::Base.connection.borrow do 
     117            end 
     118          end.join 
     119        end 
     120      end.join 
     121    end 
     122    assert_equal(3, ActiveRecord::Base.connection.pool_size) 
     123    sleep(3) 
     124     
     125    # No connection should have been closed on last run of reaper 
     126    assert_equal(3, ActiveRecord::Base.connection.pool_size) 
     127    ActiveRecord::Base.connection.borrow do 
     128      ActiveRecord::Base.connection.borrow do 
     129      end 
     130    end 
     131    sleep(3) 
     132     
     133    # One should have been closed on last run of reaper 
     134    assert_equal(2, ActiveRecord::Base.connection.pool_size) 
     135    ActiveRecord::Base.connection.borrow do 
     136    end 
     137    sleep(4) 
     138     
     139    # None should have been closed on last run of reaper 
     140    assert_equal(2,ActiveRecord::Base.connection.pool_size) 
     141  end 
     142   
     143  def test_transactions 
     144    # Test transactions 
     145    Topic.transaction do 
     146      t = Topic.find :first 
     147      t.content = "I changed the content!" 
     148      t.save 
     149       
     150      # If another thread accesses the Topic before the transaction was committed 
     151      # it should get the old value 
     152      Thread.new do 
     153        t2 = Topic.find :first 
     154        assert_equal(t2.content, "Have a nice day") 
     155      end.join       
     156    end         
     157  end   
     158     
     159end 
  • activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb

    old new  
    106106    # can be used as argument for establish_connection, for easy 
    107107    # re-establishing of the connection. 
    108108    def self.remove_connection(klass=self) 
    109       conn = @@defined_connections[klass] 
     109      conn = @@defined_connections[klass]       
    110110      @@defined_connections.delete(klass) 
    111111      active_connections[klass] = nil 
    112112      @connection = nil 
     
    115115 
    116116    # Set the connection for the class. 
    117117    def self.connection=(spec) 
    118       if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) 
     118      if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) or spec.kind_of?(ActiveRecord::ConnectionPool::Wrapper) 
    119119        active_connections[self] = spec 
    120120      elsif spec.kind_of?(ConnectionSpecification) 
    121121        self.connection = self.send(spec.adapter_method, spec.config) 
  • activerecord/lib/active_record/connection_adapters/abstract_adapter.rb

    old new  
    4444        @runtime = 0 
    4545        return rt 
    4646      end 
     47       
     48      def close 
     49        begin 
     50          @connection.close 
     51        rescue Exception => e 
     52          log_info("error while closing real connection: #{e.message}") 
     53          raise ActiveRecord::ActiveRecordError, "error while closing real connection: #{e.message}" 
     54        end 
     55      end 
    4756 
    4857      protected   
    4958        def log(sql, name) 
  • activerecord/lib/active_record/connection_pool.rb

    old new  
     1require 'monitor' 
     2 
     3module ActiveRecord 
     4 
     5  class Base 
     6   
     7    # Determines whether or not to use a connection pool to manage the actual connections.   
     8    # Is tested and applied on a class by class basis.  When set to true, @@allow_concurrency is  
     9    # effectively disregarded 
     10    cattr_accessor :pool_connections 
     11    @@pool_connections = false 
     12 
     13    cattr_accessor :connection_pool 
     14    @@connection_pool=nil 
     15     
     16    # Initial and minimum size of the connection pool.  Meaningful only when @@pool_connections is true 
     17    cattr_accessor :pool_connections_min 
     18    @@pool_connections_min = 0 
     19 
     20    # Maximum size of the connection pool.  Meaningful only when @@pool_connections is true 
     21    cattr_accessor :pool_connections_max 
     22    @@pool_connections_max = 0 
     23 
     24    # Whether or not a thread will wait for a pool connection to become available,  
     25    # or if ConnectionNotEstablished is thrown 
     26    cattr_accessor :pool_connections_wait 
     27    @@pool_connections_wait = true 
     28    
     29    # Period in seconds between sweeping for unused connections in the pool 
     30    cattr_accessor :pool_connections_reaper_period 
     31    @@pool_connections_reaper_period = 30 
     32     
     33    # alias adapter methods to get a ConnectionPool::Wrapper when pooling is enabled     
     34    class << self 
     35      #alias_method :mysql_connection_raw, :mysql_connection 
     36      adapter_methods = RAILS_CONNECTION_ADAPTERS.collect {|a| "#{a}_connection" } 
     37      adapter_methods.each do |am| 
     38        if method_defined?(am.to_sym) 
     39          alias_method "#{am}_raw".to_sym, am.to_sym 
     40          module_eval <<-EOM 
     41            def #{am}(*args) 
     42              if pool_connections 
     43                if @@connection_pool.nil? 
     44                  @@connection_pool = ConnectionPool.new(Proc.new {#{am}_raw(*args)}) 
     45                  @@connection_pool.startup 
     46                end                 
     47                return @@connection_pool.wrapper 
     48              else 
     49                return #{am}_raw(*args) 
     50              end               
     51            end 
     52          EOM 
     53        end 
     54      end 
     55       
     56      alias_method :remove_connection_raw, :remove_connection 
     57      def remove_connection(klass=self)       
     58        if @@connection_pool 
     59          @@connection_pool.shutdown 
     60          @@connection_pool = nil 
     61        end 
     62        remove_connection_raw(klass) 
     63      end 
     64    end 
     65  end 
     66 
     67  class ConnectionPool 
     68    include MonitorMixin 
     69     
     70    def initialize(prc) 
     71      super() 
     72      @prc = prc 
     73    end 
     74     
     75    def wrapper 
     76      Wrapper.new(self) 
     77    end 
     78     
     79    def startup 
     80      @min_size,@max_size = ActiveRecord::Base.pool_connections_min,ActiveRecord::Base.pool_connections_max 
     81      @available_connections = [] 
     82      @acquired_connections = [] 
     83      @min_size.times { @available_connections << @prc.call } 
     84      @cond_var ||= new_cond 
     85      @use_check = 0 
     86      @use_thread = Thread.new { reap_connections } 
     87    end 
     88     
     89    def shutdown 
     90      synchronize do 
     91        @cond_var.wait_until { @acquired_connections.size==0 } 
     92        # TODO - connection_adapters should have a close method to close the physical connection 
     93        #@available_connections.each do |c| 
     94        #  c.close           
     95        #end 
     96        @available_connections = nil         
     97      end 
     98      @use_thread.terminate 
     99    end     
     100     
     101    def reset 
     102      shutdown 
     103      startup 
     104    end 
     105     
     106    def current_size 
     107      return @available_connections.size + @acquired_connections.size 
     108    end 
     109     
     110    def reap_connections 
     111      Base.logger.debug("starting reaper thread") 
     112      loop do 
     113        sleep(ActiveRecord::Base.pool_connections_reaper_period) 
     114        synchronize do 
     115          Base.logger.debug("use_check is #{@use_check}") 
     116          if @use_check < current_size 
     117            num_to_free = current_size - @use_check 
     118            max_to_free = current_size-@min_size 
     119            num_to_free = max_to_free if num_to_free > max_to_free 
     120            Base.logger.debug("freeing #{num_to_free} connections")             
     121            @available_connections[0...num_to_free].each do |conn|               
     122              conn.close 
     123              @available_connections.delete(conn) 
     124            end 
     125          end 
     126        end 
     127        @use_check=0 
     128      end 
     129    end 
     130         
     131    def acquire_connection 
     132      conn = nil 
     133      synchronize do 
     134        loop do 
     135          if @available_connections.size > 0 
     136            conn = @available_connections.shift 
     137            break 
     138          elsif (@max_size > 0 and current_size < @max_size) or @max_size==0 
     139            conn = @prc.call 
     140            break 
     141          else 
     142            if ActiveRecord::Base.pool_connections_wait 
     143              @cond_var.wait 
     144            else 
     145              raise ActiveRecord::ConnectionNotEstablished 
     146            end 
     147          end   
     148        end 
     149        @acquired_connections << conn 
     150        @use_check=@acquired_connections.size if @acquired_connections.size > @use_check 
     151        Base.logger.debug("current_size=" + current_size.to_s + " max_size=" + @max_size.to_s)        
     152      end 
     153      conn 
     154    end 
     155     
     156    def release_connection(conn) 
     157      synchronize do 
     158        @acquired_connections.delete(conn) 
     159        @available_connections << conn 
     160        @cond_var.signal 
     161      end 
     162    end 
     163     
     164    # The Wrapper class is used as a proxy for the real connection adapter. 
     165    # All methods are forwarded to the underlying connections in the pool via 
     166    # the borrow_connection method, which ensures connections go back to the pool 
     167    # after the method is executed on the connection 
     168    class Wrapper 
     169      attr_reader :pool 
     170       
     171      # copy definition of transaction method from database_statements.rb so that internal transaction  
     172      # methods (begin_db_transaction, etc.) go through the wrapper 
     173      def transaction(start_db_transaction = true) 
     174        transaction_open = false 
     175        begin 
     176          if block_given? 
     177            if start_db_transaction 
     178              begin_db_transaction  
     179              transaction_open = true 
     180            end 
     181            yield 
     182          end 
     183        rescue Exception => database_transaction_rollback 
     184          if transaction_open 
     185            transaction_open = false 
     186            rollback_db_transaction 
     187          end 
     188          raise 
     189        end 
     190      ensure 
     191        commit_db_transaction if transaction_open 
     192      end 
     193       
     194      def initialize(pool) 
     195        @pool=pool 
     196      end 
     197       
     198      def indexes(table_name, name=nil) 
     199        borrow_connection {|c| c.indexes(table_name, name) } 
     200      end 
     201       
     202      def tables(name=nil) 
     203        borrow_connection {|c| c.tables(name) } 
     204      end 
     205       
     206      # Provides a real connection to a block and returns the value of the block. 
     207      # Primary purpose is the enforce proper use of pooled connections 
     208      def borrow_connection(&block) 
     209        result = nil  
     210        if block_given? 
     211          if Thread.current['activerecord_connection'].nil? 
     212            begin               
     213              Thread.current['activerecord_connection'] = @pool.acquire_connection 
     214              result = yield Thread.current['activerecord_connection'] 
     215            ensure 
     216              if !Thread.current['activerecord_connection'].nil? 
     217                @pool.release_connection(Thread.current['activerecord_connection']) 
     218                Thread.current['activerecord_connection'] = nil 
     219              end 
     220            end             
     221          else 
     222            result = yield Thread.current['activerecord_connection'] 
     223          end 
     224        end 
     225        result 
     226      end 
     227      alias_method :borrow, :borrow_connection 
     228 
     229      def begin_db_transaction 
     230        if Thread.current['activerecord_connection'].nil? 
     231          Thread.current['activerecord_connection'] = @pool.acquire_connection 
     232          Thread.current['activerecord_connection'].begin_db_transaction 
     233        end 
     234      end 
     235 
     236      def commit_db_transaction 
     237        if !Thread.current['activerecord_connection'].nil? 
     238          Thread.current['activerecord_connection'].commit_db_transaction 
     239          @pool.release_connection(Thread.current['activerecord_connection']) 
     240          Thread.current['activerecord_connection'] = nil 
     241        end 
     242      end 
     243 
     244      def rollback_db_transaction 
     245        if !Thread.current['activerecord_connection'].nil? 
     246          Thread.current['activerecord_connection'].rollback_db_transaction         
     247          @pool.release_connection(Thread.current['activerecord_connection']) 
     248          Thread.current['activerecord_connection'] = nil 
     249        end 
     250      end 
     251       
     252      def method_missing(method, *args, &proc) 
     253        borrow_connection do |conn| 
     254          conn.send(method, *args, &proc) 
     255        end 
     256      end 
     257       
     258      # For testability more than anything 
     259      def pool_size 
     260        return @pool.current_size 
     261      end 
     262       
     263      def shutdown_pool 
     264        @pool.shutdown 
     265      end 
     266    end 
     267     
     268  end   
     269end 
  • activerecord/lib/active_record.rb

    old new  
    7373  require "active_record/connection_adapters/#{adapter}_adapter" 
    7474end 
    7575 
     76require 'active_record/connection_pool' 
    7677require 'active_record/query_cache'