Ruby on Rails | Screencasts | Download | Documentation | Weblog | Community | Source

Changeset 5027

Show
Ignore:
Timestamp:
09/05/06 22:57:29 (2 years ago)
Author:
minam
Message:

Connect to multiple servers in parallel, rather than serially.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • tools/capistrano/CHANGELOG

    r4830 r5027  
    11*SVN* 
     2 
     3* Connect to multiple servers in parallel, rather than serially. [Jamis Buck] 
    24 
    35* Add SCM module for Mercurial (closes #4150) [Matthew Elder] 
  • tools/capistrano/lib/capistrano/actor.rb

    r4566 r5027  
    464464      def establish_connections(servers) 
    465465        @factory = establish_gateway if needs_gateway? 
    466         servers.each do |server| 
    467           @sessions[server] ||= @factory.connect_to(server) 
    468         end 
     466        servers = Array(servers) 
     467 
     468        # because Net::SSH uses lazy loading for things, we need to make sure 
     469        # that at least one connection has been made successfully, to kind of 
     470        # "prime the pump", before we go gung-ho and do mass connection in 
     471        # parallel. Otherwise, the threads start doing things in wierd orders 
     472        # and causing Net::SSH to die of confusion. 
     473 
     474        if !@establish_gateway && @sessions.empty? 
     475          server, servers = servers.first, servers[1..-1] 
     476          @sessions[server] = @factory.connect_to(server) 
     477        end 
     478 
     479        servers.map { |server| 
     480          Thread.new { @sessions[server] ||= @factory.connect_to(server) } 
     481        }.each { |t| t.join } 
    469482      end 
    470483 
  • tools/capistrano/lib/capistrano/gateway.rb

    r3787 r5027  
    3232      @config = config 
    3333      @pending_forward_requests = {} 
    34       @mutex = Mutex.new 
    3534      @next_port = MAX_PORT 
    3635      @terminate_thread = false 
     36      @port_guard = Mutex.new 
    3737 
     38      mutex = Mutex.new 
    3839      waiter = ConditionVariable.new 
    3940 
     
    4243        SSH.connect(server, @config) do |@session| 
    4344          @config.logger.trace "gateway connection established" 
    44           @mutex.synchronize { waiter.signal } 
    45           connection = @session.registry[:connection][:driver] 
    46           loop do 
    47             break if @terminate_thread 
    48             sleep 0.1 unless connection.reader_ready? 
    49             connection.process true 
    50             Thread.new { process_next_pending_connection_request } 
    51           end 
     45          mutex.synchronize { waiter.signal } 
     46          @session.loop { !@terminate_thread } 
    5247        end 
    5348      end 
    5449 
    55       @mutex.synchronize { waiter.wait(@mutex) } 
     50      mutex.synchronize { waiter.wait(mutex) } 
    5651    end 
    5752 
     
    7469    # Net::SSH connection via that port. 
    7570    def connect_to(server) 
    76       @mutex.synchronize do 
    77         @pending_forward_requests[server] = ConditionVariable.new 
    78         @pending_forward_requests[server].wait(@mutex) 
    79         @pending_forward_requests.delete(server) 
     71      connection = nil 
     72 
     73      thread = Thread.new do 
     74        @config.logger.trace "establishing connection to #{server} via gateway" 
     75        port = next_port 
     76 
     77        begin 
     78          @session.forward.local(port, server, 22) 
     79          connection = SSH.connect('127.0.0.1', @config, port) 
     80          @config.logger.trace "connection to #{server} via gateway established" 
     81        rescue Errno::EADDRINUSE 
     82          port = next_port 
     83          retry 
     84        rescue Exception => e 
     85          puts e.class.name 
     86          puts e.backtrace.join("\n") 
     87        end 
    8088      end 
     89 
     90      thread.join 
     91      connection or raise "Could not establish connection to #{server}" 
    8192    end 
    8293 
     
    8495 
    8596      def next_port 
    86         port = @next_port 
    87         @next_port -= 1 
    88         @next_port = MAX_PORT if @next_port < MIN_PORT 
    89         port 
    90       end 
    91  
    92       def process_next_pending_connection_request 
    93         @mutex.synchronize do 
    94           key = @pending_forward_requests.keys.detect { |k| ConditionVariable === @pending_forward_requests[k] } or return 
    95           var = @pending_forward_requests[key] 
    96  
    97           @config.logger.trace "establishing connection to #{key} via gateway" 
    98  
    99           port = next_port 
    100  
    101           begin 
    102             @session.forward.local(port, key, 22) 
    103             @pending_forward_requests[key] = SSH.connect('127.0.0.1', @config, 
    104               port) 
    105             @config.logger.trace "connection to #{key} via gateway established" 
    106           rescue Errno::EADDRINUSE 
    107             port = next_port 
    108             retry 
    109           rescue Object 
    110             @pending_forward_requests[key] = nil 
    111             raise 
    112           ensure 
    113             var.signal 
    114           end 
     97        @port_guard.synchronize do 
     98          port = @next_port 
     99          @next_port -= 1 
     100          @next_port = MAX_PORT if @next_port < MIN_PORT 
     101          port 
    115102        end 
    116103      end