Ruby on Rails | Screencasts | Download | Documentation | Weblog | Community | Source

Changeset 4460

Show
Ignore:
Timestamp:
06/19/06 22:48:51 (4 years ago)
Author:
bitsweat
Message:

r4644@asus: jeremy | 2006-06-16 14:57:03 -0700
locking
r4645@asus: jeremy | 2006-06-17 12:41:30 -0700
missing reply fixture
r4646@asus: jeremy | 2006-06-19 13:05:23 -0700
Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections.
r4647@asus: jeremy | 2006-06-19 13:07:23 -0700
PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency.
r4648@asus: jeremy | 2006-06-19 13:08:40 -0700
Pass the default allow_concurrency when instantiating new connections.
r4649@asus: jeremy | 2006-06-19 13:11:12 -0700
Break out concurrent transaction tests and run them for PostgreSQLAdapter only (need to fork or system('some_test_script') for the other adapters)
r4650@asus: jeremy | 2006-06-19 13:42:48 -0700
Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE".
r4661@asus: jeremy | 2006-06-19 15:36:51 -0700
excise the junk mutex

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/activerecord/CHANGELOG

    r4459 r4460  
    11*SVN* 
     2 
     3* Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE". [Shugo Maeda] 
     4    # Obtain an exclusive lock on person 1 so we can safely increment visits. 
     5    Person.transaction do 
     6      # select * from people where id=1 for update 
     7      person = Person.find(1, :lock => true) 
     8      person.visits += 1 
     9      person.save! 
     10    end 
     11 
     12* PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency. [Jeremy Kemper] 
     13 
     14* Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections. [Jeremy Kemper] 
    215 
    316* Change AR::Base#to_param to return a String instead of a Fixnum. Closes #5320. [Nicholas Seckar] 
  • trunk/activerecord/lib/active_record/base.rb

    r4459 r4460  
    366366      #   include the joined columns. 
    367367      # * <tt>:readonly</tt>: Mark the returned records read-only so they cannot be saved or updated. 
     368      # * <tt>:lock</tt>: An SQL fragment like "FOR UPDATE" or "LOCK IN SHARE MODE". 
     369      #   :lock => true gives connection's default exclusive lock, usually "FOR UPDATE". 
    368370      # 
    369371      # Examples for find by id: 
     
    385387      #   Person.find(:all, :include => [ :account, :friends ]) 
    386388      #   Person.find(:all, :group => "category") 
     389      # 
     390      # Example for find with a lock. Imagine two concurrent transactions: 
     391      # each will read person.visits == 2, add 1 to it, and save, resulting 
     392      # in two saves of person.visits = 3.  By locking the row, the second 
     393      # transaction has to wait until the first is finished; we get the 
     394      # expected person.visits == 4. 
     395      #   Person.transaction do 
     396      #     person = Person.find(1, :lock => true) 
     397      #     person.visits += 1 
     398      #     person.save! 
     399      #   end 
    387400      def find(*args) 
    388401        options = extract_options_from_args!(args) 
     
    851864 
    852865        if f = method_scoping[:find] 
    853           f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly ]) 
     866          f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly, :lock ]) 
    854867          f[:readonly] = true if !f[:joins].blank? && !f.has_key?(:readonly) 
    855868        end 
     
    10291042          add_order!(sql, options[:order]) 
    10301043          add_limit!(sql, options, scope) 
     1044          add_lock!(sql, options, scope) 
    10311045 
    10321046          sql 
     
    10621076        def add_limit!(sql, options, scope = :auto) 
    10631077          scope = scope(:find) if :auto == scope 
    1064           if scope 
    1065             options[:limit]  ||= scope[:limit] 
    1066             options[:offset] ||= scope[:offset] 
    1067           end 
     1078          options = options.reverse_merge(:limit => scope[:limit], :offset => scope[:offset]) if scope 
    10681079          connection.add_limit_offset!(sql, options) 
     1080        end 
     1081 
     1082        # The optional scope argument is for the current :find scope. 
     1083        # The :lock option has precedence over a scoped :lock. 
     1084        def add_lock!(sql, options, scope = :auto) 
     1085          scope = scope(:find) if :auto == :scope 
     1086          options = options.reverse_merge(:lock => scope[:lock]) if scope 
     1087          connection.add_lock!(sql, options) 
    10691088        end 
    10701089 
     
    13621381 
    13631382        VALID_FIND_OPTIONS = [ :conditions, :include, :joins, :limit, :offset, 
    1364                                :order, :select, :readonly, :group, :from     
    1365          
     1383                               :order, :select, :readonly, :group, :from, :lock
     1384 
    13661385        def validate_find_options(options) #:nodoc: 
    13671386          options.assert_valid_keys(VALID_FIND_OPTIONS) 
    13681387        end 
    1369          
     1388 
    13701389        def set_readonly_option!(options) #:nodoc: 
    13711390          # Inherit :readonly from finder scope if set.  Otherwise, 
  • trunk/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb

    r4078 r4460  
    249249        active_connections[name] = spec 
    250250      elsif spec.kind_of?(ConnectionSpecification) 
    251         self.connection = self.send(spec.adapter_method, spec.config) 
     251        config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency) 
     252        self.connection = self.send(spec.adapter_method, config) 
    252253      elsif spec.nil? 
    253254        raise ConnectionNotEstablished 
  • trunk/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb

    r2639 r4460  
    9292      end 
    9393 
     94      # Appends a locking clause to a SQL statement. *Modifies the +sql+ parameter*. 
     95      #   # SELECT * FROM suppliers FOR UPDATE 
     96      #   add_lock! 'SELECT * FROM suppliers', :lock => true 
     97      #   add_lock! 'SELECT * FROM suppliers', :lock => ' FOR UPDATE' 
     98      def add_lock!(sql, options) 
     99        case lock = options[:lock] 
     100          when true:   sql << ' FOR UPDATE' 
     101          when String: sql << " #{lock}" 
     102        end 
     103      end 
     104 
    94105      def default_sequence_name(table, column) 
    95106        nil 
  • trunk/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb

    r4401 r4460  
    4747    # * <tt>:encoding</tt> -- An optional client encoding that is using in a SET client_encoding TO <encoding> call on connection. 
    4848    # * <tt>:min_messages</tt> -- An optional client min messages that is using in a SET client_min_messages TO <min_messages> call on connection. 
     49    # * <tt>:allow_concurrency</tt> -- If true, use async query methods so Ruby threads don't deadlock; otherwise, use blocking query methods. 
    4950    class PostgreSQLAdapter < AbstractAdapter 
    5051      def adapter_name 
     
    5556        super(connection, logger) 
    5657        @config = config 
     58        @async = config[:allow_concurrency] 
    5759        configure_connection 
    5860      end 
     
    6870      # postgres-pr raises a NoMethodError when querying if no conn is available 
    6971      rescue PGError, NoMethodError 
    70         false       
     72        false 
    7173      end 
    7274 
     
    7981        end 
    8082      end 
    81        
     83 
    8284      def disconnect! 
    8385        # Both postgres and postgres-pr respond to :close 
     
    100102        } 
    101103      end 
    102        
     104 
    103105      def supports_migrations? 
    104106        true 
    105       end       
    106        
     107      end 
     108 
    107109      def table_alias_length 
    108110        63 
     
    142144 
    143145      def query(sql, name = nil) #:nodoc: 
    144         log(sql, name) { @connection.query(sql) } 
     146        log(sql, name) do 
     147          if @async 
     148            @connection.async_query(sql) 
     149          else 
     150            @connection.query(sql) 
     151          end 
     152        end 
    145153      end 
    146154 
    147155      def execute(sql, name = nil) #:nodoc: 
    148         log(sql, name) { @connection.exec(sql) } 
     156        log(sql, name) do 
     157          if @async 
     158            @connection.async_exec(sql) 
     159          else 
     160            @connection.exec(sql) 
     161          end 
     162        end 
    149163      end 
    150164 
     
    163177        execute "COMMIT" 
    164178      end 
    165        
     179 
    166180      def rollback_db_transaction #:nodoc: 
    167181        execute "ROLLBACK" 
     
    262276        # First try looking for a sequence with a dependency on the 
    263277        # given table's primary key. 
    264         result = execute(<<-end_sql, 'PK and serial sequence')[0] 
     278        result = query(<<-end_sql, 'PK and serial sequence')[0] 
    265279          SELECT attr.attname, name.nspname, seq.relname 
    266280          FROM pg_class      seq, 
     
    285299          # the 8.1+ nextval('foo'::regclass). 
    286300          # TODO: assumes sequence is in same schema as table. 
    287           result = execute(<<-end_sql, 'PK and custom sequence')[0] 
     301          result = query(<<-end_sql, 'PK and custom sequence')[0] 
    288302            SELECT attr.attname, name.nspname, split_part(def.adsrc, '\\\'', 2) 
    289303            FROM pg_class       t 
     
    306320        execute "ALTER TABLE #{name} RENAME TO #{new_name}" 
    307321      end 
    308              
     322 
    309323      def add_column(table_name, column_name, type, options = {}) 
    310324        execute("ALTER TABLE #{table_name} ADD #{column_name} #{type_to_sql(type, options[:limit])}") 
     
    326340        end 
    327341        change_column_default(table_name, column_name, options[:default]) unless options[:default].nil? 
    328       end       
     342      end 
    329343 
    330344      def change_column_default(table_name, column_name, default) #:nodoc: 
    331345        execute "ALTER TABLE #{table_name} ALTER COLUMN #{column_name} SET DEFAULT '#{default}'" 
    332346      end 
    333        
     347 
    334348      def rename_column(table_name, column_name, new_column_name) #:nodoc: 
    335349        execute "ALTER TABLE #{table_name} RENAME COLUMN #{column_name} TO #{new_column_name}" 
     
    380394              row.each_index do |cel_index| 
    381395                column = row[cel_index] 
    382                  
     396 
    383397                case res.type(cel_index) 
    384398                  when BYTEA_COLUMN_TYPE_OID 
     
    393407            end 
    394408          end 
     409          res.clear 
    395410          return rows 
    396411        end 
     
    443458          unescape_bytea(s) 
    444459        end 
    445          
     460 
    446461        # Query a table's column names, default values, and types. 
    447462        # 
     
    483498            when /^interval/i     then 'string' 
    484499            # geometric types (the line type is currently not implemented in postgresql) 
    485             when /^(?:point|lseg|box|"?path"?|polygon|circle)/i  then 'string'  
     500            when /^(?:point|lseg|box|"?path"?|polygon|circle)/i  then 'string' 
    486501            when /^bytea/i        then 'binary' 
    487502            else field_type       # Pass through standard types. 
     
    493508          return "t" if value =~ /true/i 
    494509          return "f" if value =~ /false/i 
    495            
     510 
    496511          # Char/String/Bytea type values 
    497512          return $1 if value =~ /^'(.*)'::(bpchar|text|character varying|bytea)$/ 
    498            
     513 
    499514          # Numeric values 
    500515          return value if value =~ /^-?[0-9]+(\.[0-9]*)?/ 
     
    502517          # Fixed dates / times 
    503518          return $1 if value =~ /^'(.+)'::(date|timestamp)/ 
    504            
     519 
    505520          # Anything else is blank, some user type, or some function 
    506521          # and we can't know the value of that, so return nil. 
  • trunk/activerecord/lib/active_record/connection_adapters/sqlite_adapter.rb

    r4419 r4460  
    185185 
    186186 
     187      # SELECT ... FOR UPDATE is redundant since the table is locked. 
     188      def add_lock!(sql, options) #:nodoc: 
     189        sql 
     190      end 
     191 
     192 
    187193      # SCHEMA STATEMENTS ======================================== 
    188194 
  • trunk/activerecord/lib/active_record/fixtures.rb

    r4392 r4460  
    520520            @@already_loaded_fixtures[self.class] = @loaded_fixtures 
    521521          end 
    522           ActiveRecord::Base.lock_mutex 
     522          ActiveRecord::Base.send :increment_open_transactions 
    523523          ActiveRecord::Base.connection.begin_db_transaction 
    524524 
     
    539539        if use_transactional_fixtures? 
    540540          ActiveRecord::Base.connection.rollback_db_transaction 
    541           ActiveRecord::Base.unlock_mutex 
     541          ActiveRecord::Base.send :decrement_open_transactions 
    542542        end 
    543543        ActiveRecord::Base.verify_active_connections! 
  • trunk/activerecord/lib/active_record/transactions.rb

    r4312 r4460  
    55module ActiveRecord 
    66  module Transactions # :nodoc: 
    7     TRANSACTION_MUTEX = Mutex.new 
    8  
    97    class TransactionError < ActiveRecordError # :nodoc: 
    108    end 
     
    8078      def transaction(*objects, &block) 
    8179        previous_handler = trap('TERM') { raise TransactionError, "Transaction aborted" } 
    82         lock_mutex 
    83          
     80        increment_open_transactions 
     81 
    8482        begin 
    8583          objects.each { |o| o.extend(Transaction::Simple) } 
     
    9492          raise 
    9593        ensure 
    96           unlock_mutex 
     94          decrement_open_transactions 
    9795          trap('TERM', previous_handler) 
    9896        end 
    9997      end 
    100        
    101       def lock_mutex#:nodoc: 
    102         Thread.current['open_transactions'] ||= 0 
    103         TRANSACTION_MUTEX.lock if Thread.current['open_transactions'] == 0 
    104         Thread.current['start_db_transaction'] = (Thread.current['open_transactions'] == 0) 
    105         Thread.current['open_transactions'] += 1 
    106       end 
    107        
    108       def unlock_mutex#:nodoc: 
    109         Thread.current['open_transactions'] -= 1 
    110         TRANSACTION_MUTEX.unlock if Thread.current['open_transactions'] == 0 
    111       end 
     98 
     99      private 
     100        def increment_open_transactions #:nodoc: 
     101          open = Thread.current['open_transactions'] ||= 0 
     102          Thread.current['start_db_transaction'] = open.zero? 
     103          Thread.current['open_transactions'] = open + 1 
     104        end 
     105 
     106        def decrement_open_transactions #:nodoc: 
     107          Thread.current['open_transactions'] -= 1 
     108        end 
    112109    end 
    113110 
  • trunk/activerecord/test/locking_test.rb

    r3422 r4460  
    33require 'fixtures/legacy_thing' 
    44 
    5 class LockingTest < Test::Unit::TestCase 
     5class OptimisticLockingTest < Test::Unit::TestCase 
    66  fixtures :people, :legacy_things 
    77 
     
    99    p1 = Person.find(1) 
    1010    p2 = Person.find(1) 
    11      
     11 
    1212    p1.first_name = "Michael" 
    1313    p1.save 
    14      
     14 
    1515    assert_raises(ActiveRecord::StaleObjectError) { 
    1616      p2.first_name = "should fail" 
     
    2525    p1.first_name = "Anika" 
    2626    p1.save 
    27      
     27 
    2828    assert_raises(ActiveRecord::StaleObjectError) { 
    2929      p2.first_name = "should fail" 
     
    3131    } 
    3232  end 
    33    
     33 
    3434  def test_lock_column_name_existing 
    3535    t1 = LegacyThing.find(1) 
     
    4242      t2.save 
    4343    } 
    44   end  
     44  end 
     45end 
    4546 
     47 
     48# TODO: test against the generated SQL since testing locking behavior itself 
     49# is so cumbersome. Will deadlock Ruby threads if the underlying db.execute 
     50# blocks, so separate script called by Kernel#system is needed. 
     51# (See exec vs. async_exec in the PostgreSQL adapter.) 
     52class PessimisticLockingTest < Test::Unit::TestCase 
     53  self.use_transactional_fixtures = false 
     54  fixtures :people 
     55 
     56  def setup 
     57    @allow_concurrency = ActiveRecord::Base.allow_concurrency 
     58    ActiveRecord::Base.allow_concurrency = true 
     59  end 
     60 
     61  def teardown 
     62    ActiveRecord::Base.allow_concurrency = @allow_concurrency 
     63  end 
     64 
     65  # Test that the adapter doesn't blow up on add_lock! 
     66  def test_sane_find_with_lock 
     67    assert_nothing_raised do 
     68      Person.transaction do 
     69        Person.find 1, :lock => true 
     70      end 
     71    end 
     72  end 
     73 
     74  # Test no-blowup for scoped lock. 
     75  def test_sane_find_with_lock 
     76    assert_nothing_raised do 
     77      Person.transaction do 
     78        Person.with_scope(:find => { :lock => true }) do 
     79          Person.find 1 
     80        end 
     81      end 
     82    end 
     83  end 
     84 
     85  if current_adapter?(:PostgreSQLAdapter) 
     86    def test_no_locks_no_wait 
     87      first, second = duel { Person.find 1 } 
     88      assert first.end > second.end 
     89    end 
     90 
     91    def test_second_lock_waits 
     92      first, second = duel { Person.find 1, :lock => true } 
     93      assert second.end > first.end 
     94    end 
     95 
     96    protected 
     97      def duel(zzz = 0.2) 
     98        t0, t1, t2, t3 = nil, nil, nil, nil 
     99 
     100        a = Thread.new do 
     101          t0 = Time.now 
     102          Person.transaction do 
     103            yield 
     104            sleep zzz       # block thread 2 for zzz seconds 
     105          end 
     106          t1 = Time.now 
     107        end 
     108 
     109        b = Thread.new do 
     110          sleep zzz / 2.0   # ensure thread 1 tx starts first 
     111          t2 = Time.now 
     112          Person.transaction { yield } 
     113          t3 = Time.now 
     114        end 
     115 
     116        a.join 
     117        b.join 
     118 
     119        assert t1 > t0 + zzz 
     120        assert t2 > t0 
     121        assert t3 > t2 
     122        [t0.to_f..t1.to_f, t2.to_f..t3.to_f] 
     123      end 
     124  end 
    46125end 
  • trunk/activerecord/test/threaded_connections_test.rb

    r4291 r4460  
    11require 'abstract_unit' 
    22require 'fixtures/topic' 
     3require 'fixtures/reply' 
    34 
    45unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name 
  • trunk/activerecord/test/transactions_test.rb

    r4291 r4460  
    66class TransactionTest < Test::Unit::TestCase 
    77  self.use_transactional_fixtures = false 
    8  
    98  fixtures :topics, :developers 
    109 
    1110  def setup 
    12     # sqlite does not seem to return these in the right order, so we sort them 
    13     # explicitly for sqlite's sake. sqlite3 does fine. 
    1411    @first, @second = Topic.find(1, 2).sort_by { |t| t.id } 
    1512  end 
     
    138135  end 
    139136 
    140   # This will cause transactions to overlap and fail unless they are 
    141   # performed on separate database connections. 
    142   def test_transaction_per_thread 
    143     assert_nothing_raised do 
    144       threads = (1..20).map do 
    145         Thread.new do 
    146           Topic.transaction do 
    147             topic = Topic.find(:first) 
    148             topic.approved = !topic.approved? 
    149             topic.save! 
    150             topic.approved = !topic.approved? 
    151             topic.save! 
    152           end 
    153         end 
    154       end 
    155  
    156       threads.each { |t| t.join } 
    157     end 
    158   end 
    159  
    160   # Test for dirty reads among simultaneous transactions. 
    161   def test_transaction_isolation__read_committed 
    162     # Should be invariant. 
    163     original_salary = Developer.find(1).salary 
    164     temporary_salary = 200000 
    165  
    166     assert_nothing_raised do 
    167       threads = (1..20).map do 
    168         Thread.new do 
    169           Developer.transaction do 
    170             # Expect original salary. 
    171             dev = Developer.find(1) 
    172             assert_equal original_salary, dev.salary 
    173  
    174             dev.salary = temporary_salary 
    175             dev.save! 
    176  
    177             # Expect temporary salary. 
    178             dev = Developer.find(1) 
    179             assert_equal temporary_salary, dev.salary 
    180  
    181             dev.salary = original_salary 
    182             dev.save! 
    183  
    184             # Expect original salary. 
    185             dev = Developer.find(1) 
    186             assert_equal original_salary, dev.salary 
    187           end 
    188         end 
    189       end 
    190  
    191       # Keep our eyes peeled. 
    192       threads << Thread.new do 
    193         10.times do 
    194           sleep 0.05 
    195           Developer.transaction do 
    196             # Always expect original salary. 
    197             assert_equal original_salary, Developer.find(1).salary 
    198           end 
    199         end 
    200       end 
    201  
    202       threads.each { |t| t.join } 
    203     end 
    204  
    205     assert_equal original_salary, Developer.find(1).salary 
    206   end 
    207  
    208  
    209137  private 
    210138    def add_exception_raising_after_save_callback_to_topic 
     
    216144    end 
    217145end 
     146 
     147if current_adapter?(:PostgreSQLAdapter) 
     148  class ConcurrentTransactionTest < TransactionTest 
     149    def setup 
     150      @allow_concurrency = ActiveRecord::Base.allow_concurrency 
     151      ActiveRecord::Base.allow_concurrency = true 
     152      super 
     153    end 
     154 
     155    def teardown 
     156      super 
     157      ActiveRecord::Base.allow_concurrency = @allow_concurrency 
     158    end 
     159 
     160    # This will cause transactions to overlap and fail unless they are performed on 
     161    # separate database connections. 
     162    def test_transaction_per_thread 
     163      assert_nothing_raised do 
     164        threads = (1..3).map do 
     165          Thread.new do 
     166            Topic.transaction do 
     167              topic = Topic.find(1) 
     168              topic.approved = !topic.approved? 
     169              topic.save! 
     170              topic.approved = !topic.approved? 
     171              topic.save! 
     172            end 
     173          end 
     174        end 
     175 
     176        threads.each { |t| t.join } 
     177      end 
     178    end 
     179 
     180    # Test for dirty reads among simultaneous transactions. 
     181    def test_transaction_isolation__read_committed 
     182      # Should be invariant. 
     183      original_salary = Developer.find(1).salary 
     184      temporary_salary = 200000 
     185 
     186      assert_nothing_raised do 
     187        threads = (1..3).map do 
     188          Thread.new do 
     189            Developer.transaction do 
     190              # Expect original salary. 
     191              dev = Developer.find(1) 
     192              assert_equal original_salary, dev.salary 
     193 
     194              dev.salary = temporary_salary 
     195              dev.save! 
     196 
     197              # Expect temporary salary. 
     198              dev = Developer.find(1) 
     199              assert_equal temporary_salary, dev.salary 
     200 
     201              dev.salary = original_salary 
     202              dev.save! 
     203 
     204              # Expect original salary. 
     205              dev = Developer.find(1) 
     206              assert_equal original_salary, dev.salary 
     207            end 
     208          end 
     209        end 
     210 
     211        # Keep our eyes peeled. 
     212        threads << Thread.new do 
     213          10.times do 
     214            sleep 0.05 
     215            Developer.transaction do 
     216              # Always expect original salary. 
     217              assert_equal original_salary, Developer.find(1).salary 
     218            end 
     219          end 
     220        end 
     221 
     222        threads.each { |t| t.join } 
     223      end 
     224 
     225      assert_equal original_salary, Developer.find(1).salary 
     226    end 
     227  end 
     228end