ANDY: A general, fault-tolerant tool for database
searching on computer clusters.
Extended Presentation and Documentation.
Abstract
ANDY is
a set of Perl programs and modules for easily running large biological database
searches, and similar applications, over the nodes of a Linux/Unix computer
cluster in order to achieve speedup through parallelization. Users specify
their cluster runs through a straightforward XML configuration file, in which
they give a template for the pipeline or sequence of commands they want to
run on the nodes, as well as the locations and types of data sources for the
command lines and other information. The tool works directly with Distributed
Resource Management ("DRM") systems, such as GridEngine and PBS, and
is easily extensible to different DRMs simply by writing a small DRM-specific
module for submitting and monitoring jobs. Cluster runs can be done in both
dedicated mode, where nodes are held and used until all tasks complete, or in
fair mode, where each submitted job does only a specified amount of computation
and then exits, allowing fair use of the cluster where other users' jobs can be
interspersed among ANDY jobs. For efficiency, support is provided for
(optional) piping of file input and output of commands through named pipes and
buffering in memory, in order to avoid the possible performance hit of slow
disk I/O. The tool is fault tolerant, checking that all jobs and tasks complete
and resubmitting failed jobs and tasks until the run is complete. Users can
provide their own application-specific error checking simply by adding error
checking commands into the sequence or pipeline of commands, and the tool will
detect non-zero exit statuses and flag error, causing the server to retry; users
can similarly provide for succinct summaries of raw program output to be
created, which can minimize communication with the server to give better
performance. The tool also allows a command pipeline to be run at the server
over the course of a run into which the results from node jobs are piped,
allowing global summaries to be created. We also provide a set of concrete
client and server side summarization and error checking routines for key
bioinformatics programs such as BLAST, FASTA, and ssearch. The tool thus
provides an infrastructure for generalized, distributed, fault tolerant,
high-performance command pipelines over a compute cluster. The tool has been
widely used in support of research in the Brenner computational biology lab at
UC Berkeley, and we are making it publicly available at http://compbio.berkeley.edu/proj/andy/.
Introduction
A common
task for computational biologists is to search sequences against a database of
other sequences (or a database against itself), using programs such as BLAST,
ssearch, and FASTA, to find similarities suggestive of homology. Such
searches are being done more often and on a larger scale as genome sequencing
and related projects generate ever more sequence data. Many organizations are
acquiring computer clusters in order to run such searches more efficiently in
parallel over the many nodes of the cluster. However, while most researchers
can run their database search jobs themselves on a single machine, it is not
trivial to run such jobs in parallel over a cluster in a fault tolerant way.
Some solutions exist for specific search programs; e.g. TurboBLAST, a product
sold by the company TurboGenomics, is a version of the NCBI BLAST program that
runs in parallel over clusters. More generally, tools such as Disperse
and WRAPID allow users to specify a database search command line and have it
run in parallel over a cluster, and ANDY is most similar to these tools but
provides additional features and enhancements.
On a
cluster managed by a DRM, users submit jobs that they want to run on a cluster
through the DRM interface, and the DRM controls where and when jobs will be run
(jobs are placed in "queues" while waiting to run) and maintains
fairness and efficiency of resource allocation. Commonly used DRMs include
Platform LSF, Open PBS/PBS Pro, and Sun GridEngine. The Brenner lab possesses a
64 node Linux cluster and we have run Sun GridEngine and PBS Pro on it. We also
had access to the Berkeley Millennium campus cluster running Ganglia/gexec and
the Alvarez cluster at Lawrence Berkeley National Lab running PBS Pro, and we
wanted to be able to seamlessly use these as additional sources of compute
power. We needed a tool which would easily (almost as easy as running on a
single machine), efficiently (to achieve speedup through parallelization),
fairly (so users would not "hog" all of the cluster resources while
running their jobs), and seamlessly allow biological database search jobs to be
run over our cluster and other clusters to which we had access through the DRM.
In addition, it was important that the tool maintain fault tolerance, i.e.
checking that all individual sequence searches complete successfully and
redoing failed searches.
We have
built such a tool in Perl for Linux/Unix clusters, and we call it ANDY. Users
specify their cluster runs by creating an XML configuration file in which they
describe a parameterized template for the sequence or pipeline of commands they
want to run using common Unix shell syntax, the locations and types (e.g. FASTA
file, directory of files, list of values, etc.) of data sources which will give
concrete values to the command template parameters to create instantiated
commands to be run on the cluster nodes, and various other configuration
information. The data sources are iterated over to generate all combinations of
data source values, and each combination provides a set of concrete values for
the command template's parameters and is a single task to be done.
Users
then execute the ANDY cluster tool server on their configuration file from the
cluster head node, which submits ANDY client jobs to the cluster nodes, and
then sends tasks to and receives results from connecting ANDY clients; the
server also handles fault tolerance, monitoring jobs and resubmitting failed
jobs, and redispersing tasks that failed (up to some user-specified number of
times) to be tried again, not exiting until all tasks are complete. If
specified by the user, the server will also pipe results from clients into a
server summary command which executes during the entire run. To submit and
monitor client jobs the server uses a DRM-specific module, the only code that
is specific to the DRM being used. Submitted clients are scheduled to nodes by
the DRM, and when they start up they first contact the server to request
run-specific configuration information (the command template, environment
variables to set, what results should be sent back to the server, etc.). They
then enter a loop where they continually (until the server says to exit or they
lose contact with the server) request a task to do from the server, then
instantiate the command template based on the received task values, execute the
instantiated commands, and send results (or notice that an error occurred) back to the
server. As part of fault tolerance, clients also periodically send
"ping" messages to the server to let it know they are still alive.
Task
values can be input to commands in four ways: (1) raw (the value is directly
interpolated into the command line), (2) file (the value is saved into a file
on disk and the full path to this file is interpolated into the command line);
where applicable and for performance to avoid potentially costly disk I/O, (3)
named pipe (the full path to a named pipe is interpolated into the command
line, and the value is written into the named pipe), and (4) memory buffer (the
value is written into a named pipe, and a separate buffering process reads from
this named pipe, buffering it in memory as necessary, and writes what it reads
to one or more other named pipes whose full paths have been interpolated into
the command line). Inter-command communication can be through files (one
command writes a file and succeeding one(s) read it) and standard out piping,
but also through named pipes (one command writes to a named pipe and a
succeeding one reads it) and memory buffers (one command
writes
to a named pipe, a separate buffering process reads from this named pipe,
buffering it in memory as necessary, and writes what it reads to one or more
other named pipes). Command output can also similarly be through files, named
pipes, and memory buffers.
Memory
buffers thus support a "multi" tee, but they are also meant to be
used in situations where direct use of named pipes would cause deadlock but the
use of named pipes is still desired (e.g. command 1 writes streams A and B in
that order, and command 2 reads streams B and A in that order; direct use of
named pipes here and in analogous cases would cause deadlock, but with memory buffers
the intervening buffering process will completely buffer A, then read B and
write it so that command 2 can read it, and finally write A for command 2 to
read). Of course, memory buffers cannot be used in situations where command
output exceeds available memory --- the user must know these cases and either
specify file output for them, or, if possible, rewrite commands to be able to
read and write output in an order that will avoid blowing up memory with memory
buffers or deadlocking with named pipes.
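To make the deadlock scenario concrete, here is a small illustrative shell
sketch (the file names and commands are ours, not part of ANDY); opening a
named pipe blocks until both a reader and a writer have it open, so the two
processes below wait on each other forever:
mkfifo streamA streamB
# Command 1: writes stream A first, then stream B
( cat inputA > streamA; cat inputB > streamB ) &
# Command 2: reads stream B first, then stream A
cat streamB streamA > combined.out
# Command 1 blocks opening streamA (no reader yet) while command 2 blocks
# opening streamB (no writer yet): deadlock. With a memory buffer, the
# intervening buffering process would read and hold stream A in memory,
# allowing both commands to proceed.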
The ANDY
cluster tool was thus designed to work directly with DRMs and to be easily
extensible for use on clusters running any DRM software by simply writing a
small DRM-specific module with implementations of a job-submitting subroutine
and a fault-checking subroutine. The majority of the ANDY cluster tool code is
in the core, which can be used unchanged across different clusters and DRMs.
This is the key difference and advantage of the ANDY cluster tool over other
similar systems such as Disperse and WRAPID: it is built to work easily with
DRMs that are already in place rather than replacing them as Disperse and
WRAPID do, i.e. it decouples the parallelizing of commands from the DRM
functionality. This approach seems to be the more logical one: other research
and corporate efforts have extensively investigated cluster distributed
resource management and there are many powerful, full-featured systems
available, such as Sun GridEngine, PBS Pro, and Platform LSF. Rather than
reinventing the wheel in a simplistic and incomplete way by integrating limited
DRM functionality into a cluster biological database search tool, we should
instead leave the DRM functionality to the pros at Sun, Platform, PBS, etc. and
concentrate on building systems that can easily and flexibly work with these
DRMs and take advantage of their many features. This is the approach taken by
the ANDY cluster tool.
In any
case, as a practical matter most clusters, especially in the life sciences, are
managed using DRMs such as PBS and Platform LSF and tools such as ANDY that
work seamlessly and unobtrusively with them should be welcome. It is also worth
pointing out that, in the absence of a proper DRM, the tool could still be used
by writing a DRM-specific module that effectively encodes the basic, necessary
job starting and monitoring functionality (e.g., using rsh or ssh, and ps).
Other key advantages of the ANDY cluster tool over similar systems such as
Disperse and WRAPID include ANDY's support for input and output through named
pipes and memory buffers for performance and multi teeing, its support for
server side code for summarization in addition to client code running on the
nodes, its support for summarization and error checking simply as additional
user commands in a command sequence/pipeline (and its inclusion of concrete
summary and error checking routines for important bioinformatics applications),
and its fault tolerance.
The
remainder of this paper is organized as follows. The following two sections
will cover the command template and XML configuration file formats, giving
details of how key aspects of the tool, such as data source iterators, fault
tolerance, and command instantiation, work as part of the discussion. The next
section will then describe how the tool can be extended to a new DRM by writing
a DRM-specific module. The section after that will give several concrete
examples, showing configuration files and command templates for commonly used
bioinformatics applications such as BLAST and ssearch, and giving some performance
results. The final section will discuss some future development directions for
the tool and conclude the paper.
Command Template Format
The
commands to be run on the cluster nodes (client code) and the head node (server
code) are specified using basic Unix shell syntax with two simple enhancements.
First, since what is being specified is a template for the commands to be run
there needs to be a way to parameterize, and this is done with variables. A
variable is specified as some text with two underscore characters at the front
and back, e.g. __INPUT_SEQUENCE__. While the text of the variable can contain
any characters, it is good practice to highlight variables by only using all
caps and numbers, e.g. __FORMATTED_DATABASE__ instead of __formatted_database__.
Second, there needs to be a way to separate client code from server code, and
this is done with the triple-bar symbol |||
or the triple ampersand symbol &&&.
With this in mind, here is a detailed example of an ANDY command template:
client_command1 -i __IN__ -a __A_VAL__ -O __OUT__ 2> __ERR__ &
client_err_check1 -i __OUT__ -e __ERR__ -s __EC[0,0]__ &
client_sum1 < __OUT__ > __SUM1__ ;
prepare_input < __IN2__ |
client_command2 > __OUT2__ 2> __ERR2__ &
client_err_check2 -i __OUT2__ -e __ERR2__ -s1 __EC[1,0]__ -s2 __EC[1,1]__ &
client_sum2 -i __OUT2__ -o __SUM2__ &&&
server_sum_command -i1 __SUM1__ -i2 __SUM2__ -o __RESULTS_DIR__global_sum.txt
Except
for the variables and the &&&
symbol, all of this is simply common Unix shell
syntax. Commands are separated by &,
|, ;, &&&, or |||.
& means to run concurrently, but
not pass the standard out channel to the next command's standard in channel,
e.g. in "A & B" A and B will each run concurrently but A's
standard out will not be passed to B's standard in. | means run concurrently and do pass standard out to the next
command's standard in, e.g. in "A | B" A's standard out will be piped
in as B's standard in. ; means
sequence commands, e.g. in "A ; B" A will be run to completion, and then
B will be run after A completes (standard out of A will not be piped to
standard in of B). The &&&
symbol above separates the client code from the server code, i.e. everything to
the left of the &&& is
commands to be run on clients and everything to the right of the &&& is commands to be run
on the server. ||| is exactly the
same as &&&, except that
the standard out of the command right before ||| (which will be run on some client) will be piped into the
standard in of the command right after the |||
(which will be running on the server). Note that server code is optional --- if
there is no &&& or ||| then all code is client code. Currently,
only &&& is supported,
but ||| can be achieved indirectly
via PIPEs. The ;
symbol can be thought of as separating commands into distinct concurrent
execution units called pipe segments, e.g. the client code in the above example
has two pipe segments while the server code has just one pipe segment. Note
that server code is limited to one pipe segment, while there is no limit on the
number of pipe segments in client code.
The
variables are where task values are input to and results read from commands,
and they are also inter-command communication channels. Client side variables
can be of type raw, file, pipe, or mem_buf, and server side variables can be of
type pipe or mem_buf (except for __RESULTS_DIR__ above, which is a special case
and is described below). There is an asymmetry between client code and server
code: client code will be independently executed many times on different nodes
for different tasks, but there is only one head node on which the server code
will be executed. The server code is not executed once per task, but rather is
started at the beginning of an ANDY run and runs
continuously
until complete. It should not be necessary to run any code on the server per
task --- if a user needs to do this then such code should properly be run on
the client and the results sent to the server. Variables that appear both in
client code and server code are assumed to be result output of client commands,
and the output of these variables will be received atomically from clients by
the server and piped as input to the running server code (i.e. a single
client's complete results will be received, and passed into the server code,
before another client's results are accepted; thus there will be no intermingling
of individual task results from different clients as input to the server code).
Thus, server side variables are limited to type pipe and mem_buf, and there can
be only one server side pipe segment, because server code must execute through
the entire run and have client results piped in continuously without any break
in execution. There can also be no output variables in server code --- since
the server code is the end of the distributed command pipeline it should simply
write final results into permanent storage such as files, databases, etc.; the
variable __RESULTS_DIR__ in the above example is a special variable which is
interpolated to the full path to a directory on the server where the server
code can write results.
The
variables of the form __EC[M,N]__, where M and N are numbers, are also special
cases, and stand for the exit code of command number N of pipe segment M (this
command must be before the command in which __EC[M,N]__ appears). The ANDY
clients running the commands on the cluster nodes will replace these with the
full paths to named pipes, and will write the exit status codes of the
referenced commands into them. This is used for fault tolerance, and the
commands in which __EC[M,N]__ variables appear should be error checking commands;
the ANDY clients running the commands will check THEIR exit status codes and,
if any are nonzero, will flag error to the server, which will then retry the
task that generated the command. The usual protocol for Unix commands is to
return zero on success and nonzero on failure, and if all commands follow this
protocol then __EC[M,N]__ variables are not needed. In this case, if no
commands contain __EC[M,N]__ variables, then the ANDY client will check the
exit status codes of ALL commands, and flag error to the server if any are
nonzero, which will cause the server to retry the task that generated the
commands. Of course, it is possible that a task that generated an error is
simply a bad task that cannot succeed, and we would not want to keep retrying
such a task forever; thus, the user specifies in the configuration file the
maximum number of times to retry failed tasks (this is optional; if not
specified, no error checking or task retrying is done). More about
fault tolerance handling will be described below. __EC[M,N]__ variables do not
need to be specified in the configuration file. However, the types and data
sources for other variables, and other information, are specified in the
configuration file which is described next.
Configuration File Format
ANDY
configuration files are XML files with the following tags and formats.
<ANDY_RUN CONFIG_VERSION="1.0">
An ANDY
configuration file begins with the all enclosing ANDY_RUN tag, giving the
CONFIG_VERSION attribute as 1.0 (this will be used for backwards compatibility
in later versions of ANDY). The tags inside ANDY_RUN can be in any order, and
are the following.
<COMMAND_TEMPLATE FILE="my_command.txt"/>
The
command template (described in the previous section) is usually placed in a
separate file, and the attribute FILE of COMMAND_TEMPLATE should be set to the
location of this file. The command template can also be placed directly in the
XML configuration file:
<COMMAND_TEMPLATE>...command template here...</COMMAND_TEMPLATE>
This is
usually less desirable because shell symbols such as >, <, etc. will then need
to be encoded as the XML entities &gt;, &lt;, etc. to avoid having them
confused with XML markup.
The
heart of the configuration file is the VARIABLES section where variable names,
types, and data sources are defined. Each variable definition is inside of a
VARIABLE tag, and all VARIABLE tags are enclosed within the VARIABLES tag. The
VARIABLE tag itself can have four attributes defined: NAME, TYPE, CLIENT_TYPE,
and SERVER_TYPE. The value of NAME should match a variable used in the command
template. The value of CLIENT_TYPE should be the type of the variable in client
code (RAW, FILE, PIPE, or MEM_BUF), while SERVER_TYPE should be the type of the
variable in server code (PIPE or MEM_BUF); thus, a variable can have different
types in client and server code. The value of the TYPE attribute, if defined,
will be used for the type of the variable in both client and server code (and
will override any values given for CLIENT_TYPE and/or SERVER_TYPE). If there is no
definition for CLIENT_TYPE or TYPE the default client code type is RAW, and if
there is no definition for SERVER_TYPE or TYPE the default server code type is
PIPE. A variable that is used only in client code can have a data source
associated with it. The data source will be iterated over to generate all
concrete values for it, and the concrete values will be the input to
instantiated command lines run on clients. The way the values are input depends
on the variable type:
· RAW: Value is directly interpolated into the command line in place of the
variable name.
· FILE: Value is saved to a file and the full path to this file is interpolated
into the command line in place of the variable name.
· PIPE: A named pipe is created, the full path to this named pipe is
interpolated into the command line in place of the variable, and the value is
written into the named pipe. Note that the variable can only occur once in the
command line. A named pipe's contents can only be read once (nor can you seek
on a named pipe), and thus a named pipe can have only a single reader (and a
single writer providing the contents). In this case, the command is the reader
and the external ANDY client code is the writer. Use the MEM_BUF type if the
variable needs to occur multiple times and it is still desired to use named
pipes.
· MEM_BUF: Multiple named pipes are created. The full paths to all but one are
interpolated respectively into the command line wherever the variable name
appears. The value is written into the last named pipe, and a separate
buffering process reads from this last named pipe and writes what it reads into
all the other named pipes whose full paths were interpolated into the command
line (the executing commands are assumed to read from these).
The
DATA_SOURCE tag is inside the VARIABLE tag. Inside the DATA_SOURCE tag there
must be a LIST tag with one or more ITEM tags within it. A TYPE attribute must
be defined for the DATA_SOURCE, and the legal values for this are themselves
defined in the DATA_SOURCE_TYPE_DEFS tag (see below). Each of the LIST ITEM
values is assumed to refer to something that itself contains concrete values
(e.g. the full path to a FASTA file containing sequences in FASTA format), and
the DATA_SOURCE TYPE is associated with a Perl module (the association is
defined in the DATA_SOURCE_TYPE_DEFS tag described below) that is used to parse
and read these concrete values. Here is an example to make this all clear:
<VARIABLE NAME="__SEQ_IN__" TYPE="PIPE">
<DATA_SOURCE TYPE="FASTA_FILE">
<LIST>
<ITEM>/home/db/test1.fa</ITEM>
<ITEM>/home/db/test2.fa</ITEM>
<ITEM>/home/db/test3.fa</ITEM>
</LIST>
</DATA_SOURCE>
</VARIABLE>
Here is
defined a variable with name __SEQ_IN__ and variable type PIPE. Thus, a named
pipe will be created and its full path will be interpolated into client
commands the single place where __SEQ_IN__ appears. This variable has a data
source of type FASTA_FILE, and thus each of /home/db/test1.fa,
/home/db/test2.fa, and /home/db/test3.fa are assumed to contain values that can
be parsed with a Perl module associated with the FASTA_FILE DATA_SOURCE TYPE
(i.e. they are each FASTA files to be read by a FASTA file reading module). Each
of the FASTA sequence records read from /home/db/test1.fa, etc. will be written
into the named pipe when executing the commands (and the other variables with
data sources will also have values written for them).
Variables
without data sources are assumed to be for output and inter-command communication
and can have the following types with associated behavior:
· FILE: A full path to a file is interpolated wherever the variable name
appears in the command line (it is assumed that the earliest executing command
where the interpolation took place will actually create and write the file, and
later commands will read it).
· PIPE: A named pipe is created and the full path to this named pipe is
interpolated into the command line in place of the variable occurrences.
However, again since a named pipe can have only a single reader and writer,
there can be at most two occurrences of the variable, with the first being the
source (writer) of the named pipe and the second being the reader. If the
variable result value is to be sent to the server then the variable name can
only appear once (the external ANDY client code is the reader in this case,
reading the output contents from the named pipe and sending it to the server).
MEM_BUFs can be used in cases where the contents of a named pipe need to be
written into multiple other named pipes.
· MEM_BUF: Multiple named pipes are created. The full path to the first named
pipe is interpolated into the first (leftmost) occurrence of the variable in
the commands, and this is assumed to be the source. The full paths to the other
named pipes are interpolated respectively at all the other occurrences of the
variable in the command line. If the result value of the variable is to be sent
to the server, then another named pipe is created which the external ANDY
client will read from, writing what it reads to the server. Finally, a separate
buffering process reads from the first (source) named pipe and writes what it
reads to all the other named pipes (the executing commands and/or the external
ANDY client are then assumed to read from these).
Here is
an example of a variable definition without a data source:
<VARIABLE NAME="__OUT__" TYPE="PIPE"/>
Note
also that a PIPE or MEM_BUF variable must have all its occurrences within a
single pipe segment, since named pipes, which are used in the underlying
implementation of PIPE and MEM_BUF types, must be read from and written to by
concurrent processes (MEM_BUFs could technically cross pipe segment boundaries,
but that would require the external ANDY client buffering process to buffer
completely the contents of the source named pipe, and this is undesirable and
thus was made illegal). Also, while not prohibited by the system, care should
be taken in doing inter-command communication through files within a single
pipe segment, since there could be a race condition where the readers of the
file might execute before the creator/writer and die when they see that the
file does not exist yet.
The
DATA_SOURCE_TYPE_DEFS tag is where the associations between data source types
and the Perl modules that implement them are made. The DATA_SOURCE_TYPE_DEF tag
defines a data source type, and all DATA_SOURCE_TYPE_DEF tags are enclosed
within the DATA_SOURCE_TYPE_DEFS tag. Here is an example:
<DATA_SOURCE_TYPE_DEFS>
<DATA_SOURCE_TYPE_DEF NAME="FASTA_FILE" TYPE="ITEM_READER" MODULE="ReadFasta.pm"/>
<DATA_SOURCE_TYPE_DEF NAME="RAW_VALUE" TYPE="FULL_READER" MODULE="ReadRawList.pm"/>
</DATA_SOURCE_TYPE_DEFS>
The
three DATA_SOURCE_TYPE_DEF attributes NAME, TYPE, and MODULE are all required.
NAME is the name of the data source type, and is what is used as the TYPE
attribute of the DATA_SOURCE tag described above (i.e. it is the link between
the definition and use of a data source type). MODULE is the name of a Perl
module (which should be accessible to the Perl interpreter that is running the
server, i.e. @INC should include its directory) which implements the data
source type, i.e. is able to read each of the LIST ITEMs (e.g. FASTA files) of
that type and iterate over all their individual records, returning each
individual record in turn along with an index and text identifier for it. The
index is unique and consists of two parts, the index of the LIST ITEM and the
index of the record therein being referenced, e.g. 3rd FASTA file, 68th
sequence record. The index is usually just the numeric index, but in general
can be anything that uniquely identifies the record. The text identifier also
consists of two parts, the identifier for the LIST ITEM and the identifier for
the referenced record therein. It does not have to be unique, but should
ideally be something that users can use to identify and get information about
the referenced record; an example would be to use the FASTA filename and
defline identifier of a FASTA sequence record contained therein as the text
identifier. Data source modules also must be able to randomly access records
given their indices for fault tolerance (i.e. the ANDY server needs to be able
to re-retrieve failed records to retry them). TYPE must be either ITEM_READER
or FULL_READER. A MODULE that is a FULL_READER iterates directly over all the
LIST ITEMs and their contained records, e.g. iterates over all FASTA records
contained in three FASTA files whose paths are ITEMs in a LIST. A MODULE that
is an ITEM_READER only iterates over the records contained in a single LIST
ITEM; in this case, ANDY has internal code which uses the ITEM_READER MODULE as
a sub-module to iterate over all the records of all the LIST ITEMs (the
internal code uses the numeric index of the LIST ITEM as its index, and the
LIST ITEM's full value as its identifier). Finally, the ANDY server will
iterate over all data sources via their data source reader modules to
create all possible combinations of their individual data source values: each
such combination will become one task to do at a client.
With the
publicly available distribution of ANDY we provide implementations of data
source reader modules for several common formats such as FASTA files, directory
of files, raw list (i.e. each LIST ITEM is itself the value to be used), and
separated file (i.e. each LIST ITEM is a file with records separated by some
user-specified text). New data source reader modules can be easily created and
used, either as FULL_READERs or, more easily, as ITEM_READERs. The data source
readers included in the ANDY distribution document, and serve as examples of,
the interface that data source reader modules must implement. In addition, we
include several base class data source reader modules which can be easily subclassed
to create specific data source reader modules of common types. For example, we
provide a module ReadRecFile.pm which can be subclassed to create data source
reader modules for any file type that contains a sequence of records --- the
subclass need only provide the implementation of a single method which, given a
file handle, reads the next record from the file and returns it (along with a
text identifier for it); the FASTA file data source reader module ReadFasta.pm
is a subclass of this and can be studied as an example. We also welcome
submissions of new data source reader modules, and any other useful ANDY
related code, to be shared with the ANDY user community.
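As an illustration of how small such a subclass can be, here is a rough sketch
of a hypothetical reader for a record-based flat-file format; the method name
(readNextRecord), its return convention, and the base class interface shown
here are our assumptions and should be checked against ReadRecFile.pm and
ReadFasta.pm in the distribution:
package ReadGenbank;
# Hypothetical ITEM_READER-style reader sketch: reads one GenBank-like record
# (everything up to a line containing only "//") per call from a file handle.
use strict;
use ReadRecFile;
our @ISA = ("ReadRecFile");
sub readNextRecord {
    my ($this, $fh) = @_;
    my ($record, $id) = ("", "");
    while (my $line = <$fh>) {
        $record .= $line;
        $id = $1 if !$id && $line =~ /^LOCUS\s+(\S+)/;   # use the LOCUS name as the identifier
        last if $line =~ /^\/\//;                        # "//" ends a record
    }
    return unless length($record);                       # no more records in this file
    return ($record, $id);                               # record text plus its text identifier
}
1;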
Variables
and data sources can have attributes defined for them inside their tags, and the
following are the legal attributes and their behavior. A variable with
attribute SERVER_SAVE defined will have its output results saved at the server.
Inside the SERVER_SAVE tag, specify the template for the name of the result
file at the server. Templates are specified the same as the LIST ITEM of
ID_INDEX_INTERP data sources, described next. Here is an example of a variable
with SERVER_SAVE attribute defined:
<VARIABLE NAME="__OUTPUT1__" TYPE="PIPE">
<SERVER_SAVE>[__SEQ_IN__.ID1][__SEQ_IN__.INDEX1].txt</SERVER_SAVE>
</VARIABLE>
RAW
variables whose data sources have attribute ID_INDEX_INTERP defined as YES are
treated as follows. First, only one item may be specified in the DATA_SOURCE
list. This should be a string into which will be interpolated (at the client
for each task) the current values of identifiers and indices for the current
task, where the locations at which these should be interpolated are specified
with the syntax (all other text is left as-is):
[__VARNAME__.(ID0|ID1|INDEX0|INDEX1)]
The
post-interpolation value of the list item will then itself be interpolated into
the command line in place of the variable name, like any other raw variable.
For
example:
<VARIABLE NAME="__INTERP1__" TYPE="RAW">
<DATA_SOURCE TYPE="RAW_VALUE">
<ID_INDEX_INTERP>YES</ID_INDEX_INTERP>
<LIST>
<ITEM>-s '[__SEQIN__.INDEX0][__SEQIN__.INDEX1]' -j '[__JVALS__.INDEX0][__JVALS__.INDEX1]'</ITEM>
</LIST>
</DATA_SOURCE>
</VARIABLE>
· ID0 refers to the identifier of the list item corresponding to the
referenced variable's current value (e.g. FASTA file name "test1.fa").
· ID1 refers to the identifier of the particular element inside the list item
corresponding to the referenced variable's current value (e.g. FASTA defline
ID).
· INDEX0 refers to the index of the list item corresponding to the referenced
variable's current value (e.g. ITEM 3 of 10 LIST ITEMs).
· INDEX1 refers to the index of the particular element inside the list item
corresponding to the referenced variable's current value (e.g. FASTA sequence
record 50 out of 9000).
Note
that variable names with ID_INDEX_INTERP data sources cannot themselves be
referenced in an ID_INDEX_INTERP data source list item.
RAW
variables which have data sources with attribute SERVER_DIR defined are treated
as follows. First, only one item may be specified in the data source's list.
This item should be the location of a directory on the server into which result
files should be placed and where the ANDY server can place temporary files.
There must be exactly one SERVER_DIR defined. Similarly, RAW variables which
have data sources with attribute CLIENT_DIR defined are treated the same as
SERVER_DIR, except the specified directory is on the cluster nodes (where the
ANDY client can place temporary results and files). Note that if a command must
output something to a file, but it is not desired to save this result, give
path "/dev/null" for the path to have it thrown away (without causing
deadlock from not reading it).
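For illustration, a SERVER_DIR definition might look like the following; the
placement of the SERVER_DIR tag inside the DATA_SOURCE, by analogy with the
ID_INDEX_INTERP example above, is our assumption, and the example configuration
files in the distribution should be consulted for the exact form:
<VARIABLE NAME="__RESULTS_DIR__" TYPE="RAW">
<DATA_SOURCE TYPE="RAW_VALUE">
<SERVER_DIR>YES</SERVER_DIR>
<LIST>
<ITEM>/home/user/andy_results/</ITEM>
</LIST>
</DATA_SOURCE>
</VARIABLE>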
Variables
with data sources of type TASK_RESULTS_INFO are special and only used in server
code. There are no list items associated with the data source. Input pipes and
mem_bufs in server code will have written to them the corresponding variable
output from all the clients sequentially, one after the other with no break.
Thus, the server code will need some way to determine where the breaks between
different results from different clients are and which task a result
corresponds to. The ANDY server can optionally pass this information in through
the pipe or mem_buf corresponding to a variable with a data source of type
TASK_RESULTS_INFO. The server will write into this pipe or mem_buf in a
regular, parseable format as follows:
__TASK_RESULTS_INFO__
__VAR1__\tINDEX0\tINDEX1\tID0\tID1
__VAR2__\tINDEX0\tINDEX1\tID0\tID1
__VAR3__\tN
__VAR4__\tN
...
__END_TASK_RESULTS_INFO__
Lines
with just one value, N, after the variable name specify variables whose output
value from a client is written into a pipe or mem_buf in the server code, and N
is the length in bytes of this output value. Lines with INDEX0\tINDEX1\tID0\tID1
give the index and identifier information of the input task variables at the
client that generated this output, thus allowing the server code to associate
output with the input that produced it. INDEX0, INDEX1, ID0, and ID1 mean the
same as described previously for ID_INDEX_INTERP and SERVER_SAVE. Thus, by
reading and parsing a TASK_RESULTS_INFO pipe or mem_buf the server code can know
how much to read for each task's output results, where the breaks between
different results from different clients are, and the task that these results
correspond to. Here is an example:
<VARIABLE NAME="__TRI__" TYPE="PIPE">
<DATA_SOURCE TYPE="TASK_RESULTS_INFO"/>
</VARIABLE>
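To illustrate how server code might consume this information, here is a
hypothetical Perl summary script; the script name, its options, and the
assumption that a single client output variable (say __SUM1__) is piped to the
server are ours, not part of ANDY. It could appear in the server portion of a
command template as something like:
my_server_sum -t __TRI__ -i __SUM1__ -o __RESULTS_DIR__summary.txt
#!/usr/bin/perl
# Hypothetical server summary script: reads the TASK_RESULTS_INFO pipe to learn
# which input records produced each result and how many bytes of output were
# written, then reads exactly that many bytes from the results pipe.
use strict;
use Getopt::Long;
my ($triPipe, $inPipe, $outFile);
GetOptions("t=s" => \$triPipe, "i=s" => \$inPipe, "o=s" => \$outFile);
open(my $tri, "<", $triPipe) or die "cannot open $triPipe: $!";
open(my $in,  "<", $inPipe)  or die "cannot open $inPipe: $!";
open(my $out, ">", $outFile) or die "cannot open $outFile: $!";
while (my $line = <$tri>) {
    next unless $line =~ /^__TASK_RESULTS_INFO__/;
    my ($taskId, $nBytes) = ("", 0);
    while (defined($line = <$tri>) && $line !~ /^__END_TASK_RESULTS_INFO__/) {
        chomp $line;
        my @f = split(/\t/, $line);
        if (@f == 5) {                  # input variable: VAR INDEX0 INDEX1 ID0 ID1
            $taskId ||= "$f[3]/$f[4]";  # remember ID0/ID1 of the first input variable
        } elsif (@f == 2) {             # output variable: VAR N (length in bytes)
            $nBytes = $f[1];
        }
    }
    my ($result, $got) = ("", 0);       # read exactly this task's output bytes
    while ($got < $nBytes) {
        my $n = read($in, $result, $nBytes - $got, $got);
        last unless $n;
        $got += $n;
    }
    # ...summarize $result as appropriate; here we just record the task and size
    print $out "$taskId\t$got\n";
}
close($out);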
As
described previously, the SERVER_SAVE template will be instantiated to generate
the filename into which to save a result from a client. By default, all such
results will be saved inside a subdirectory (named based on a concatenation of
the user's login name, machine name, and timestamp) inside the SERVER_DIR. One
can optionally specify that output results should be saved in further
subdirectories inside SERVER_DIR as follows:
<SERVER_SAVE_SUBDIR_TEMPLATE>M,N</SERVER_SAVE_SUBDIR_TEMPLATE>
If
SERVER_SAVE_SUBDIR_TEMPLATE is defined, the server will take, for each
instantiated SERVER_SAVE filename, the substring of length N characters
starting at the Mth character from the left, and create a subdirectory whose
name is this substring; the corresponding result file will then be saved in
this subdirectory (e.g. with a value of 1,2, a result file named ab12345.txt
would be saved in subdirectory ab).
The
DRM-specific Perl module must be specified using the DRM tag as follows:
<DRM>DRM_module[.pm]</DRM>
This module
should be accessible to the Perl interpreter that is running the server, i.e.
@INC should include its directory. The ANDY distribution includes DRM-specific
modules for PBS and for the Berkeley Millennium cluster.
Also,
the DRM-specific module can have its own configuration information (e.g. full
path to qstat, qsub, etc.) and the DRM_CONFIG tag allows a user to specify a
separate configuration file that will be passed into the DRM-specific Perl
module's constructor; this is optional, and nothing will be passed into the
constructor if not specified. Note also that the DRM configuration file does
not need to be in XML, and its format depends on the specific DRM-specific
module being used.
<DRM_CONFIG>drm_config.txt</DRM_CONFIG>
The
following three tags specify how many jobs will be submitted by the ANDY server
and how many tasks each will do. The value of GRAIN_SIZE specifies how many
tasks will be sent at once from the server to a client when it connects to the
server requesting a task. This value can be experimented with to balance
communication overhead and dynamic adaptability to different task lengths (i.e.
a higher value means less communication overhead, but less task granularity so
that some jobs might be assigned many long running tasks and thus become
bottlenecks reducing performance; a smaller value means better adaptability to
differing task lengths, but more client-server communication overhead).
NUM_JOBS is how many client jobs the server will submit; each job upon starting
on a cluster node will continuously do tasks until there are no more tasks to
do and the server says to exit (this is thus dedicated mode). If MAX_GRAINS_PER_JOB is specified then NUM_JOBS
is ignored (and is not required). In this
case, each job will do GRAIN_SIZE * MAX_GRAINS_PER_JOB tasks, and the server
calculates how many total jobs need to be submitted in order to complete all
tasks (each job thus does a specified amount of computation and then exits, and
this is therefore fair mode).
<GRAIN_SIZE>1</GRAIN_SIZE>
<MAX_GRAINS_PER_JOB>5</MAX_GRAINS_PER_JOB>
<NUM_JOBS>32</NUM_JOBS>
The
KEEPALIVE_CONNECTIONS tag should have value YES or NO. If YES, ANDY clients
will maintain a constantly open socket connection to the ANDY server, and if NO
they will initiate a new socket connection for each transaction with the
server. For runs with a small number of clients, KEEPALIVE_CONNECTIONS YES could
give better performance by eliminating socket connection overhead. For larger
runs with more clients, KEEPALIVE_CONNECTIONS NO might be necessary because of too
many open file descriptors if YES (sockets use file descriptors and Unix/Linux
has limits on them), or at any rate could give better performance by
eliminating the overhead of select-based switching among open sockets. Again,
this is something that can be experimented with for better performance.
<KEEPALIVE_CONNECTIONS>YES</KEEPALIVE_CONNECTIONS>
The
command template format section above described how clients will check exit
status codes of commands, flagging error to the server if any are nonzero so
that the server can retry. ERROR_RETRY_COUNT specifies how many times the
server will retry a failed task; if greater than zero it will retry that number
of times, if zero the error will be flagged to the user in the run's log file
but the task will not be retried, and if not defined no exit status code error
checking will be done. If an ANDY run is in fair mode then a new job will be
submitted for each failed task to be retried, and if in dedicated mode no new
jobs will be submitted for failed tasks to be retried (they will simply be
eventually given to one of the long running clients).
<ERROR_RETRY_COUNT>3</ERROR_RETRY_COUNT>
Retrying
failed tasks is one aspect of fault tolerance handling. The other major part is
client job fault tolerance, i.e. monitoring ANDY client jobs, resubmitting
failed ones, and retrying the outstanding tasks of failed clients. Client fault
tolerance in ANDY uses two sources of information. First, DRMs generally
provide a "qstat" command which returns the status of jobs submitted
to the cluster, i.e. telling which jobs are waiting in queues to be executed,
which ones are running, which ones are in error, which ones are suspended, etc.
ANDY uses the results of periodic calls to a DRM's qstat in determining which
jobs have failed. Second, ANDY clients "ping" (i.e. send a short
message to the server over a socket connection) the server periodically to let
it know they are alive and still running, and ANDY keeps track of each client's
last ping time and uses this information also in determining failed jobs. In
fact, the more reliable of these two sources of information is the history of
pings from the clients, and once the first ping is received from a client the
qstat information for it is no longer used. However, since in general there is
no bound on how long it will take a job to proceed from the
"queued" state to the "running" state --- it depends on how
many jobs are ahead in the queue and how long they run --- up until receiving
the first ping from a client we will need to use the results of qstat to
monitor it.
The
fault tolerance mechanisms themselves should be, in a sense, fault tolerant,
i.e. eventually able to determine with high confidence that a failed job has
indeed failed but conservative in making this estimation. For example, a single
call to qstat might transiently fail or a client might miss a ping, even a few
times in a row, and we do not want to overburden the DRM and/or lock out other
cluster users behind a long queue of unnecessary jobs by spuriously
resubmitting false positive failed jobs over and over. Thus, in ANDY
determinations of client job failure are based on a history of multiple calls
to qstat and multiple pings from clients, which smooths out isolated failed
qstat calls or missed pings. Note also that ANDY does not fail if jobs that
were determined failed actually are still alive and continue running and
communicating with the server; the server will receive any results from these
jobs (but not give any new tasks), keep track of which tasks have completed so
that it doesn't mistakenly accept results for the same task more than once, and
not exit until all tasks complete.
With
this in mind, here are the details of client job fault tolerance in ANDY. The
actual call to a DRM's qstat call is done in the DRM-specific module, and the
method in the module that makes this call must translate the DRM-specific state
of each job into an abstract ANDY state: Queued, Running, Error, Suspended, or
Other. Queued means the job is okay and waiting to be run, but has not yet
started executing (i.e., it is waiting in a queue to run); Running means that
the job is actively executing on a cluster node; Error means that the job has
failed or is otherwise in error; Suspended means the job is not in error and
was previously running but is now in a suspended, not-running state and will
remain so for an unknown amount of time (and thus the job cannot send periodic
pings which the fault tolerance must take into account); finally, Other means
the job is in some state that does not match any of these. Also, immediately
after a client job is submitted and before any qstat results are obtained for it,
it is put in the Init state. A separate "Pinging" state is kept for
each client job: if a ping has been received from a client its pinging state is
1, otherwise 0. Then, the ANDY server keeps track of each client's job state
and pinging state and determines failed jobs based on these rules:
· Keep a timestamp for the last time each client sent a ping, and reset it
whenever a client pings.
· Keep a timestamp of the last time each client's job state changed, and reset
it whenever the job state changes to something different.
· For client jobs in pinging state 0:
  o If the job stays in the Init job state > T0 seconds then the job has failed
and is resubmitted. Its outstanding tasks are given out to a client to be done
again.
  o If the job stays in the Error job state > T1 seconds then the job has
failed and is resubmitted. Its outstanding tasks are given out to a client to
be done again.
· For client jobs in pinging state 1:
  o If the job switches from the Suspended state to any other state, then reset
the last ping timestamp to the current time.
  o If the job has been in the Suspended state for greater than T2 seconds, but
less than T3 seconds, then do nothing (the job is not considered failed yet).
  o If the job has been in the Suspended state for greater than T3 seconds, and
a ping hasn't been received in more than T4 seconds, then the job has failed
and is resubmitted. Its outstanding tasks are given out to a client to be done
again.
  o If neither of the previous two rules applies, and a ping hasn't been
received in more than T4 seconds, then the job has failed and is resubmitted.
Its outstanding tasks are given out to a client to be done again.
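To make the combination of these rules concrete, here is a simplified,
hypothetical Perl sketch (not ANDY's actual code; the per-client bookkeeping
fields are our invention, and $T0 through $T4 hold the configured T0-T4 values)
of the check applied to one client at each fault-check interval:
# Returns true if this client's job should be considered failed.
sub client_has_failed {
    my ($c, $now) = @_;                      # $c: per-client record kept by the server
    if (!$c->{pinged}) {                     # pinging state 0: rely on the qstat job state
        return 1 if $c->{state} eq "Init"  && $now - $c->{state_since} > $T0;
        return 1 if $c->{state} eq "Error" && $now - $c->{state_since} > $T1;
        return 0;
    }
    # pinging state 1: rely on the ping history (last_ping is reset whenever the
    # job leaves the Suspended state)
    my $susp = $c->{state} eq "Suspended" ? $now - $c->{state_since} : 0;
    return 0 if $susp > $T2 && $susp < $T3;    # suspended grace period: not failed yet
    return 1 if $now - $c->{last_ping} > $T4;  # covers the last two rules above
    return 0;
}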
The
server does fault tolerance handling (i.e. checks the above rules and resubmits
failed jobs) at regular intervals. The FAULT_CHECK_INTERVAL tag value specifies
the length in seconds of the intervals:
<FAULT_CHECK_INTERVAL>60</FAULT_CHECK_INTERVAL>
ANDY
client jobs ping the server at regular intervals, and the CLIENT_PING_INTERVAL
tag value specifies the length in seconds of the intervals:
<CLIENT_PING_INTERVAL>60</CLIENT_PING_INTERVAL>
Finally,
the values T0, T1, T2, T3, and T4 referenced in the above
rules are specified as follows:
<T0>240</T0>
<T1>240</T1>
<T2>240</T2>
<T3>240</T3>
<T4>240</T4>
The user
can set these fault tolerance parameters to balance, as needed for his
application, quick fault tolerance response time against application
performance; smaller values mean failures will be detected and handled more
quickly, but at the potential cost of degraded performance, since more compute
time will be spent maintaining fault tolerance rather than doing the actual
application computation.
The last
tag in the configuration file is required and is NODE_PERL, whose value should
be the full path to a Perl interpreter to be used on the cluster nodes to run
ANDY clients:
<NODE_PERL>/lab/bin/perl</NODE_PERL>
Finally,
the ANDY configuration file ends with the closing ANDY_RUN tag:
</ANDY_RUN>
Writing a DRM-specific Module
To
extend ANDY to be used on a new DRM, one must write a small DRM-specific Perl
module with 3 required methods:
1) A constructor "new" that
optionally takes a DRM-specific configuration file as an argument:
sub new {
my ($class,$drmConfigFile) = @_;
....
2) A method "submitJob" which
submits jobs to the cluster using the DRM's interface:
sub submitJob {
my ($this,$execClientCmd) = @_;
....
$execClientCmd is an array reference that contains the parts
of the command to run the ANDY client on the nodes, which submitJob should
submit to be run on a cluster node. submitJob should return a DRM-specific
identifier for the submitted client job, or nothing on failure. The identifier
can be anything, but it should uniquely identify the job and is used in the
next required method to query job status. Most DRMs return such a unique job
identifier as the return value of their "qsub" command, and this can
be used to query the job's status with the DRM's "qstat" command.
Unix process IDs could also be used for this (the Berkeley Millennium module
does this).
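For illustration, a submitJob for a PBS-like DRM might be sketched roughly as
follows; the temporary job script, taking the qsub path from the object (e.g.
as read from drm_config.txt), and the output handling are our assumptions, and
the bundled PBS module may differ:
sub submitJob {
    my ($this, $execClientCmd) = @_;
    my $qsub = $this->{qsub} || "qsub";      # path could come from the DRM config file
    my $script = "/tmp/andy_job_$$.sh";      # write a minimal one-command job script
    open(my $fh, ">", $script) or return;
    print $fh "#!/bin/sh\n", join(" ", @$execClientCmd), "\n";
    close($fh);
    my $jobId = `$qsub $script`;             # PBS qsub prints the new job's ID
    my $status = $?;
    unlink($script);
    return if $status != 0 || !defined($jobId);   # return nothing on failure
    chomp($jobId);
    return $jobId;                           # e.g. "12345.headnode"
}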
3) A method "jobStates"
which, given the DRM-specific job IDs (i.e. returned by submitJob) of all
submitted, still-running client jobs, queries their DRM-specific job status and
translates this into the abstract ANDY job status described previously (i.e.
Queued, Running, Suspended, etc.) and returns it.
my %jobStateMap = (
    "E" => "E", #exiting    --> E
    "R" => "R", #running    --> R
    "Q" => "Q", #queued     --> Q
    "H" => "Q", #held       --> Q
    "T" => "R", #transition --> R
    "W" => "Q", #waiting    --> Q
    "S" => "S"  #suspended  --> S
);
sub jobStates {
my ($this,$jobIdsHash) = @_;
....
Most DRM-specific modules can just run the DRM's
"qstat" command and translate the results to the abstract ANDY job
states. Another possibility (used by the Berkeley Millennium module) is to do
"ps -uxw" on all the nodes running clients and use the Unix process
status information (this is done using "gexec" in Berkeley
Millennium, but could also be done with rsh or ssh, e.g. in the case where
there is no proper DRM installed and the DRM-specific module must encapsulate
the necessary job submitting and monitoring functionality).
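Continuing the PBS-like illustration, a jobStates implementation might be
sketched roughly as follows; the return format (assumed here to be a hash
reference mapping job IDs to abstract states) and the qstat output parsing are
our assumptions and should be checked against the bundled modules:
sub jobStates {
    my ($this, $jobIdsHash) = @_;
    my $qstat = $this->{qstat} || "qstat";   # path could come from the DRM config file
    my %states;
    foreach my $line (`$qstat`) {
        # A typical PBS qstat line: "1234.head  myjob  user  00:01:02  R  batch"
        my @f = split(/\s+/, $line);
        next unless @f >= 5 && exists $jobIdsHash->{$f[0]};
        $states{$f[0]} = $jobStateMap{$f[4]} || "O";   # "O" for Other
    }
    return \%states;
}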
Since
the DRM-specific module is a Perl module that is instantiated to an object,
state can be kept in it to support advanced and cluster-specific scheduling
policies and allow the full potential of the DRM to be exploited. For example,
the BLAST program memory maps the database to be searched for performance, and
later executions of BLAST can make use of the still memory mapped database of a
previous BLAST execution to avoid the performance hit of having to re-memory
map. This could be exploited for increased performance in a DRM-specific module
for running BLAST (and other programs that similarly memory map). The
DRM-specific module could be written to store the cluster nodes on which
previously submitted BLAST-executing clients ran, and to preferentially run new
BLAST-executing clients (searching the same database) on one of them (it is
also desirable to schedule preferentially away from cluster nodes that recently
memory-mapped other large databases, i.e. to try to avoid clobbering another
run's memory mappings). We have not tried this yet, but have plans to in future
development of the ANDY cluster tool.
Conclusions and Possible Future
Directions
We have
built the ANDY cluster tool as a practical and simple-to-use, extensible and
fault tolerant tool for running large biological database searches, and similar
applications, over the nodes of a Linux/Unix computer cluster managed by a DRM
in order to achieve speedup through parallelism. In our tests it achieves good
performance of close to 90% processor efficiency on our 64 node cluster, and it
has been widely used in our computational biology lab in support of research.
Many other research labs and organizations will likely have similar clusters
and can benefit from ANDY, and we are thus releasing it freely to the public at
http://compbio.berkeley.edu/proj/andy/
in hopes that it proves useful to a wide audience.
There is
still room for improvement of ANDY, however, and the following are some key
directions for further investigation, development, and experimentation.
·
Server side fault tolerance.
ANDY's fault tolerance mechanisms are currently limited to
client fault tolerance, and a run can fail if the ANDY server fails. In this
case there will still be partial results (any result files and summary
information received before the server failed). But there need to be
mechanisms for server fault tolerance. Some possibilities include having backup
server processes or having a mechanism to restart from the logfile that ANDY
generates during a run (or restart from some other persistent record of the
current state of the run). Restarting from the logfile is probably the most
practical and straightforward way. However, there are problems that would need
to be solved for cases when server summary code is used. If there is no server
summary code, then all the client results are each individually stored in
separate files and restarting from the logfile should be easy --- any partially
finished tasks which weren't recorded as complete in the logfile can simply be
redone and any partial results overwritten (and tasks recorded as complete
don't have to be redone). In other words, transactional rollback is not a
problem if there is no server code, but is a problem if there is server code
since server summary code generally appends all summary results to single
files, and in general ANDY cannot know what partial results should be expunged
from such summary files in order to restart in a consistent state. The solution
will probably require the user to parse the partial summary files and get them
into a consistent state before restart, and we should concentrate on making
this as easy to do as possible.
·
More flexibility in specifying tasks
and task order.
ANDY currently allows a single command template to be
specified, and iterates over all data sources and each combination of data
source values becomes a task. This is generally what is desired, but there
might be cases where tasks and task order need to be more flexibly specified,
or multiple command templates need to be run. For example, the user might not
want to execute commands for all combinations of data source values, or might
want to order them in a special way (the only control over task order the user
currently has is by the order of the data sources in the XML configuration file
--- the top to bottom order in the configuration file matches the outer to
inner loop ordering of the iteration). One possible simple solution to the task
and task ordering problem would be to have a separate utility which iterates
over the data sources in the normal, default order and prints them (their
indices) in order to a task file. The user could then reorder and delete from
this task file as desired, and tell the server to do the tasks in this file, in
order, instead of iterating over the data sources in the default way. As far as
allowing different command templates, one could imagine creating a
meta-configuration file to specify multiple ANDY runs that execute together as part
of a larger run; but it would probably just be easier to consider ANDY a
primitive, basic tool and let users write their own scripts that run ANDY
multiple times for multiple command templates.
·
Persistent server, clients,
commands, and RAM-Disk for better performance.
The more we can eliminate the overhead of starting up or
switching between processes, having to startup the Perl interpreter, writing to
disk, etc. the better performance we can get. In the current ANDY, for each
separate run a server and many clients must all be started up. If several users
are all doing ANDY runs on the same cluster, they will all be running their own
independent servers on the head node, and these independent servers will be
competing for CPU time, there will be overhead from the operating system
switching these different servers in and out, and there will be redundant use
of memory (data structures which could be shared among all the servers but are
instantiated in each). Top performance of the server is crucial to get overall
good performance, since the slower the server is the more the clients will be
held up waiting to get tasks from or send results to it and thus overall
performance will be degraded. Thus, having a single persistent ANDY server
running on the cluster head node which all users use for their runs would
reduce this overhead and give better performance. This would also allow more
state to be kept to help in scheduling (e.g. the single server could know at
once where all users' jobs were submitted and what databases they are using,
etc., and could use this information to better schedule new jobs, etc.)
We could do the same thing with the ANDY clients, and this could give an even bigger performance boost, especially for "fair mode" runs (where each job does a specified amount of work and then exits, and the server submits enough jobs at the beginning so all tasks can complete --- client startup overhead, especially when many clients are submitted, can be large). A persistent client job would run on each cluster node (started outside of the DRM); the ANDY server would still submit jobs to the cluster nodes as before, but instead of running the client code themselves, these jobs would simply contact the persistent client running on the node and tell it to execute on their behalf (i.e., pass it the command line to execute, the ID to use in talking with the server, etc.). If no ANDY jobs are active on a node, the persistent client just sleeps, consuming no CPU while it waits for a connection from a new job. Persistent clients would remove the overhead of repeatedly starting the Perl interpreter, loading Perl modules, initializing data structures, etc., in the same way that running CGI scripts through mod_perl in the Apache web server is substantially faster than running raw CGI scripts (mod_perl is essentially a persistent Perl interpreter running inside Apache, so there is no per-script interpreter startup overhead). Of course, we could also avoid the Perl interpreter's overhead altogether by rewriting ANDY in a language such as C or C++ and compiling it to a native executable, and this is something else to consider.
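As a rough sketch of the hand-off, a persistent per-node client might run an accept loop like the one below (hypothetical code; the socket path, the line-based message format, and running one task at a time are all simplifying assumptions, not current ANDY behavior):

    #!/usr/bin/perl
    # persistent_client.pl -- hypothetical sketch of a per-node persistent client.
    use strict;
    use warnings;
    use IO::Socket::UNIX;

    my $sock_path = "/tmp/andy_client.sock";        # assumed rendezvous point
    unlink $sock_path;                              # remove any stale socket
    my $listener = IO::Socket::UNIX->new(
        Type   => SOCK_STREAM(),
        Local  => $sock_path,
        Listen => 5,
    ) or die "cannot listen on $sock_path: $!";

    while (my $conn = $listener->accept) {          # sleeps here while the node is idle
        chomp(my $task_id = <$conn>);               # ID to use when talking to the server
        chomp(my $cmdline = <$conn>);               # command pipeline to run on the job's behalf
        my $status = system($cmdline);              # one task at a time, for simplicity
        print $conn "$task_id exit=", $status >> 8, "\n";
        close $conn;
    }

The DRM-submitted job would connect to this socket, send the two lines, and wait for the reply; the persistent client does the actual work.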
It might be possible to take this even further and make certain application commands themselves persistent. For example, BLAST can run in a mode where, within a single execution, input FASTA sequences are continually piped in and the corresponding results are written out, concatenated in order, to standard output or an output file. For BLAST and applications like it, an ANDY client could execute the program once (avoiding the overhead of repeated forks and execs and of reloading the database into memory each time), set its input and output channels to autoflush (so input is sent and output is received immediately, without intermediate stdio buffering), and then continually feed it input and read back the corresponding output (ANDY would need to know how to break the concatenated output stream into distinct task result chunks to send back to the server).
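A client might drive such a persistent search process along these lines (a sketch only; the exact search command line and, in particular, the pattern used to detect the end of one query's output are assumptions that application-specific code would have to supply):

    use strict;
    use warnings;
    use IPC::Open2;
    use IO::Handle;

    # Hypothetical sketch: start the search program once and keep feeding it queries.
    my @search_cmd    = ('blastall', '-p', 'blastp', '-d', 'nr');  # assumed: reads FASTA queries on stdin
    my $end_of_record = qr/^  Database: /;   # placeholder: an application-specific end-of-query pattern

    my $pid = open2(my $from_prog, my $to_prog, @search_cmd);
    $to_prog->autoflush(1);                  # send each query immediately, no stdio buffering

    sub run_one_query {
        my ($fasta) = @_;
        print $to_prog $fasta;               # feed one FASTA-format query
        my $result = '';
        while (my $line = <$from_prog>) {    # read until this query's output is complete
            $result .= $line;
            last if $line =~ $end_of_record;
        }
        return $result;                      # one task's result chunk, ready for the server
    }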
One final idea is, instead of using named pipes to keep commands from writing to disk, to set up a RAM disk and have the executing commands read and write their file input and output there. This would be less portable and more difficult to set up, but would make it much easier for users to specify their commands and runs (they would no longer have to worry about specifying type PIPE or MEM_BUF --- everything except RAW would be FILE, but would really be flowing through the memory of the RAM disk).
·
Cluster node local disk space management.
Programs such as BLAST take as an argument a database to search against, and read this database file from disk into memory. A common use for ANDY is to run many executions of programs like BLAST in parallel over the cluster nodes, and these parallel executions will all need to read the same database at the same time. Where will they read it from? This was never really discussed above; in the examples the databases were read from the cluster head node over NFS. For small to moderately sized databases this is not a problem: there is a small hiccup at the beginning as the starting clients all read the database over NFS into memory at the same time, but afterwards the database is memory resident on all the nodes and need not be read from disk again. However, reading very large databases (ones that approach or exceed available memory) over NFS from the head node would be disastrous for performance. Such databases cannot be completely memory resident, so the clients executing in parallel would need to continually read and reread parts of the database over NFS; the NFS server would be overburdened and might even fail, and performance could be worse than if the job had simply been run serially on a single computer.
In cases where a database exceeds available memory and must be continually read from disk, the database file should be available locally on each client's disk and accessed from there independently by each client, or the access should be made efficient in some other way. With the ever increasing size of databases used in computational biology, this is becoming the common case and should be optimized for. A simple, albeit expensive,
solution would be to employ a high-performance distributed file system such as
IBM's General Parallel File System ("GPFS"), and we have considered
this. Alternatively, in support of ANDY we could build a complementary disk
space management system that could quickly distribute large database files to
all the local node disks. To achieve this, we could keep track of the locations
of database files (i.e. locations on the head node and any copies on node local
disks) and build a program that would copy a requested database to all node
local disks in a binary tree pattern starting from the currently available
locations. For example, if there were 3 current copies, 3 initial copies could
be started, then 6 more once the first 3 complete, then 12 more once those
complete, etc., until copies exist on all node local disks. Each new copy location is entered into the record of database file locations, making it available as a source for future copies.
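The copying pattern itself is simple; a sketch follows (assuming the database lives at the same path on every node, that scp between nodes works without passwords, and that a real version would run each round's copies in parallel rather than serially as written here):

    use strict;
    use warnings;

    # Hypothetical sketch of the doubling ("binary tree") distribution pattern.
    # $have and $need are array refs of node hostnames: nodes that already hold
    # a copy of $path, and nodes that still need one.
    sub distribute {
        my ($path, $have, $need) = @_;
        my @have = @$have;
        my @need = @$need;
        @have or die "no existing copy of $path to start from\n";
        while (@need) {
            # Each round, every node that already holds the file sources one copy,
            # so the number of copies roughly doubles per round.
            my @round;
            for my $src (@have) {
                last unless @need;
                push @round, [ $src, shift @need ];
            }
            for my $pair (@round) {
                my ($src, $dst) = @$pair;
                # assumed copy mechanism; a real version would run these concurrently
                if (system('scp', "$src:$path", "$dst:$path") == 0) {
                    push @have, $dst;                   # usable as a source in the next round
                } else {
                    warn "copy $src -> $dst failed\n";  # a real version would retry from another source
                }
            }
        }
        return @have;                                   # nodes now recorded as holding the file
    }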
Another thing to consider is that many different people will
be using the cluster and filling up node local disk space with databases and
other files used by their cluster jobs. The disk space management system thus
must balance two conflicting goals: (1) we don't want the local disks to become so cluttered and full that new jobs don't have enough disk space for their needs, and (2) at the same time, we'd like to keep users' files, especially their often-used large database files, around for as long as possible so that jobs can keep reusing them efficiently and they
won't be recopied over and over. The basic idea for a solution can be summarized as "erase on demand" or "lazy erase", and is similar to the way cache memories and virtual memory pages are managed: the disk space management system does not erase files until it must. A job
running on a cluster node can request to have a large file copied to the node's
local disk, and the disk space management system will free up just enough disk
space to accommodate the file (or free nothing if there is already enough
space). Then, to perform the actual copy we can employ a dynamic generalization of the binary tree copying strategy described above: each copy request must first obtain an open "slot" for the file to be copied (i.e., each existing copy of the file may serve only a fixed number of simultaneous outgoing copies, so that no single file system is overburdened). The disk space management system keeps track of how many slots are in use, how many are available, and where, and copy operations must block until a slot becomes available. Finally, while we
would want to employ some kind of "least recently used" strategy in
picking files to erase to make space for new copies, the disk space management
system must ensure that the files of actively running jobs are not erased.
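To make the idea concrete, the space-freeing step might look like the following sketch, under the simplifying assumptions that each node's cached files live in a single directory with a fixed byte quota and that the manager is told which files belong to actively running jobs (slot bookkeeping and the copy itself are omitted):

    use strict;
    use warnings;
    use File::Spec;

    # Hypothetical sketch: free just enough space in the node-local cache
    # directory to hold a file of $needed bytes, erasing least recently used
    # files first and never touching files that active jobs are using.
    sub make_room {
        my ($cache_dir, $quota, $needed, $in_use) = @_;   # $in_use: hashref of protected filenames
        opendir(my $dh, $cache_dir) or die "cannot open $cache_dir: $!";
        my @files = grep { -f File::Spec->catfile($cache_dir, $_) } readdir $dh;
        closedir $dh;

        my $total = 0;
        my @candidates;
        for my $f (@files) {
            my $path = File::Spec->catfile($cache_dir, $f);
            my ($size, $atime) = (stat $path)[7, 8];
            $total += $size;
            push @candidates, [ $path, $size, $atime ] unless $in_use->{$f};
        }

        # Erase on demand: delete only if the new file would not otherwise fit,
        # taking the least recently used (oldest access time) candidates first.
        for my $victim (sort { $a->[2] <=> $b->[2] } @candidates) {
            last if $total + $needed <= $quota;
            my ($path, $size) = @$victim;
            unlink $path and $total -= $size;
        }
        return $total + $needed <= $quota;    # false if even full eviction is not enough
    }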