ANDY: A general, fault-tolerant tool for database searching on computer clusters.


Extended Presentation and Documentation.





ANDY is a set of Perl programs and modules for easily running large biological database searches, and similar applications, over the nodes of a Linux/Unix computer cluster in order to achieve speedup through parallelization. Users specify their cluster runs through a straightforward XML configuration file, in which they give a template for the pipeline or sequence of commands they want to run on the nodes, as well as the locations and types of data sources for the command lines and other information. The tool works directly with Distributed Resource Management ("DRM") systems, such as GridEngine and PBS, and is easily extensible to different DRMs simply by writing a small DRM-specific module for submitting and monitoring jobs. Cluster runs can be done either in dedicated mode, where nodes are held and used until all tasks complete, or in fair mode, where each submitted job does only a specified amount of computation and then exits, allowing fair use of the cluster where other users' jobs can be interspersed among ANDY jobs. For efficiency, support is provided for (optional) piping of file input and output of commands through named pipes and buffering in memory, in order to avoid the possible performance hit of slow disk I/O. The tool is fault tolerant, checking that all jobs and tasks complete and resubmitting failed jobs and tasks until the run is complete. Users can provide their own application-specific error checking simply by adding error checking commands into the sequence or pipeline of commands, and the tool will detect non-zero exit statuses and flag an error, causing the server to retry; users can similarly provide for succinct summaries of raw program output to be created, which can minimize communication with the server to give better performance. The tool also allows a command pipeline to be run at the server over the course of a run into which the results from node jobs are piped, allowing global summaries to be created. 
We also provide a set of concrete client and server side summarization and error checking routines for key bioinformatics programs such as BLAST, FASTA, and ssearch. The tool thus provides an infrastructure for generalized, distributed, fault tolerant, high-performance command pipelines over a compute cluster. The tool has been widely used in support of research in the Brenner computational biology lab at UC Berkeley, and we are making it publicly available at




A common task for computational biologists is searching, using programs such as BLAST, ssearch and FASTA, sequences against a database of other sequences (or a database against itself) to find similarities suggestive of homology. Such searches are being done more often and on a larger scale as genome sequencing and related projects generate ever more sequence data. Many organizations are acquiring computer clusters in order to run such searches more efficiently in parallel over the many nodes of the cluster. However, while most researchers can run their database search jobs themselves on a single machine, it is not trivial running such jobs in parallel over a cluster in a fault tolerant way. Some solutions exist for specific search programs, e.g. TurboBLAST is a product sold by the company TurboGenomics that is a version of the NCBI BLAST program for running in parallel over clusters. More generally, tools such as Disperse and WRAPID allow users to specify a database search command line and have it run in parallel over a cluster, and ANDY is most similar to these tools but provides additional features and enhancements.


On a cluster managed by a DRM, users submit jobs that they want to run on a cluster through the DRM interface, and the DRM controls where and when jobs will be run (jobs are placed in "queues" while waiting to run) and maintains fairness and efficiency of resource allocation. Commonly used DRMs include Platform LSF, Open PBS/PBS Pro, and Sun GridEngine. The Brenner lab possesses a 64 node Linux cluster and we have run Sun GridEngine and PBS Pro on it. We also had access to the Berkeley Millennium campus cluster running Ganglia/gexec and the Alvarez cluster at Lawrence Berkeley National Lab running PBS Pro, and we wanted to be able to seamlessly use these as additional sources of compute power. We needed a tool which would easily (almost as easy as running on a single machine), efficiently (to achieve speedup through parallelization), fairly (so users would not "hog" all of the cluster resources while running their jobs), and seamlessly allow biological database search jobs to be run over our cluster and other clusters to which we had access through the DRM. In addition, it was important that the tool maintain fault tolerance, i.e. checking that all individual sequence searches complete successfully and redoing failed searches.


We have built such a tool in Perl for Linux/Unix clusters, and we call it ANDY. Users specify their cluster runs by creating an XML configuration file in which they describe a parameterized template for the sequence or pipeline of commands they want to run using common Unix shell syntax, the locations and types (e.g. FASTA file, directory of files, list of values, etc.) of data sources which will give concrete values to the command template parameters to create instantiated commands to be run on the cluster nodes, and various other configuration information. The data sources are iterated over to generate all combinations of data source values, and each combination provides a set of concrete values for the command template's parameters and is a single task to be done.
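As a rough illustration of how tasks arise from data sources, the following Python sketch enumerates every combination of data source values and treats each combination as one task. It is not ANDY's Perl implementation; the function name generate_tasks and the variable names __QUERY__ and __E_VALUE__ are hypothetical and chosen only for this example.

```python
from itertools import product


def generate_tasks(data_sources):
    """Yield one task (a dict of variable name -> value) per combination."""
    names = list(data_sources)
    for combo in product(*(data_sources[name] for name in names)):
        yield dict(zip(names, combo))


# Hypothetical variables and values, for illustration only.
sources = {
    "__QUERY__": ["seqA", "seqB", "seqC"],
    "__E_VALUE__": ["0.001", "10"],
}

tasks = list(generate_tasks(sources))
print(len(tasks))  # 3 query values x 2 threshold values = 6 tasks
print(tasks[0])
```

Each dictionary supplies the concrete values that would be substituted into the command template for one task.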


Users then execute the ANDY cluster tool server on their configuration file from the cluster head node. The server submits ANDY client jobs to the cluster nodes, and then sends tasks to and receives results from connecting ANDY clients; it also handles fault tolerance, monitoring jobs and resubmitting failed jobs, and redispersing tasks that failed (up to some user-specified number of times) to be tried again, not exiting until all tasks are complete. If specified by the user, the server will also pipe results from clients into a server summary command which executes during the entire run. To submit and monitor client jobs the server uses a DRM-specific module, the only code that is specific to the DRM being used. Submitted clients are scheduled to nodes by the DRM, and when they start up they first contact the server to request run-specific configuration information (the command template, environment variables to set, what results should be sent back to the server, etc.). They then enter a loop where they continually (until the server says to exit or they lose contact with the server) request a task from the server, instantiate the command template based on the received task values, execute the instantiated commands, and send results (or a notice that an error occurred) back to the server. As part of fault tolerance, clients also periodically send "ping" messages to the server to let it know they are still alive.
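The retry behaviour described above can be pictured with a small single-process simulation. This is only a sketch under simplifying assumptions: there is no network, no DRM, and no ping monitoring, and the names run_with_retries and toy_execute are hypothetical rather than part of ANDY.

```python
from collections import deque


def run_with_retries(tasks, execute, max_retries):
    """Dispatch tasks; re-queue failures for up to max_retries extra attempts.

    execute(task) returns True on success and False on failure.
    Returns (completed, abandoned) lists.
    """
    queue = deque((task, 0) for task in tasks)
    completed, abandoned = [], []
    while queue:
        task, retries_used = queue.popleft()
        if execute(task):
            completed.append(task)
        elif retries_used < max_retries:
            queue.append((task, retries_used + 1))  # redisperse later
        else:
            abandoned.append(task)  # a "bad" task that cannot succeed
    return completed, abandoned


# Toy executor: "flaky" fails on its first attempt, "bad" always fails.
attempts = {}


def toy_execute(task):
    attempts[task] = attempts.get(task, 0) + 1
    if task == "bad":
        return False
    if task == "flaky":
        return attempts[task] > 1
    return True


done, given_up = run_with_retries(["ok", "flaky", "bad"], toy_execute, max_retries=2)
print(done, given_up)
```

The loop ends only when every task has either completed or exhausted its retries, which mirrors the server not exiting until the run is complete.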


Task values can be input to commands in four ways: (1) raw (the value is directly interpolated into the command line); (2) file (the value is saved into a file on disk and the full path to this file is interpolated into the command line); and, where applicable and to avoid potentially costly disk I/O, (3) named pipe (the full path to a named pipe is interpolated into the command line, and the value is written into the named pipe) and (4) memory buffer (the value is written into a named pipe, and a separate buffering process reads from this named pipe, buffering it in memory as necessary, and writes what it reads to one or more other named pipes whose full paths have been interpolated into the command line). Inter-command communication can be through files (one command writes a file and subsequent one(s) read it) and standard out piping, but also through named pipes (one command writes to a named pipe and another, subsequent one reads it) and memory buffers (one command writes to a named pipe, a separate buffering process reads from this named pipe, buffering it in memory as necessary, and writes what it reads to one or more other named pipes). Command output can also similarly be through files, named pipes, and memory buffers.


Memory buffers thus support a "multi" tee, but they are also meant to be used in situations where direct use of named pipes would cause deadlock but the use of named pipes is still desired (e.g. command 1 writes streams A and B in that order, and command 2 reads streams B and A in that order; direct use of named pipes here and in analogous cases would cause deadlock, but with memory buffers the intervening buffering process will completely buffer A, then read B and write it so that command 2 can read it, and finally write A for command 2 to read). Of course, memory buffers cannot be used in situations where command output exceeds available memory --- the user must know these cases and either specify file output for them, or, if possible, rewrite commands to be able to read and write output in an order that will avoid blowing up memory with memory buffers or deadlocking with named pipes.
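The deadlock-avoidance case above can be sketched in Python. This is an illustration only, under stated assumptions: threads stand in for separate processes, anonymous pipes from os.pipe stand in for named pipes (the blocking behaviour that matters here is the same), and the function names command1, command2, and memory_buffer are hypothetical. Stream A is made larger than a typical pipe buffer so that wiring the two commands together directly would block.

```python
import os
import threading


def command1(a_out, b_out, a_data, b_data):
    """Stand-in for command 1: writes stream A completely, then stream B."""
    with os.fdopen(a_out, "wb") as a:
        a.write(a_data)
    with os.fdopen(b_out, "wb") as b:
        b.write(b_data)


def memory_buffer(a_in, b_in, b_out, a_out):
    """Buffer A fully in memory, pass B through, and only then release A."""
    with os.fdopen(a_in, "rb") as a:
        a_data = a.read()
    with os.fdopen(b_in, "rb") as b:
        b_data = b.read()
    with os.fdopen(b_out, "wb") as b:
        b.write(b_data)
    with os.fdopen(a_out, "wb") as a:
        a.write(a_data)


def command2(b_in, a_in):
    """Stand-in for command 2: reads stream B completely, then stream A."""
    with os.fdopen(b_in, "rb") as b:
        b_data = b.read()
    with os.fdopen(a_in, "rb") as a:
        a_data = a.read()
    return a_data, b_data


a_src_r, a_src_w = os.pipe()
b_src_r, b_src_w = os.pipe()
a_dst_r, a_dst_w = os.pipe()
b_dst_r, b_dst_w = os.pipe()

a_payload = b"A" * 200_000  # larger than a typical pipe buffer
b_payload = b"B" * 1_000

workers = [
    threading.Thread(target=command1, args=(a_src_w, b_src_w, a_payload, b_payload)),
    threading.Thread(target=memory_buffer, args=(a_src_r, b_src_r, b_dst_w, a_dst_w)),
]
for worker in workers:
    worker.start()
a_got, b_got = command2(b_dst_r, a_dst_r)
for worker in workers:
    worker.join()
print(len(a_got), len(b_got))
```

The cost of this arrangement is visible in memory_buffer: all of stream A is held in memory at once, which is exactly why the approach fails when output exceeds available memory.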


The ANDY cluster tool was thus designed to work directly with DRMs and to be easily extensible for use on clusters running any DRM software by simply writing a small DRM-specific module with implementations of a job-submitting subroutine and a fault-checking subroutine. The majority of the ANDY cluster tool code is in the core, which can be used unchanged across different clusters and DRMs. This is the key difference and advantage of the ANDY cluster tool over other similar systems such as Disperse and WRAPID: it is built to work easily with DRMs that are already in place rather than replacing them as Disperse and WRAPID do, i.e. it decouples the parallelizing of commands from the DRM functionality. This approach seems to be the more logical one: other research and corporate efforts have extensively investigated cluster distributed resource management and there are many powerful, full-featured systems available, such as Sun GridEngine, PBS Pro, and Platform LSF. Rather than reinventing the wheel in a simplistic and incomplete way by integrating limited DRM functionality into a cluster biological database search tool, we should instead leave the DRM functionality to the pros at Sun, Platform, PBS, etc. and concentrate on building systems that can easily and flexibly work with these DRMs and take advantage of their many features. This is the approach taken by the ANDY cluster tool.


In any case, as a practical matter most clusters, especially in the life sciences, are managed using DRMs such as PBS and Platform LSF and tools such as ANDY that work seamlessly and unobtrusively with them should be welcome. It is also worth pointing out that, in the absence of a proper DRM, the tool could still be used by writing a DRM-specific module that effectively encodes the basic, necessary job starting and monitoring functionality (e.g., using rsh or ssh, and ps). Other key advantages of the ANDY cluster tool over similar systems such as Disperse and WRAPID include ANDY's support for input and output through named pipes and memory buffers for performance and multi teeing, its support for server side code for summarization in addition to client code running on the nodes, its support for summarization and error checking simply as additional user commands in a command sequence/pipeline (and its inclusion of concrete summary and error checking routines for important bioinformatics applications), and its fault tolerance.


The remainder of this paper is organized as follows. The following two sections will cover the command template and XML configuration file formats, giving details of how key aspects of the tool, such as data source iterators, fault tolerance, and command instantiation, work as part of the discussion. The next section will then describe how the tool can be extended to a new DRM by writing a DRM-specific module. The section after that will give several concrete examples, showing configuration files and command templates for commonly used bioinformatics applications such as BLAST and ssearch, and giving some performance results. The final section will discuss some future development directions for the tool and conclude the paper.


Command Template Format


The commands to be run on the cluster nodes (client code) and the head node (server code) are specified using basic Unix shell syntax with two simple enhancements. First, since what is being specified is a template for the commands to be run, there needs to be a way to parameterize it, and this is done with variables. A variable is specified as some text with two underscore characters at the front and back, e.g. __INPUT_SEQUENCE__. While the text of the variable can contain any characters, it is good practice to highlight variables by only using all caps and numbers, e.g. __FORMATTED_DATABASE__ instead of __formatted_database__. Second, there needs to be a way to separate client code from server code, and this is done with the triple-bar symbol ||| or the triple ampersand symbol &&&. With this in mind, here is a detailed example of an ANDY command template:


client_command1 -i __IN__ -a __A_VAL__ -O __OUT__ 2> __ERR__ &

client_err_check1 -i __OUT__ -e __ERR__ -s __EC[0,0]__ &

client_sum1 < __OUT__ > __SUM1__ ;

prepare_input < __IN2__ |

client_command2 > __OUT2__ 2> __ERR2__ &

client_err_check2 -i __OUT2__ -e __ERR2__ -s1 __EC[1,0]__ -s2 __EC[1,1]__ &

client_sum2 -i __OUT2__ -o __SUM2__ &&&

server_sum_command -i1 __SUM1__ -i2 __SUM2__ -o __RESULTS_DIR__global_sum.txt


Except for the variables and the &&& symbol, all of this is simply common Unix shell syntax. Commands are separated by &, |, ;, &&&, or |||. & means to run concurrently, but not pass the standard out channel to the next command's standard in channel, e.g. in "A & B" A and B will run concurrently but A's standard out will not be passed to B's standard in. | means run concurrently and do pass standard out to the next command's standard in, e.g. in "A | B" A's standard out will be piped in as B's standard in. ; means sequence commands, e.g. in "A ; B" A will be run to completion, and then B will be run after A completes (standard out of A will not be piped to standard in of B). The &&& symbol above separates the client code from the server code, i.e. everything to the left of the &&& is commands to be run on clients and everything to the right of the &&& is commands to be run on the server. ||| is exactly the same as &&&, except that the standard out of the command right before ||| (which will be run on some client) will be piped into the standard in of the command right after the ||| (which will be running on the server). Note that server code is optional --- if there is no &&& or ||| then all code is client code. Currently, only &&& is supported, but ||| can be achieved indirectly via PIPEs. The ; symbol can be thought of as separating commands into distinct concurrent execution units called pipe segments, e.g. the client code in the above example has two pipe segments while the server code has just one pipe segment. Note that server code is limited to one pipe segment, while there is no limit on the number of pipe segments in client code.
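To make the structure of the example template concrete, the following Python sketch splits it into client and server code at &&&, splits each part into pipe segments at ;, and collects the variables. It is a simplified illustration, not ANDY's parser: the regular expression and the helper names split_template, pipe_segments, and variables are assumptions, and quoting and the ||| separator are ignored.

```python
import re

TEMPLATE = """\
client_command1 -i __IN__ -a __A_VAL__ -O __OUT__ 2> __ERR__ &
client_err_check1 -i __OUT__ -e __ERR__ -s __EC[0,0]__ &
client_sum1 < __OUT__ > __SUM1__ ;
prepare_input < __IN2__ |
client_command2 > __OUT2__ 2> __ERR2__ &
client_err_check2 -i __OUT2__ -e __ERR2__ -s1 __EC[1,0]__ -s2 __EC[1,1]__ &
client_sum2 -i __OUT2__ -o __SUM2__ &&&
server_sum_command -i1 __SUM1__ -i2 __SUM2__ -o __RESULTS_DIR__global_sum.txt
"""

# Assumed pattern: two underscores, non-blank text, two underscores.
VARIABLE = re.compile(r"__(\S+?)__")


def split_template(template):
    """Return (client_code, server_code); server code is empty without &&&."""
    client, _, server = template.partition("&&&")
    return client, server


def pipe_segments(code):
    """Split code into pipe segments at the ; sequencing symbol."""
    return [seg.strip() for seg in code.split(";") if seg.strip()]


def variables(code):
    """Return the sorted set of variable names appearing in the code."""
    return sorted({"__%s__" % name for name in VARIABLE.findall(code)})


client_code, server_code = split_template(TEMPLATE)
shared = sorted(set(variables(client_code)) & set(variables(server_code)))

print(len(pipe_segments(client_code)))  # client pipe segments
print(len(pipe_segments(server_code)))  # server pipe segments
print(shared)  # variables appearing on both sides of &&&
```

The variables that appear on both sides of &&& (here __SUM1__ and __SUM2__) are the ones whose client output would be passed to the server code.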


The variables are where task values are input to and results read from commands, and they are also inter-command communication channels. Client side variables can be of type raw, file, pipe, or mem_buf, and server side variables can be of type pipe or mem_buf (except for __RESULTS_DIR__ above, which is a special case and is described below). There is an asymmetry between client code and server code: client code will be independently executed many times on different nodes for different tasks, but there is only one head node on which the server code will be executed. The server code is not executed once per task, but rather is started at the beginning of an ANDY run and runs continuously until complete. It should not be necessary to run any code on the server per task --- if a user needs to do this then such code should properly be run on the client and the results sent to the server. Variables that appear both in client code and server code are assumed to be result output of client commands, and the output of these variables will be received atomically from clients by the server and piped as input to the running server code (i.e. a single client's complete results will be received, and passed into the server code, before another client's results are accepted; thus there will be no intermingling of individual task results from different clients as input to the server code). Thus, server side variables are limited to type pipe and mem_buf, and there can be only one server side pipe segment, because server code must execute through the entire run and have client results piped in continuously without any break in execution. There can also be no output variables in server code --- since the server code is the end of the distributed command pipeline it should simply write final results into permanent storage such as files, databases, etc.; the variable __RESULTS_DIR__ in the above example is a special variable which is interpolated to the full path to a directory on the server where the server code can write results.


The variables of the form __EC[M,N]__, where M and N are numbers, are also special cases, and stand for the exit code of command number N of pipe segment M (this command must be before the command in which __EC[M,N]__ appears). The ANDY clients running the commands on the cluster nodes will replace these with the full paths to named pipes, and will write the exit status codes of the referenced commands into them. This is used for fault tolerance, and the commands in which __EC[M,N]__ variables appear should be error checking commands; the ANDY clients running the commands will check THEIR exit status codes and, if any are nonzero, will flag an error to the server, which will then retry the task that generated the command. The usual protocol for Unix commands is to return zero on success and nonzero on failure, and if all commands follow this protocol then __EC[M,N]__ variables are not needed. In this case, if no commands contain __EC[M,N]__ variables, then the ANDY client will check the exit status codes of ALL commands, and flag an error to the server if any are nonzero, which will cause the server to retry the task that generated the commands. Of course, it is possible that a task that generated an error is simply a bad task that cannot succeed, and we would not want to keep retrying such a task forever; thus, the user specifies in the configuration file the maximum number of times to retry failed tasks (this is optional and if not specified means to not do any error checking and task retrying). More about fault tolerance handling will be described below. __EC[M,N]__ variables do not need to be specified in the configuration file. However, the types and data sources for other variables, and other information, are specified in the configuration file which is described next.
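The rule for deciding which exit statuses matter can be summarised in a few lines of Python. This sketch only encodes the decision described above; the function name task_failed and the command strings are hypothetical, and detecting error checking commands by the substring "__EC[" is an assumption made for brevity.

```python
def task_failed(commands, exit_statuses):
    """Decide whether a client should flag an error for one task.

    commands: the command strings of the task, in order.
    exit_statuses: the exit status of each command, in the same order.
    """
    # Commands that mention an __EC[M,N]__ variable are error checkers.
    checkers = [i for i, cmd in enumerate(commands) if "__EC[" in cmd]
    # With checkers present only their statuses count; otherwise all count.
    to_check = checkers if checkers else range(len(commands))
    return any(exit_statuses[i] != 0 for i in to_check)


with_checker = ["search -i __IN__ > __OUT__", "err_check -i __OUT__ -s __EC[0,0]__"]
print(task_failed(with_checker, [1, 0]))  # checker accepted the result
print(task_failed(with_checker, [0, 2]))  # checker reported a problem
print(task_failed(["search", "summarize"], [0, 1]))  # no checkers: all count
```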


Configuration File Format


ANDY configuration files are XML files with the following tags and formats.




An ANDY configuration file begins with the all enclosing ANDY_RUN tag, giving the CONFIG_VERSION attribute as 1.0 (this will be used for backwards compatibility in later versions of ANDY). The tags inside ANDY_RUN can be in any order, and are the following.


<COMMAND_TEMPLATE FILE="my_command.txt"/>


The command template (described in the previous section) is usually placed in a separate file, and the attribute FILE of COMMAND_TEMPLATE should be set to the location of this file. The command template can also be placed directly in the XML configuration file:


<COMMAND_TEMPLATE>...command template here...</COMMAND_TEMPLATE>


This is usually less desirable because then shell symbols such as >, <, etc. will need to be encoded to &gt;, &lt;, etc. to avoid having them confused as XML.


The heart of the configuration file is the VARIABLES section where variable names, types, and data sources are defined. Each variable definition is inside of a VARIABLE tag, and all VARIABLE tags are enclosed within the VARIABLES tag. The VARIABLE tag itself can have four attributes defined: NAME, TYPE, CLIENT_TYPE, and SERVER_TYPE. The value of NAME should match a variable used in the command template. The value of CLIENT_TYPE should be the type of the variable in client code (RAW, FILE, PIPE, or MEM_BUF), while SERVER_TYPE should be the type of the variable in server code (PIPE or MEM_BUF); thus, a variable can have different types in client and server code. The value of the TYPE attribute, if defined, will be used for the type of the variable in both client and server code (and will override any values given for CLIENT_TYPE and/or SERVER_TYPE). If there is no definition for CLIENT_TYPE or TYPE the default client code type is RAW, and if there is no definition for SERVER_TYPE or TYPE the default server code type is PIPE. A variable that is used only in client code can have a data source associated with it. The data source will be iterated over to generate all concrete values for it, and the concrete values will be the input to instantiated command lines run on clients. The way the values are input depends on the variable type:


        RAW: Value is directly interpolated into the command line in place of the variable name.

        FILE: Value is saved to a file and the full path to this file is interpolated into the command line in place of the variable name.

        PIPE: A named pipe is created, the full path to this named pipe is interpolated into the command line in place of the variable, and the value is written into the named pipe. Note that the variable can only occur once in the command line. A named pipe's contents can only be read once (you can also not seek on a named pipe) and thus a named pipe can have only a single reader (and a single writer providing the contents). In this case, the command is the reader and the external ANDY client code is the writer. Use MEM_BUF type if the variable needs to occur multiple times and it is still desired to use named pipes.

        MEM_BUF: Multiple named pipes are created. The full paths to all but one are interpolated respectively into the command line wherever the variable name appears. The value is written into the last named pipe, and a separate buffering process reads from this last named pipe and writes what it reads into all the other named pipes whose full paths were interpolated into the command line (the executing commands are assumed to read from these).
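The PIPE case can be sketched in Python on a Unix system as follows. This is an illustration under assumptions, not ANDY's code: a thread plays the role of the external ANDY client writing the value, a plain read in the main thread plays the role of the command reading the pipe, and the name feed_through_named_pipe and the file name seq_in.fifo are hypothetical.

```python
import os
import tempfile
import threading


def feed_through_named_pipe(value):
    """Write a task value into a named pipe and read it back as a command would."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        fifo_path = os.path.join(tmp_dir, "seq_in.fifo")
        os.mkfifo(fifo_path)  # this path would be interpolated into the command line

        def writer():
            # Opening for writing blocks until a reader opens the pipe.
            with open(fifo_path, "w") as pipe:
                pipe.write(value)

        writer_thread = threading.Thread(target=writer)
        writer_thread.start()
        # Single reader: the contents can be read once, with no seeking.
        with open(fifo_path) as pipe:
            received = pipe.read()
        writer_thread.join()
        return received


record = ">seq1 example\nACGTACGT\n"
print(feed_through_named_pipe(record) == record)
```

No data touches the disk beyond the pipe's directory entry, which is the performance motivation for the PIPE type.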


The DATA_SOURCE tag is inside the VARIABLE tag. Inside the DATA_SOURCE tag there must be a LIST tag with one or more ITEM tags within it. A TYPE attribute must be defined for the DATA_SOURCE, and the legal values for this are themselves defined in the DATA_SOURCE_TYPE_DEFS tag (see below). Each of the LIST ITEM values is assumed to refer to something that itself contains concrete values (e.g. the full path to a FASTA file containing sequences in FASTA format), and the DATA_SOURCE TYPE is associated with a Perl module (the association is defined in the DATA_SOURCE_TYPE_DEFS tag described below) that is used to parse and read these concrete values. Here is an example to make this all clear:












Here a variable is defined with name __SEQ_IN__ and variable type PIPE. Thus, a named pipe will be created and its full path will be interpolated into client commands at the single place where __SEQ_IN__ appears. This variable has a data source of type FASTA_FILE, and thus each of /home/db/test1.fa, /home/db/test2.fa, and /home/db/test3.fa is assumed to contain values that can be parsed with a Perl module associated with the FASTA_FILE DATA_SOURCE TYPE (i.e. they are each FASTA files to be read by a FASTA file reading module). Each of the FASTA sequence records read from /home/db/test1.fa, etc. will be written into the named pipe when executing the commands (and the other variables with data sources will also have values written for them).
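The following Python sketch shows how such a definition might be read and how the type defaults described earlier could be resolved. The XML fragment embedded in it is a hypothetical reconstruction assembled from the tag and attribute names given in the text (VARIABLES, VARIABLE, DATA_SOURCE, LIST, ITEM, NAME, TYPE, CLIENT_TYPE, SERVER_TYPE); the exact layout, and whether NAME carries the surrounding underscores, are assumptions. The helper names resolve_types and read_variables are likewise hypothetical.

```python
import xml.etree.ElementTree as ET

# Hypothetical fragment built from the tag names described in the text.
FRAGMENT = """
<VARIABLES>
  <VARIABLE NAME="__SEQ_IN__" TYPE="PIPE">
    <DATA_SOURCE TYPE="FASTA_FILE">
      <LIST>
        <ITEM>/home/db/test1.fa</ITEM>
        <ITEM>/home/db/test2.fa</ITEM>
        <ITEM>/home/db/test3.fa</ITEM>
      </LIST>
    </DATA_SOURCE>
  </VARIABLE>
  <VARIABLE NAME="__OUT__" CLIENT_TYPE="MEM_BUF"/>
</VARIABLES>
"""


def resolve_types(variable):
    """TYPE overrides CLIENT_TYPE and SERVER_TYPE; defaults are RAW and PIPE."""
    common = variable.get("TYPE")
    client = common or variable.get("CLIENT_TYPE", "RAW")
    server = common or variable.get("SERVER_TYPE", "PIPE")
    return client, server


def read_variables(xml_text):
    """Return {name: {types, source_type, items}} for every VARIABLE tag."""
    result = {}
    for variable in ET.fromstring(xml_text).iter("VARIABLE"):
        source = variable.find("DATA_SOURCE")
        result[variable.get("NAME")] = {
            "types": resolve_types(variable),
            "source_type": source.get("TYPE") if source is not None else None,
            "items": [item.text for item in source.iter("ITEM")] if source is not None else [],
        }
    return result


parsed = read_variables(FRAGMENT)
print(parsed["__SEQ_IN__"])
print(parsed["__OUT__"]["types"])
```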


Variables without data sources are assumed to be for output and inter-command communication and can have the following types with associated behavior:


        FILE: A full path to a file is interpolated wherever the variable name appears in the command line (it is assumed that the earliest executing command where the interpolation took place will actually create and write the file, and later commands will read it).

        PIPE: A named pipe is created and the full path to this named pipe is interpolated into the command line in place of the variable occurrences. However, again since a named pipe can have only a single reader and writer, there can be at most two occurrences of the variable, with the first being the source (writer) of the named pipe and the second being the reader. If the variable result value is to be sent to the server then the variable name can only appear once (the external ANDY client code is the reader in this case, reading the output contents from the named pipe and sending it to the server). MEM_BUFs can be used in cases where the contents of a named pipe needs to be written into multiple other named pipes.

        MEM_BUF: Multiple named pipes are created. The full path to the first named pipe is interpolated into the first (leftmost) occurrence of the variable in the commands, and this is assumed to be the source. The full paths to the other named pipes are interpolated respectively at all the other occurrences of the variable in the command line. If the result value of the variable is to be sent to the server, then another named pipe is created which the external ANDY client will read from and write what it reads to the server. Finally, a separate buffering process reads from the first (source) named pipe and writes what it reads to all the other named pipes (the executing commands and/or the external ANDY client are then assumed to read from these).


Here is an example of a variable definition without a data source:




Note also that a PIPE or MEM_BUF variable must have all its occurrences within a single pipe segment, since named pipes, which are used in the underlying implementation of PIPE and MEM_BUF types, must be read from and written to by concurrent processes (MEM_BUFs could technically cross pipe segment boundaries, but that would require the external ANDY client buffering process to buffer completely the contents of the source named pipe, and this is undesirable and thus was made illegal). Also, while not prohibited by the system, care should be taken in doing inter-command communication through files within a single pipe segment, since there could be a race condition where the readers of the file might execute before the creator/writer and die when they see that the file does not exist yet.
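The placement constraints on PIPE variables without data sources lend themselves to a simple check, sketched here in Python. The function name check_pipe_variable, the problem messages, and the example commands are hypothetical, and the check is deliberately naive (it counts plain substring occurrences and ignores quoting).

```python
def check_pipe_variable(name, client_code):
    """Return a list of problems with how a PIPE variable is used."""
    segments = client_code.split(";")  # pipe segments
    counts = [segment.count(name) for segment in segments]
    problems = []
    if sum(1 for count in counts if count) > 1:
        problems.append("occurrences cross a pipe segment boundary")
    if sum(counts) > 2:
        problems.append("more than one writer and one reader")
    return problems


ok_code = "producer > __X__ & consumer < __X__ ; cleanup"
split_code = "producer > __X__ ; consumer < __X__"
fanout_code = "producer > __X__ & reader1 < __X__ & reader2 < __X__"

print(check_pipe_variable("__X__", ok_code))
print(check_pipe_variable("__X__", split_code))
print(check_pipe_variable("__X__", fanout_code))
```

The third case is the situation in which a MEM_BUF variable would be used instead of a PIPE.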


The DATA_SOURCE_TYPE_DEFS tag is where the associations between data source types and the Perl modules that implement them are made. The DATA_SOURCE_TYPE_DEF tag defines a data source type, and all DATA_SOURCE_TYPE_DEF tags are enclosed within the DATA_SOURCE_TYPE_DEFS tag. Here is an example:











The three DATA_SOURCE_TYPE_DEF attributes NAME, TYPE, and MODULE are all required. NAME is the name of the data source type, and is what is used as the TYPE attribute of the DATA_SOURCE tag described above (i.e. it is the link between the definition and use of a data source type). MODULE is the name of a Perl module (which should be accessible to the Perl interpreter that is running the server, i.e. @INC should include its directory) which implements the data source type, i.e. is able to read each of the LIST ITEMs (e.g. FASTA files) of that type and iterate over all their individual records, returning each individual record in turn along with an index and text identifier for it. The index is unique and consists of two parts, the index of the LIST ITEM and the index of the record therein being referenced, e.g. 3rd FASTA file, 68th sequence record. The index is usually just the numeric index, but in general can be anything that uniquely identifies the record. The text identifier also consists of two parts, the identifier for the LIST ITEM and the identifier for the referenced record therein. It does not have to be unique, but should ideally be something that users can use to identify and get information about the referenced record; an example would be to use the FASTA filename and defline identifier of a FASTA sequence record contained therein as the text identifier. Data source modules also must be able to randomly access records given their indices for fault tolerance (i.e. the ANDY server needs to be able to re-retrieve failed records to retry them). TYPE must be either ITEM_READER or FULL_READER. A MODULE that is a FULL_READER iterates directly over all the LIST ITEMs and their contained records, e.g. iterates over all FASTA records contained in three FASTA files whose paths are ITEMs in a LIST. 
A MODULE that is an ITEM_READER only iterates over the records contained in a single LIST ITEM; in this case, ANDY has internal code which uses the ITEM_READER MODULE as a sub-module to iterate over all the records of all the LIST ITEMs (the internal code uses the numeric index of the LIST ITEM as its index, and the LIST ITEM's full value as its identifier). Finally, the ANDY server will iterate through all data sources through their data source reader modules to create all possible combinations of their individual data source values: each such combination will become one task to do at a client.
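The ITEM_READER arrangement can be illustrated with a Python sketch. The real reader modules are Perl and their interface is defined by the ANDY distribution; the function names below (read_fasta_records, iterate_items, fetch) are hypothetical, and list items are represented as in-memory (name, text) pairs rather than files to keep the example self-contained.

```python
def read_fasta_records(text):
    """ITEM_READER-style: yield (index, identifier, record) for one list item."""
    record, identifier, index = [], None, 0
    for line in text.splitlines():
        if line.startswith(">"):
            if record:
                yield index, identifier, "\n".join(record)
                index += 1
            fields = line[1:].split()
            identifier = fields[0] if fields else ""
            record = [line]
        elif record:
            record.append(line)
    if record:
        yield index, identifier, "\n".join(record)


def iterate_items(items):
    """Wrap the item reader over all list items, yielding two-part keys."""
    for item_index, (item_name, text) in enumerate(items):
        for record_index, record_id, record in read_fasta_records(text):
            yield (item_index, record_index), (item_name, record_id), record


def fetch(items, index):
    """Random access by two-part index, as needed to retry a failed task."""
    item_index, record_index = index
    for i, _, record in read_fasta_records(items[item_index][1]):
        if i == record_index:
            return record
    raise KeyError(index)


items = [
    ("test1.fa", ">s1 first\nACGT\n>s2\nGG\nTT\n"),
    ("test2.fa", ">s3\nAAAA\n"),
]
for index, identifier, record in iterate_items(items):
    print(index, identifier)
print(fetch(items, (0, 1)))
```

The two-part index (list item, record) is what allows a failed record to be retrieved again without rereading every data source from the start of the run.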


With the publicly available distribution of ANDY we provide implementations of data source reader modules for several common formats such as FASTA files, directory of files, raw list (i.e. each LIST ITEM is itself the value to be used), and separated file (i.e. each LIST ITEM is a file with records separated by some user-specified text). New data source reader modules can be easily created and used, either as FULL_READERs or, more easily, as ITEM_READERs. The data source readers included in the ANDY distribution both document and exemplify the interface that data source reader modules must implement. In addition, we include several base class data source reader modules which can be easily subclassed to create specific data source reader modules of common types. For example, we provide a module which can be subclassed to create data source reader modules for any file type that contains a sequence of records --- the subclass need only provide the implementation of a single method which, given a file handle, reads the next record from the file and returns it (along with a text identifier for it); the FASTA file data source reader module is a subclass of this and can be studied as an example. We also welcome submissions of new data source reader modules, and any other useful ANDY related code, to be shared with the ANDY user community.


Variables and data sources can have attributes defined for them inside their tags, and the following are the legal attributes and their behavior. A variable with attribute SERVER_SAVE defined will have its output results saved at the server. Inside the SERVER_SAVE tag, specify the template for the name of the result file at the server. Templates are specified the same as the LIST ITEM of ID_INDEX_INTERP data sources, described next. Here is an example of a variable with SERVER_SAVE attribute defined:






RAW variables whose data sources have attribute ID_INDEX_INTERP defined as YES are treated as follows. First, only one item may be specified in the DATA_SOURCE list. This should be a string into which will be interpolated (at the client for each task) the current values of identifiers and indices for the current task, where the locations at which these should be interpolated are specified with the syntax (all other text is left as-is):




The post-interpolation value of the list item will then itself be interpolated into the command line in place of the variable name, like any other raw variable.


For example:







-j '[__JVALS__.INDEX0][__JVALS__.INDEX1]'






-     ID0 refers to the identifier of the list item corresponding to the referenced variable's current value (e.g. FASTA file name "test1.fa").

-     ID1 refers to the identifier of the particular element inside the list item corresponding to the referenced variable's current value (e.g. FASTA defline ID).

-     INDEX0 refers to the index of the list item corresponding to the referenced variable's current value (e.g. ITEM 3 of 10 LIST ITEMs).

-     INDEX1 refers to the index of the particular element inside the list item corresponding to the referenced variable's current value (e.g. FASTA sequence record 50 out of 9000).


Note that variable names with ID_INDEX_INTERP data sources cannot themselves be referenced in an ID_INDEX_INTERP data source list item.
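The interpolation step can be sketched in Perl as follows. This is only an illustration: it assumes the placeholder form [VARNAME.FIELD] seen in the example above, and the function name interpolate_template and the layout of the task hash are hypothetical, not taken from the ANDY client.

```perl
use strict;
use warnings;

# Hypothetical helper: replace each [VARNAME.FIELD] placeholder, where FIELD
# is ID0, ID1, INDEX0 or INDEX1, with the current task's value for VARNAME.
# Placeholders that name an unknown variable are left untouched, as is all
# other text.
sub interpolate_template {
    my ($template, $task) = @_;
    $template =~ s{\[(\w+)\.(ID[01]|INDEX[01])\]}{
        exists $task->{$1} ? $task->{$1}{$2} : "[$1.$2]"
    }ge;
    return $template;
}

# Illustrative per-task values for one variable (the values are made up).
my %current_task = (
    __JVALS__ => { ID0 => 'test1.fa', ID1 => 'seq50', INDEX0 => 3, INDEX1 => 50 },
);

print interpolate_template("-j '[__JVALS__.INDEX0][__JVALS__.INDEX1]'", \%current_task), "\n";
# prints: -j '350'
```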


RAW variables which have data sources with attribute SERVER_DIR defined are treated as follows. First, only one item may be specified in the data source's list. This item should be the location of a directory on the server into which result files should be placed and where the ANDY server can place temporary files. There must be exactly one SERVER_DIR defined. Similarly, RAW variables which have data sources with attribute CLIENT_DIR defined are treated the same as SERVER_DIR, except that the specified directory is on the cluster nodes (where the ANDY client can place temporary results and files). Note that if a command must write something to a file but the result need not be saved, give "/dev/null" as the path so that the output is thrown away (without causing deadlock from its never being read).


Variables with data sources of type TASK_RESULTS_INFO are special and only used in server code. There are no list items associated with the data source. Input pipes and mem_bufs in server code will have written to them the corresponding variable output from all the clients sequentially, one after the other with no break. Thus, the server code will need some way to determine where the breaks between different results from different clients are and which task a result corresponds to. The ANDY server can optionally pass this information in through the pipe or mem_buf corresponding to a variable with a data source of type TASK_RESULTS_INFO. The server will write into this pipe or mem_buf in a regular, parseable format as follows:










Lines with just one value, N, after the variable name specify variables whose output value from a client is written into a pipe or mem_buf in the server code, and N is the length in bytes of this output value. Lines with INDEX0\tINDEX1\tID0\tID1 give the index and identifier information of the input task variables at the client that generated this output, thus allowing the server code to associate output with the input that produced it. INDEX0, INDEX1, ID0, and ID1 mean the same as described previously for ID_INDEX_INTERP and SERVER_SAVE. Thus, by reading and parsing a TASK_RESULTS_INFO pipe or mem_buf the server code can know how much to read for each task's output results, where the breaks between different results from different clients are, and the task that these results correspond to. Here is an example:






As described previously, the SERVER_SAVE template will be instantiated to generate the filename into which to save a result from a client. By default, all such results will be saved inside a subdirectory (named based on a concatenation of the user's login name, machine name, and timestamp) inside the SERVER_DIR. One can optionally specify that output results should be saved in further subdirectories inside SERVER_DIR as follows:




If SERVER_SAVE_SUBDIR_TEMPLATE is defined, the server will take a substring starting at the Mth character from the left of length N characters for each instantiated SERVER_SAVE filename, and create a subdirectory whose name is this substring; the corresponding result file will then be saved in this subdirectory.
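The substring rule can be illustrated with a short Perl sketch. The helper name save_subdir is hypothetical, and the sketch assumes that M counts from 1 (so M = 1 is the first character); the text above does not state the origin explicitly.

```perl
use strict;
use warnings;

# Hypothetical helper: subdirectory name for an instantiated SERVER_SAVE
# filename, taking N characters starting at the Mth character from the left.
# Assumption: M is 1-based, hence the "- 1" for Perl's 0-based substr.
sub save_subdir {
    my ($filename, $m, $n) = @_;
    return substr($filename, $m - 1, $n);
}

print save_subdir('test1.fa_seq50.out', 1, 5), "\n";   # prints "test1"
```

With such a setting, all results whose filenames begin with "test1" would land in the same subdirectory, which keeps any single directory from holding too many files.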


The DRM-specific Perl module must be specified using the DRM tag as follows:




This module should be accessible to the Perl interpreter that is running the server, i.e. @INC should include its directory. The ANDY distribution includes DRM-specific modules for PBS and for the Berkeley Millennium cluster.


Also, the DRM-specific module can have its own configuration information (e.g. full path to qstat, qsub, etc.) and the DRM_CONFIG tag allows a user to specify a separate configuration file that will be passed into the DRM-specific Perl module's constructor; this is optional, and nothing will be passed into the constructor if not specified. Note also that the DRM configuration file does not need to be in XML, and its format depends on the specific DRM-specific module being used.




The following three tags specify how many jobs will be submitted by the ANDY server and how many tasks each will do. The value of GRAIN_SIZE specifies how many tasks will be sent at once from the server to a client when it connects to the server requesting a task. This value can be experimented with to balance communication overhead and dynamic adaptability to different task lengths (i.e. a higher value means less communication overhead, but less task granularity so that some jobs might be assigned many long running tasks and thus become bottlenecks reducing performance; a smaller value means better adaptability to differing task lengths, but more client-server communication overhead). NUM_JOBS is how many client jobs the server will submit; each job upon starting on a cluster node will continuously do tasks until there are no more tasks to do and the server says to exit (this is thus dedicated mode). If MAX_GRAINS_PER_JOB is specified then NUM_JOBS is ignored (it is not required if MAX_GRAINS_PER_JOB is specified). In this case, each job will do GRAIN_SIZE * MAX_GRAINS_PER_JOB tasks, and the server calculates how many total jobs need to be submitted in order to complete all tasks (each job thus does a specified amount of computation and then exits, and this is therefore fair mode).
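The fair mode job count follows directly from the description above. The Perl sketch below shows the arithmetic only; the function name fair_mode_job_count is hypothetical, and the real server may account for additional factors (such as jobs submitted later for retried tasks).

```perl
use strict;
use warnings;

# Hypothetical helper: number of fair mode jobs needed when every job does
# GRAIN_SIZE * MAX_GRAINS_PER_JOB tasks and all tasks must be covered.
sub fair_mode_job_count {
    my ($total_tasks, $grain_size, $max_grains_per_job) = @_;
    my $tasks_per_job = $grain_size * $max_grains_per_job;
    # Integer ceiling division: a partial final job still counts as one job.
    return int(($total_tasks + $tasks_per_job - 1) / $tasks_per_job);
}

print fair_mode_job_count(9000, 10, 20), "\n";   # 9000 / 200 tasks per job: prints 45
print fair_mode_job_count(9001, 10, 20), "\n";   # one extra job for the remainder: prints 46
```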






The KEEPALIVE_CONNECTIONS tag should have value YES or NO. If YES, ANDY clients will maintain a constantly open socket connection to the ANDY server; if NO, they will initiate a new socket connection for each transaction with the server. For runs with a small number of clients, YES could give better performance by eliminating socket connection overhead. For larger runs with more clients, NO might be necessary because YES can lead to too many open file descriptors (sockets use file descriptors and Unix/Linux has limits on them), and NO could in any case give better performance by eliminating the overhead of select-based switching among open sockets. Again, this is something that can be experimented with for better performance.




The command template format section above described how clients will check exit status codes of commands, flagging error to the server if any are nonzero so that the server can retry. ERROR_RETRY_COUNT specifies how many times the server will retry a failed task; if greater than zero it will retry that number of times, if zero the error will be flagged to the user in the run's log file but the task will not be retried, and if not defined no exit status code error checking will be done. If an ANDY run is in fair mode then a new job will be submitted for each failed task to be retried, and if in dedicated mode no new jobs will be submitted for failed tasks to be retried (they will simply be eventually given to one of the long running clients).
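The three cases for ERROR_RETRY_COUNT can be summarized in a small Perl sketch. The function name on_task_exit and its return values are hypothetical, and treating a task whose retries are exhausted like the zero case (logged, not retried) is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical decision helper. $retry_count is the ERROR_RETRY_COUNT value
# (undef if the tag is not defined), $exit_status is a command's exit status,
# and $retries_so_far is how often this task has already been retried.
# Returns 'ok', 'retry', or 'log_only'.
sub on_task_exit {
    my ($retry_count, $exit_status, $retries_so_far) = @_;
    return 'ok'    if !defined $retry_count;             # no exit status checking at all
    return 'ok'    if $exit_status == 0;                 # the command succeeded
    return 'retry' if $retries_so_far < $retry_count;    # retries remain
    return 'log_only';                                   # flag in the log file, do not retry
}

print on_task_exit(2, 1, 0), "\n";       # prints "retry"
print on_task_exit(0, 1, 0), "\n";       # prints "log_only"
print on_task_exit(undef, 1, 0), "\n";   # prints "ok"
```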




Retrying failed tasks is one aspect of fault tolerance handling. The other major part is client job fault tolerance, i.e. monitoring ANDY client jobs, resubmitting failed ones, and retrying the outstanding tasks of failed clients. Client fault tolerance in ANDY uses two sources of information. First, DRMs generally provide a "qstat" command which returns the status of jobs submitted to the cluster, i.e. telling which jobs are waiting in queues to be executed, which ones are running, which ones are in error, which ones are suspended, etc. ANDY uses the results of periodic calls to a DRM's qstat in determining which jobs have failed. Second, ANDY clients "ping" the server periodically (i.e. send a short message to the server over a socket connection) to let it know they are alive and still running, and ANDY keeps track of each client's last ping time and also uses this information in determining failed jobs. In fact, the more reliable of these two sources of information is the history of pings from the clients, and once the first ping is received from a client the qstat information for it is no longer used. However, in general we can assume no time bound on how long it will take a job to proceed from "queued" state to "running" state (it depends on how many jobs are ahead in the queue and how long they run), so up until receiving the first ping from a client we need to use the results of qstat to monitor it.


The fault tolerance mechanisms themselves should be, in a sense, fault tolerant, i.e. eventually able to determine with high confidence that a failed job has indeed failed but conservative in making this estimation. For example, a single call to qstat might transiently fail or a client might miss a ping, even a few times in a row, and we do not want to overburden the DRM and/or lock out other cluster users behind a long queue of unnecessary jobs by spuriously resubmitting false positive failed jobs over and over. Thus, in ANDY determinations of client job failure are based on a history of multiple calls to qstat and multiple pings from clients, which smooths out isolated failed qstat calls or missed pings. Note also that ANDY does not fail if jobs that were determined failed actually are still alive and continue running and communicating with the server; the server will receive any results from these jobs (but not give any new tasks), keep track of which tasks have completed so that it doesn't mistakenly accept results for the same task more than once, and not exit until all tasks complete.


With this in mind, here are the details of client job fault tolerance in ANDY. The actual call to a DRM's qstat is made in the DRM-specific module, and the method in the module that makes this call must translate the DRM-specific state of each job into an abstract ANDY state: Queued, Running, Error, Suspended, or Other. Queued means the job is okay and waiting to be run, but has not yet started executing (i.e., it is waiting in a queue to run); Running means that the job is actively executing on a cluster node; Error means that the job has failed or is otherwise in error; Suspended means the job is not in error and was previously running but is now in a suspended, not-running state and will remain so for an unknown amount of time (and thus the job cannot send periodic pings, which the fault tolerance must take into account); finally, Other means the job is in some state that does not match any of these. Also, immediately after a client job is submitted and before any qstat results are obtained for it, it is put in the Init state. A separate "Pinging" state is kept for each client job: if a ping has been received from a client its pinging state is 1, otherwise 0. Then, the ANDY server keeps track of each client's job state and pinging state and determines failed jobs based on these rules:


-     Keep a timestamp for the last time each client sent a ping, and reset it whenever a client pings.

-     Keep a timestamp of the last time each client's job state changed, and reset it whenever the job state changes to something different.

-     For client jobs in pinging state 0:

         -     If the job stays in Init job state > T0 seconds then the job has failed and is resubmitted. Its outstanding tasks are given out to a client to be done again.

         -     If the job stays in Error job state > T1 seconds then the job has failed and is resubmitted. Its outstanding tasks are given out to a client to be done again.

-     For client jobs in pinging state 1:

         -     If the job switches from Suspended state to any other state, then reset the last ping timestamp to the current time.

         -     If the job has been in Suspended state for greater than T2 seconds, but less than T3 seconds, then do nothing (job is not considered failed yet).

         -     If the job has been in Suspended state for greater than T3 seconds, and a ping hasn't been received in more than T4 seconds, then the job has failed and is resubmitted. Its outstanding tasks are given out to a client to be done again.

         -     If neither of the previous 2 rules applies, and if a ping hasn't been received in more than T4 seconds, then the job has failed and is resubmitted. Its outstanding tasks are given out to a client to be done again.
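The rules above can be read as a small decision procedure. The following Perl sketch is one literal reading of them, not the server's actual code: the function names, the hash layout of a job record, and the threshold values in the usage example are all hypothetical.

```perl
use strict;
use warnings;

# Hypothetical job record fields:
#   state        abstract job state: Init, Queued, Running, Error, Suspended, Other
#   state_since  time at which the job state last changed
#   pinging      0 until the first ping arrives, then 1
#   last_ping    time of the most recent ping (meaningful once pinging is 1)

# Record a job state reported by the DRM, applying the two timestamp rules.
sub set_job_state {
    my ($job, $new_state, $now) = @_;
    return if $job->{state} eq $new_state;
    # Leaving Suspended: restart the ping clock so the pause is not held against the job.
    $job->{last_ping} = $now if $job->{pinging} && $job->{state} eq 'Suspended';
    $job->{state}       = $new_state;
    $job->{state_since} = $now;
}

# Return 1 if the job counts as failed at time $now, else 0.
# $t is a hash reference holding the thresholds T0..T4 in seconds.
sub job_has_failed {
    my ($job, $t, $now) = @_;
    my $in_state = $now - $job->{state_since};

    if (!$job->{pinging}) {
        return 1 if $job->{state} eq 'Init'  && $in_state > $t->{T0};
        return 1 if $job->{state} eq 'Error' && $in_state > $t->{T1};
        return 0;
    }

    my $silent = $now - $job->{last_ping};
    if ($job->{state} eq 'Suspended') {
        # Between T2 and T3: not considered failed yet.
        return 0 if $in_state > $t->{T2} && $in_state < $t->{T3};
        # Past T3: failed only if pings have also been missing for more than T4.
        return ($silent > $t->{T4} ? 1 : 0) if $in_state > $t->{T3};
    }
    # Fallback rule: no ping for more than T4 seconds.
    return $silent > $t->{T4} ? 1 : 0;
}

# Usage with made-up thresholds: a running client silent for 200 seconds.
my %limits = (T0 => 60, T1 => 30, T2 => 10, T3 => 100, T4 => 50);
my %client = (state => 'Running', state_since => 0, pinging => 1, last_ping => 100);
print job_has_failed(\%client, \%limits, 300) ? "failed\n" : "alive\n";   # prints "failed"
```

Keeping the state update separate from the failure check mirrors the rule list, where the first two bullets and the "switches from Suspended" rule only maintain timestamps.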


The server does fault tolerance handling (i.e. checks the above rules and resubmits failed jobs) at regular intervals. The FAULT_CHECK_INTERVAL tag value specifies the length in seconds of the intervals:




ANDY client jobs ping the server at regular intervals, and the CLIENT_PING_INTERVAL tag value specifies the length in seconds of the intervals:




Finally, the values T0, T1, T2, T3, and T4 referenced in the above rules are specified as follows:








The user can set the values of these fault tolerance parameters to balance, as needed for the application, quick fault tolerance response time against application performance. Smaller values mean failures will be detected and handled more quickly, but at the potential cost of degraded performance, since more compute time will be spent maintaining fault tolerance rather than doing the actual application computation.


The last tag in the configuration file is required and is NODE_PERL, whose value should be the full path to a Perl interpreter to be used on the cluster nodes to run ANDY clients:




Finally, the ANDY configuration file ends with the closing ANDY_RUN tag:




Writing a DRM-specific Module


To extend ANDY to be used on a new DRM, one must write a small DRM-specific Perl module with 3 required methods:


1)     A constructor "new" that optionally takes a DRM-specific configuration file as an argument:


sub new {
    my ($class, $drmConfigFile) = @_;
    ...    # module-specific initialization goes here
}



2)     A method "submitJob" which submits jobs to the cluster using the DRM's interface:


sub submitJob {
    my ($this, $execClientCmd) = @_;
    ...    # submit @$execClientCmd through the DRM and return the job identifier
}



$execClientCmd is an array reference that contains the parts of the command to run the ANDY client on the nodes, which submitJob should submit to be run on a cluster node. submitJob should return a DRM-specific identifier for the submitted client job, or nothing on failure. The identifier can be anything, but it should uniquely identify the job and is used in the next required method to query job status. Most DRMs return such a unique job identifier as the return value of their "qsub" command, and this can be used to query the job's status with the DRM's "qstat" command. Unix process IDs could also be used for this (the Berkeley Millennium module does this).


3)     A method "jobStates" which, given the DRM-specific job IDs (i.e. returned by submitJob) of all submitted, still-running client jobs, queries their DRM-specific job status and translates this into the abstract ANDY job status described previously (i.e. Queued, Running, Suspended, etc.) and returns it.


my %jobStateMap = (
    "E" => "E",    # exiting    --> E
    "R" => "R",    # running    --> R
    "Q" => "Q",    # queued     --> Q
    "H" => "Q",    # held       --> Q
    "T" => "R",    # transition --> R
    "W" => "Q",    # waiting    --> Q
    "S" => "S",    # suspended  --> S
);

sub jobStates {
    my ($this, $jobIdsHash) = @_;
    ...    # query the DRM and translate each job's state using %jobStateMap
}



Most DRM-specific modules can just run the DRM's "qstat" command and translate the results to the abstract ANDY job states. Another possibility (used by the Berkeley Millennium module) is to do "ps -uxw" on all the nodes running clients and use the Unix process status information (this is done using "gexec" in Berkeley Millennium, but could also be done with rsh or ssh, e.g. in the case where there is no proper DRM installed and the DRM-specific module must encapsulate the necessary job submitting and monitoring functionality).
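To make the qstat translation concrete, here is a minimal Perl sketch that parses captured qstat text rather than running the command. Several things are assumptions of this example and not part of ANDY: the function name parse_qstat, the column layout (job identifier first, state letter second to last, as in the classic PBS listing), the returned hash reference, and the letter "O" for the Other state. Real qstat output varies between DRMs and versions.

```perl
use strict;
use warnings;

# Hypothetical parser: translate qstat-style text into abstract ANDY states.
# $job_ids is a hash reference whose keys are the job identifiers of interest.
# Returns a hash reference mapping job identifier => state letter; jobs that
# do not appear in the text are simply absent from the result.
sub parse_qstat {
    my ($qstat_text, $job_ids) = @_;
    # DRM state letter --> abstract state letter, as in the map shown above.
    my %state_map = (E => 'E', R => 'R', Q => 'Q', H => 'Q', T => 'R', W => 'Q', S => 'S');
    my %states;
    for my $line (split /\n/, $qstat_text) {
        my @fields = split ' ', $line;
        next unless @fields >= 6 && exists $job_ids->{ $fields[0] };
        my $drm_state = $fields[-2];
        $states{ $fields[0] } = $state_map{$drm_state} // 'O';   # 'O' = Other (assumed letter)
    }
    return \%states;
}

# Made-up sample listing with two client jobs.
my $sample = <<'END';
Job id            Name             User      Time Use S Queue
----------------  ---------------- --------  -------- - -----
101.head          andy_client      user1     00:01:02 R batch
102.head          andy_client      user1     0        H batch
END

my $result = parse_qstat($sample, { '101.head' => 1, '102.head' => 1 });
print "$_ => $result->{$_}\n" for sort keys %$result;
# prints:
# 101.head => R
# 102.head => Q
```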


Since the DRM-specific module is a Perl module that is instantiated to an object, state can be kept in it to support advanced and cluster-specific scheduling policies and allow the full potential of the DRM to be exploited. For example, the BLAST program memory maps the database to be searched for performance, and later executions of BLAST can make use of the still memory mapped database of a previous BLAST execution to avoid the performance hit of having to re-memory map. This could be exploited for increased performance in a DRM-specific module for running BLAST (and other programs that similarly memory map). The DRM-specific module could be written to store the cluster nodes on which previously submitted BLAST-executing clients ran, and to preferentially run new BLAST-executing clients (searching the same database) on one of them (it is also desirable to schedule preferentially away from cluster nodes that recently memory-mapped other large databases, i.e. to try to avoid clobbering another run's memory mappings). We have not tried this yet, but have plans to in future development of the ANDY cluster tool.


Conclusions and Possible Future Directions


We have built the ANDY cluster tool as a practical and simple-to-use, extensible and fault tolerant tool for running large biological database searches, and similar applications, over the nodes of a Linux/Unix computer cluster managed by a DRM in order to achieve speedup through parallelism. In our tests it achieves good performance of close to 90% processor efficiency on our 64 node cluster, and it has been widely used in our computational biology lab in support of research. Many other research labs and organizations will likely have similar clusters and can benefit from ANDY, and we are thus releasing it freely to the public in the hope that it proves useful to a wide audience.


There is still room for improvement of ANDY, however, and the following are some key directions for further investigation, development, and experimentation.


        Server side fault tolerance.


ANDY's fault tolerance mechanisms are currently limited to client fault tolerance, and a run can fail if the ANDY server fails. In this case there will still be partial results (any result files and summary information received before the server failed), but there need to be mechanisms for server fault tolerance. Some possibilities include having backup server processes or having a mechanism to restart from the logfile that ANDY generates during a run (or restart from some other persistent record of the current state of the run). Restarting from the logfile is probably the most practical and straightforward way. However, there are problems that would need to be solved for cases when server summary code is used. If there is no server summary code, then all the client results are each individually stored in separate files and restarting from the logfile should be easy: any partially finished tasks which weren't recorded as complete in the logfile can simply be redone and any partial results overwritten (and tasks recorded as complete don't have to be redone). In other words, transactional rollback is not a problem if there is no server code, but is a problem if there is server code, since server summary code generally appends all summary results to single files, and in general ANDY cannot know what partial results should be expunged from such summary files in order to restart in a consistent state. The solution will probably require the user to parse the partial summary files and get them into a consistent state before restart, and we should concentrate on making this as easy to do as possible.

        More flexibility in specifying tasks and task order.


ANDY currently allows a single command template to be specified, and iterates over all data sources so that each combination of data source values becomes a task. This is generally what is desired, but there might be cases where tasks and task order need to be more flexibly specified, or multiple command templates need to be run. For example, the user might not want to execute commands for all combinations of data source values, or might want to order them in a special way (the only control over task order the user currently has is the order of the data sources in the XML configuration file; the top to bottom order in the configuration file matches the outer to inner loop ordering of the iteration). One possible simple solution to the task and task ordering problem would be to have a separate utility which iterates over the data sources in the normal, default order and prints them (their indices) in order to a task file. The user could then reorder and delete from this task file as desired, and tell the server to do the tasks in this file, in order, instead of iterating over the data sources in the default way. As far as allowing different command templates, one could imagine creating a meta-configuration file to specify multiple ANDY runs that execute together as part of a larger run; but it would probably just be easier to consider ANDY a primitive, basic tool and let users write their own scripts that run ANDY multiple times for multiple command templates.
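The proposed task file utility could enumerate index combinations along the following lines. This Perl sketch is purely illustrative of the outer to inner loop ordering: the function name enumerate_tasks, the 0-based indices, and the tab-separated output are assumptions, and no such utility exists in the current distribution.

```perl
use strict;
use warnings;

# Hypothetical utility: given the number of values in each data source, in
# configuration file order (first = outermost loop), return every index
# combination in the default iteration order as a list of array references.
sub enumerate_tasks {
    my @sizes = @_;
    my @tasks = ([]);
    for my $size (@sizes) {
        # Extend every partial combination with each index of this data source.
        @tasks = map { my $prefix = $_; map { [@$prefix, $_] } 0 .. $size - 1 } @tasks;
    }
    return @tasks;
}

# Two data sources with 2 and 3 values: the last index varies fastest.
print join("\t", @$_), "\n" for enumerate_tasks(2, 3);
```

Each printed line would be one task; a user could delete or reorder lines in the resulting file before handing it back to the server.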


        Persistent server, clients, commands, and RAM-Disk for better performance.


The more we can eliminate the overhead of starting up or switching between processes, having to start up the Perl interpreter, writing to disk, etc., the better performance we can get. In the current ANDY, for each separate run a server and many clients must all be started up. If several users are all doing ANDY runs on the same cluster, they will all be running their own independent servers on the head node, and these independent servers will be competing for CPU time, there will be overhead from the operating system switching these different servers in and out, and there will be redundant use of memory (data structures which could be shared among all the servers but are instantiated in each). Top performance of the server is crucial to get overall good performance, since the slower the server is, the more the clients will be held up waiting to get tasks from or send results to it, and thus overall performance will be degraded. Thus, having a single persistent ANDY server running on the cluster head node which all users use for their runs would reduce this overhead and give better performance. This would also allow more state to be kept to help in scheduling (e.g. the single server could know at once where all users' jobs were submitted and what databases they are using, and could use this information to better schedule new jobs).


We could also do the same thing with the ANDY clients, and this could give an even bigger performance boost, especially for "fair mode" runs (where each job does a specified amount of work and then exits, and the server submits enough jobs at the beginning so all tasks can complete; client startup overhead, especially if many clients are submitted, can be large). A persistent client job could run (started outside of the DRM) on each cluster node; the ANDY server would still submit jobs to be run on the cluster nodes as before, but these jobs, instead of running the client code themselves, would simply contact the persistent client running on the node and tell it to start executing on their behalf (i.e. tell it the command line to execute, the ID to use in talking with the server, etc.). If there are no ANDY jobs active on a node, then the persistent client will just sleep and not consume the CPU while waiting for a connection from a new job. Having persistent clients would remove the overhead of having to continually start up the Perl interpreter, load in Perl modules, initialize data structures, etc. and would give a performance benefit in the same way that running CGI scripts in the Apache web server through mod_perl gives a substantial performance benefit as compared to running raw CGI scripts (mod_perl is basically a persistent Perl interpreter running inside Apache, and thus there is no per-script Perl interpreter startup overhead). Of course, we could avoid the overhead of starting up and using the Perl interpreter by rewriting ANDY in a language such as C or C++ and compiling and linking it to a binary machine executable, and this is something else to consider.


It might be possible to take this even further and have certain specific application commands be persistent. For example, BLAST can run in a mode where, in a single execution of BLAST, input FASTA sequences can be continually piped in and BLAST pipes the results for them, concatenated in order, through standard out or an output file. Thus, for BLAST and similar applications in this respect, we could potentially have ANDY clients execute them once, thus avoiding the overhead of multiple forks and execs and multiple reloading of the databases into memory, set their input and output channels to autoflush (so input is immediately sent and output is immediately received without any intermediate stdio buffering), and then continually feed input to them and read the corresponding output (ANDY would need to know how to break the concatenated output stream into distinct task result chunks to send back to the server).


One final idea is, instead of using named pipes to avoid having commands write to disk, set up a RAM disk and have the executing commands read and write file input and output to this. This would be less portable and more difficult to set up, but would make it much easier for users to specify their commands and runs (they wouldn't then have to worry about specifying type PIPE or MEM_BUF --- everything except RAW would be FILE, but would really be flowing through the memory of the RAM disk).


        Cluster node local disk space management


Programs such as BLAST take a database to search against as an argument, and read this database file from disk into memory to search against. A common use for ANDY is to run many executions of programs like BLAST in parallel over the cluster nodes, and these parallel executions will all need to read the same database at the same time. Where will they read it from? This was never really discussed above; in the examples the databases were read from the cluster head node over NFS. For small to moderate sized databases this would not be a problem; there would be a small hiccup at the beginning as the starting clients all read the database over NFS into memory at the same time, but then the database would be memory resident on all the nodes and would not need to be read from disk again. However, for very large databases (ones that approach or exceed available memory) to be read over NFS from the head node would be disastrous for performance. Such databases could not be completely memory resident, and the parallel executing clients would need to continuously read and reread parts of the database from disk over NFS into memory, and the NFS server would be overburdened and might even fail; performance could be worse than if the job had simply been run serially on a single computer.


In cases where a database exceeds available memory and will need to be continually read from disk, the database file should be available locally on each client's disk and accessed from there independently by each client, or in some other way the access should be made efficient. With the ever increasing size of databases used in computational biology, this is becoming the common case which should be optimized for. A simple, albeit expensive, solution would be to employ a high-performance distributed file system such as IBM's General Parallel File System ("GPFS"), and we have considered this. Alternatively, in support of ANDY we could build a complementary disk space management system that could quickly distribute large database files to all the local node disks. To achieve this, we could keep track of the locations of database files (i.e. locations on the head node and any copies on node local disks) and build a program that would copy a requested database to all node local disks in a binary tree pattern starting from the currently available locations. For example, if there were 3 current copies, 3 initial copies could be started, then 6 more once the first 3 complete, then 12 more once those complete, etc., until copies exist at all node local disks. Each new copy location is entered into the record of database file locations to be made available for future copies.
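The doubling pattern of the binary tree copy can be worked through with a short Perl sketch. It assumes, as in the example above, that every existing copy serves as the source of exactly one new copy per round; the function name copy_rounds is hypothetical and the node counts in the usage line are illustrative.

```perl
use strict;
use warnings;

# Hypothetical planner: starting from $copies existing copies, each existing
# copy sources one new copy per round until the $missing node local disks
# that still lack the file all have it. Returns the number of new copies
# started in each round.
sub copy_rounds {
    my ($copies, $missing) = @_;
    die "need at least one existing copy\n" if $copies < 1;
    my @rounds;
    while ($missing > 0) {
        my $new = $copies < $missing ? $copies : $missing;
        push @rounds, $new;
        $copies  += $new;     # new copies become sources in the next round
        $missing -= $new;
    }
    return @rounds;
}

# 3 existing copies, 61 node disks still to fill.
print join(', ', copy_rounds(3, 61)), "\n";   # prints "3, 6, 12, 24, 16"
```

Because the number of sources doubles each round, the number of rounds grows only logarithmically with the number of nodes.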


Another thing to consider is that many different people will be using the cluster and filling up node local disk space with databases and other files used by their cluster jobs. The disk space management system thus must balance two conflicting goals: (1) We don't want the local disk space to become too cluttered and fill up to capacity so that jobs to be run don't have enough disk space for their needs, and (2) At the same time, we'd like to keep users' files, especially their often used large database files, around for as long as possible so that jobs can keep efficiently reusing them and thus they won't be recopied over and over. The basic idea for a solution to this can be summarized as "erase on demand" or "lazy erase" and is similar to the way cache memories and virtual memory page tables are managed: the disk space management system does not erase files until necessary. A job running on a cluster node can request to have a large file copied to the node's local disk, and the disk space management system will free up just enough disk space to accommodate the file (or free nothing if there is already enough space). Then, to perform the actual copy we can employ a dynamic generalization of the binary tree copying strategy described above: each copy request must first obtain an open "slot" for the file to be copied (i.e., only allow a fixed number of simultaneous copies for a given file at some location, to avoid overburdening the file system). The disk space management system keeps track of how many slots are in use and how many are available and from where, and copy operations must block until a slot is available. Finally, while we would want to employ some kind of "least recently used" strategy in picking files to erase to make space for new copies, the disk space management system must ensure that the files of actively running jobs are not erased.
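The "erase on demand" policy can be sketched as a small selection routine in Perl. This is only an illustration of the idea under stated assumptions: the function name files_to_erase, the fields of a file record, and the choice to return undef when not enough space can be freed are all hypothetical, and slot management for the copy itself is left out.

```perl
use strict;
use warnings;

# Hypothetical "lazy erase" planner. Each file record is a hash with keys
# name, size (bytes), last_used (epoch seconds) and in_use (true if an
# actively running job needs the file). Returns a reference to the list of
# file names to erase, least recently used first, so that at least $needed
# bytes are free; returns undef if that cannot be achieved.
sub files_to_erase {
    my ($files, $free, $needed) = @_;
    my @erase;
    my @candidates = sort { $a->{last_used} <=> $b->{last_used} }
                     grep { !$_->{in_use} } @$files;
    for my $file (@candidates) {
        last if $free >= $needed;          # erase no more than necessary
        push @erase, $file->{name};
        $free += $file->{size};
    }
    return $free >= $needed ? \@erase : undef;
}

# Made-up node disk contents: nr.db is oldest but still in use by a running job.
my @disk = (
    { name => 'nr.db',    size => 500, last_used => 100, in_use => 1 },
    { name => 'est.db',   size => 300, last_used => 200, in_use => 0 },
    { name => 'pfam.db',  size => 200, last_used => 300, in_use => 0 },
);
my $plan = files_to_erase(\@disk, 50, 300);
print $plan ? "erase: @$plan\n" : "not enough space\n";   # prints "erase: est.db"
```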