Inside Capistrano: the Command abstraction
Capistrano really is the poster child for Net::SSH. In the last “Inside Capistrano” article (the Gateway implementation) I talked about Capistrano’s use of Net::SSH’s port forwarding feature. This time around, I’d like to focus on how Capistrano uses Net::SSH to execute a single command in parallel on multiple hosts.
For now, we’re just going to skip past all the magic in Capistrano::Actor that manages the connections to the servers. (We’ll discuss that another time.) We’ll jump straight to Capistrano::Command, located in capistrano/command.rb. Its constructor accepts five arguments: a list of named servers, a command to be executed, a Proc instance to act as a callback for any output from the servers, a hash of options, and a reference to the Actor instance that requested the command. (Whew!)
The initialization is pretty straightforward:
```ruby
def initialize(servers, command, callback, options, actor)
  @servers = servers
  @command = command.strip.gsub(/\r?\n/, "\\\n")
  @callback = callback
  @options = options
  @actor = actor
  @channels = open_channels
end
```
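The one line in there that is easy to skim past is the normalization of the command string. Here is a minimal sketch of what it does, with the expression pulled out into a helper named normalize_command (a hypothetical name, used only for illustration) so it can be run without Capistrano:

```ruby
# normalize_command is a hypothetical helper; the expression inside it
# is the one used in Command#initialize above.
def normalize_command(command)
  # Trim surrounding whitespace, then put a backslash in front of every
  # remaining line break (LF or CRLF), so the remote shell sees each
  # one as a line continuation.
  command.strip.gsub(/\r?\n/, "\\\n")
end

multiline = "  cd /tmp &&\n  ls -l\r\n"
normalized = normalize_command(multiline)

puts normalized
```

Because every line break becomes a shell line continuation, the remote shell reads the whole thing as a single logical line. That is why a multi-line command still needs an explicit && or ; between its steps.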
The most significant part of the initialization is the call to open_channels. For those of you unfamiliar with Net::SSH, every interaction with a remote host is encapsulated in a channel. Each connection can have multiple channels open simultaneously; it is this feature that lets you have multiple forwarded ports going over the same connection you are using to interact with your shell on the remote host. (Try doing that with telnet!)
Thus, in order to execute a command on the remote hosts, we need to open a channel for the command on each host. The open_channels method does just this. It’s not a complicated method, but if you aren’t familiar with Net::SSH, it might appear a little daunting with all the callbacks involved. We’ll break it up and take it a piece at a time.
First, we just iterate over each server, using map to return an array of channel objects that correspond to the servers. (We use the actor instance here to get at the actual Net::SSH sessions for each named server, so we can open those channels. It is assumed that each connection has been established previously.)
```ruby
def open_channels
  @servers.map do |server|
    @actor.sessions[server].open_channel do |channel|
      # ...
    end
  end
end
```
For each new channel, we do a bit of set up:
```ruby
channel[:host] = server
channel[:actor] = @actor
channel.request_pty :want_reply => true
```
Every channel instance can be treated as a hash, so you can store custom data in it for later access. Here, we’re storing the name of the server the channel is connected to, as well as the actor reference (so we can use it in the callback). Then, we tell the remote host that we want to allocate a pty for this connection.
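If the hash-style access seems odd, here is a rough sketch of the idea using a stand-in class. FakeChannel is hypothetical and is not how Net::SSH implements its channels; it only shows how an object can expose [] and []= for custom per-channel data, and the host names are made up.

```ruby
# FakeChannel is a hypothetical stand-in, not part of Net::SSH.
class FakeChannel
  def initialize
    @properties = {}
  end

  # Read a custom property previously stored on the channel.
  def [](key)
    @properties[key]
  end

  # Store a custom property on the channel.
  def []=(key, value)
    @properties[key] = value
  end
end

# One channel per (made-up) server, each tagged with its host name,
# much as open_channels does with the real channels.
channels = %w[app1.example.com app2.example.com].map do |server|
  channel = FakeChannel.new
  channel[:host] = server
  channel
end

hosts = channels.map { |ch| ch[:host] }
puts hosts.inspect
```

A property that was never set simply reads back as nil, which matters later when the code checks flags such as :closed.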
With that out of the way, we set up some callbacks to handle different channel events. These are detailed below, with a bit of commentary:
```ruby
# The on_success handler is called when the server
# responds to our request_pty message, but only if
# a pty was allocated. We use this opportunity to send
# the actual command to the server, along with any data
# that should be piped to it on stdin.
channel.on_success do |ch|
  ch.exec command
  ch.send_data options[:data] if options[:data]
end

# Just as on_success is called when the server was able
# to allocate a pty, on_failure is called when it can't.
# In that case, we log a message and move on.
channel.on_failure do |ch|
  logger.important "could not open channel", ch[:host]
  ch.close
end

# Any time the remote command emits data on its stdout,
# Net::SSH will call the channel's on_data callback. We
# delegate to the callback hook given when the Command
# was instantiated.
channel.on_data do |ch, data|
  @callback[ch, :out, data] if @callback
end

# Stderr (and any other, non-stdout data) gets sent to
# the on_extended_data hook. We treat it all as stderr
# and delegate it to the primary callback.
channel.on_extended_data do |ch, type, data|
  @callback[ch, :err, data] if @callback
end

# The on_request hook is used for most other kinds of
# response from the server. All we care about is the
# 'exit-status' reply, which we use to determine whether
# or not the command completed successfully.
channel.on_request do |ch, request, reply, data|
  ch[:status] = data.read_long if request == "exit-status"
end

# When the command finishes, the on_close hook is called.
# We set a flag here that lets us easily query whether
# the channel is still active or not.
channel.on_close do |ch|
  ch[:closed] = true
end
```
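To make the callback side a little more concrete, here is a sketch of the kind of Proc a caller might hand to Command. It takes the same three arguments the handlers above pass along: the channel, a stream tag (:out or :err), and the data. The output hash, the host names, and the plain hashes standing in for channels are all assumptions for illustration.

```ruby
# Collected output, keyed by host and then by stream. The default
# block creates an empty buffer pair the first time a host is seen.
output = Hash.new do |hash, host|
  hash[host] = { :out => String.new, :err => String.new }
end

# The callback receives the channel, a stream tag, and the data.
callback = Proc.new do |ch, stream, data|
  output[ch[:host]][stream] << data
end

# Plain hashes stand in for channels here; only ch[:host] is needed.
app1 = { :host => "app1.example.com" }
app2 = { :host => "app2.example.com" }

# Proc#[] is an alias for Proc#call, which is why the handlers can
# write @callback[ch, :out, data].
callback[app1, :out, "deploying\n"]
callback[app2, :err, "warning: disk almost full\n"]
callback[app1, :out, "done\n"]

output.each do |host, streams|
  puts "#{host} stdout: #{streams[:out].inspect}"
  puts "#{host} stderr: #{streams[:err].inspect}"
end
```

Since the channel is passed to the callback, anything stored on it earlier (such as :host) is available for labeling the output.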
Alright! The channels are all ready for us now, and we can proceed with executing the command. This occurs in the process! method, which has a bit of Net::SSH magic in it so that each channel is processed in parallel.
Each Net::SSH connection is event-driven, and as such requires an event loop to be running. Net::SSH gives you a method for running an event loop on a single connection (called “loop”), but if we want to drive multiple connections simultaneously, we need to implement our own event loop. That’s what the process! method does.
```ruby
def process!
  # First, we mark the current time. This is so that we
  # can ping each connection every second, so that
  # long-running commands don't result in the connection
  # timing out.
  since = Time.now

  # This begins the event loop...
  loop do
    # This indicates how many channels are still active.
    # When there are no more active channels, we can
    # terminate the event loop.
    active = 0

    # For every active channel, have its associated
    # connection process any pending events. (The 'true'
    # parameter tells the poll not to block, if no
    # events are pending.)
    @channels.each do |ch|
      next if ch[:closed]
      active += 1
      ch.connection.process(true)
    end

    # If there aren't any active channels, break out of
    # the loop
    break if active == 0

    # If it has been at least a second since the last
    # ping, ping every connection. Note that we ping
    # whether the channel is active or not, since the
    # connection itself IS, and we don't want it timing
    # out just because one of the other channels is
    # lagging behind the others.
    if Time.now - since >= 1
      since = Time.now
      @channels.each { |ch| ch.connection.ping! }
    end

    # a brief respite, to keep the CPU from going crazy
    sleep 0.01
  end

  # If any command terminated with a non-zero exit
  # status, then we raise an exception. Ultimately,
  # Capistrano::Actor will catch that exception and try
  # to rollback the current task (if a rollback handler
  # is defined for it).
  if failed = @channels.detect { |ch| ch[:status] != 0 }
    raise "command #{@command.inspect} failed on #{failed[:host]}"
  end

  self
end
```
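If you want to play with the shape of that loop without any servers handy, the following is a rough simulation. FakeConnection, make_channel, and the scripted events are all hypothetical; they only mimic the non-blocking "process whatever is pending" step, and the ping and sleep are left out. Plain hashes stand in for channels.

```ruby
# FakeConnection is a hypothetical stand-in for a connection. It holds
# a scripted queue of events for the single channel it serves.
class FakeConnection
  def initialize(channel, events)
    @channel = channel
    @events = events
  end

  # Handle at most one pending event, then return immediately,
  # mimicking a non-blocking poll.
  def process
    event = @events.shift
    return unless event

    name, value = event
    case name
    when :data  then @channel[:output] << value
    when :exit  then @channel[:status] = value
    when :close then @channel[:closed] = true
    end
  end
end

# Build a hash-based stand-in channel wired to its fake connection.
def make_channel(host, events)
  channel = { :host => host, :output => String.new }
  channel[:connection] = FakeConnection.new(channel, events)
  channel
end

channels = [
  make_channel("app1", [[:data, "ok\n"], [:exit, 0], [:close]]),
  make_channel("app2", [[:data, "a"], [:data, "b\n"], [:exit, 1], [:close]])
]

passes = 0
loop do
  # Give every open channel one turn per pass, so no single host
  # can starve the others.
  active = 0
  channels.each do |ch|
    next if ch[:closed]
    active += 1
    ch[:connection].process
  end
  break if active == 0
  passes += 1
end

# Same failure check as in process!: any status other than 0 counts.
failed = channels.detect { |ch| ch[:status] != 0 }
puts "finished after #{passes} passes"
puts "command failed on #{failed[:host]}" if failed
```

Note that the check is ch[:status] != 0, so a channel that closed without ever reporting an exit status (leaving :status as nil) is treated as a failure too.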
When the command terminates, control reverts to the caller (the Capistrano::Actor instance). As you can see, there really isn’t that much to it—it just requires that we do a bit of manual labor to set up that custom event loop.
As with the Gateway code, you could probably mock up an actor instance and use the Command code independently of Capistrano, but it wasn’t really designed with that in mind. Still, it should provide plenty of inspiration for your own Net::SSH scripts.
If you’d like to learn more about Net::SSH, the manual is a good place to start.