Showing posts with label abstractions. Show all posts

Friday, July 3, 2009

Make as an Abstraction for Distributed Computing

In previous articles, I have introduced the idea of abstractions for distributed computing. An abstraction is a way of specifying a large amount of work in a way that makes it possible to be distributed across a large computing system. All of the abstractions I have discussed so far have a compact, regular structure.


However, many people have large workloads that do not have a regular structure. They may have one program that generates three output files, each consumed by another program, and then joined back together. You can think of these workloads as a directed graph of processes and files, like the figure to the right.

If each of these programs may run for a long time, then you need a workflow engine that will keep track of the state of the graph, submit the jobs for execution, and deal with failures. There exist a number of workflow engines today, but without naming names, they aren't exactly easy to use. The workflow designer has to write a whole lot of batch scripts, or XML, or learn a rather complicated language to put it all together.

We recently wondered if there was a simpler way to accomplish this. A good workflow language should make it easy to do simple things, and at least obvious (if not easy) how to specify complex things. For implementation reasons, a workflow language needs to clearly state the data needs of an application: if we know in advance that program A needs file X, then we can deliver it efficiently before A begins executing. If possible, it shouldn't require the user to know anything about the underlying batch or grid systems.

After scratching our heads for a while, we finally came to the conclusion that good old Make is an attractive workflow language. It is very compact, it states data dependencies nicely, and lots of people already know it. So, we have built a workflow system called Makeflow, which takes Makefiles and runs them on parallel and distributed systems. Using Makeflow, you can take a very large workflow and run it on your local workstation, a single 32-core server, or a 1000-node Condor pool.

What makes Makeflow different from previous distributed makes is that it does not rely on a distributed file system. Instead, it uses the dependency information already present in the Makefile to send data to remote jobs. For example, if you have a rule like this:

output.data final.state : input.data mysim.exe
	./mysim.exe -temp 325 input.data


then Makeflow will ensure that the input files input.data and mysim.exe are placed at the worker node before running mysim.exe. Afterwards, Makeflow brings the output files back to the initiator.
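To make the idea concrete, here is a minimal Python sketch of how the dependency line of a rule already tells you which files to ship out and which to bring back. This is not Makeflow's actual parser; the names Rule, parse_rule, and transfer_plan are made up for illustration, and the sketch only handles a single rule with a single command line.

```python
from dataclasses import dataclass


@dataclass
class Rule:
    targets: list   # files the command is expected to produce
    sources: list   # files the command needs before it can start
    command: str    # shell command to run on the worker


def parse_rule(text):
    """Parse one 'targets : sources' line followed by one command line."""
    lines = [ln for ln in text.strip().splitlines() if ln.strip()]
    header, command = lines[0], lines[1].strip()
    left, right = header.split(":", 1)
    return Rule(left.split(), right.split(), command)


def transfer_plan(rule):
    """Return (files to send to the worker, files to fetch back afterwards)."""
    return list(rule.sources), list(rule.targets)


rule = parse_rule("""
output.data final.state : input.data mysim.exe
\t./mysim.exe -temp 325 input.data
""")
send, fetch = transfer_plan(rule)
print("send to worker:", send)
print("fetch from worker:", fetch)
```

The point of the sketch is that nothing beyond the rule itself is needed to build the transfer plan, which is why no shared file system is required.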

Because of this property, you don't need a data center in order to run a Makeflow. We provide a simple process called worker that you can run on your desktop, your laptop, or any other old computers you have lying around. The workers call home to Makeflow, which coordinates the execution on whatever machines you have available.


You can download and try out Makeflow yourself from the CCL web site.

Tuesday, April 14, 2009

Distributed Genome Assembly on 1000 Computers

Lately, my research group has been collaborating with Prof. Scott Emrich on several problems in bioinformatics. Our students Chris Moretti and Mike Olson have been building a system for carrying out whole-genome assembly problems on campus grids. They recently got it scaled up to run on nearly 1000 nodes spread across Notre Dame, Purdue, and Wisconsin, making the problem complete in a few hours instead of a few weeks. We are excited to move the system into production use to start working on some real assembly problems.

Here is what the genome assembly problem looks like from a computer science perspective. As you should remember from biology class, your entire genetic makeup is encoded into a long string of DNA, which is a chemical sequence of base pairs that we represent by the letters A, T, C, and G. A sequencing device takes a biological sample, and through some chemical manipulations can extract the DNA and produce your entire string of DNA, which is some 3 billion characters (bases) long:

AGTCGATCGATCGATAATCGATCCTAGCTAGCTACGA

Except that it isn't that simple. The chemical process of extracting the sequence runs out of energy after about 100-1000 characters, depending on the exact process in use. Instead, what you end up with is a large set of "reads", which are random substrings from the entire genome. For example, here are three random substrings of the previous string:

1. ATCCTAGCTAGCTACGA


2. AGTCGATCGATCG

3. CGATCGATAATCGATCCTAG

Now, you have to examine all of the reads, and figure out which ones overlap. In principle, you want to compare all of them to each other with the All-Pairs framework, but that would be computationally infeasible. Instead, there are a number of heuristics that can be used to generate candidate pairs, which then can be matched in detail and then assembled. For example, the three reads from before overlap like this:

AGTCGATCGATCGATAATCGATCCTAGCTAGCTACGA

.....................................

AGTCGATCGATCG........................

.......CGATCGATAATCGATCCTAG..........

....................ATCCTAGCTAGCTACGA
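The overlap picture above can be reproduced with a few lines of Python. This is only a toy sketch using exact string matching on the three example reads; the function names overlap and merge_in_order are hypothetical, and a real assembler has to cope with sequencing errors, unknown read order, and candidate selection, none of which is modeled here.

```python
def overlap(a, b, min_len=3):
    """Length of the longest suffix of a that is also a prefix of b (0 if none)."""
    for k in range(min(len(a), len(b)), min_len - 1, -1):
        if a[-k:] == b[:k]:
            return k
    return 0


def merge_in_order(reads, min_len=3):
    """Join reads that are already sorted left to right, dropping the overlaps.

    If two neighbours do not overlap, they are simply concatenated.
    """
    assembled = reads[0]
    for nxt in reads[1:]:
        k = overlap(assembled, nxt, min_len)
        assembled += nxt[k:]
    return assembled


genome = "AGTCGATCGATCGATAATCGATCCTAGCTAGCTACGA"
reads = [
    "AGTCGATCGATCG",          # read 2 in the text
    "CGATCGATAATCGATCCTAG",   # read 3 in the text
    "ATCCTAGCTAGCTACGA",      # read 1 in the text
]
assembled = merge_in_order(reads)
print(assembled == genome)
```

Comparing every read against every other read this way is exactly the all-to-all comparison that becomes infeasible at genome scale, which is why heuristics are used to pick candidate pairs first.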

There are many wide open questions of exactly what heuristics to use in selecting candidates, performing alignments, and completing the assembly. Our job is to give researchers a modular framework that allows them to try many different kinds of algorithms, using hundreds or thousands of CPUs to complete the job quickly.

We started with the work queue framework from the Wavefront abstraction. An assembly master process reads the candidates and sequences from disk, builds small units of work, and sends them out to worker processes running on various grids. No particular alignment code is baked into the system. Instead, the user provides an alignment program written in whatever language they find convenient. The system moves the executable and the necessary files out to the execution node, and puts it to work.

Here is an example of the system in action on a multi-institutional grid. The X axis shows time, and the various lines show number of tasks running (red), percent complete (blue), and cumulative speedup (green). We started by running a worker on one workstation, then another, then on a 32-node cluster, then on the Notre Dame campus grid, then on Condor pools at Purdue and Wisconsin, growing up to nearly 700 CPUs total. About halfway through, we forced a failure by unplugging the workstation running the master. Upon restarting, the master loaded the completed results, and picked up right where it left off.



I'm looking forward to putting our system into a production mode and attacking some really big problems.

Wednesday, February 25, 2009

On Parallel Programming with Processes

About once a week, a well-meaning person stops by my office to ask a question like this:

I need to run about 1000 simulations that take about an hour each. I can't wait a thousand hours for the results, so I need to parallelize my simulation. So, should I re-write my application using threads, MPI, or something else?


For some reason, they are always disappointed by my response:

Just run multiple copies of your program at once.

The reasoning is very simple. You already have a complete, debugged program. You have multiple processors, and your operating system knows how to use them. Running four processes at once on a four CPU machine will give you four times the number of results in the same amount of time. Your work will be done in 250 hours instead of 1000. In fact, you can take the same sequential program and submit it to a large batch system that can run on 100 different processors at once and complete one hundred simulations in one hour. If you only get 99 hosts, that's ok, you will still get a 99x improvement.
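As a rough illustration of "just run multiple copies", here is a small Python sketch that launches several independent OS processes and keeps a fixed number running at once. The helper names run_copies and wall_clock_hours are invented for this example, the "simulation" is a stand-in one-liner, and the time estimate assumes every run takes the same amount of time.

```python
import math
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_copies(commands, max_parallel):
    """Run each command as its own OS process, at most max_parallel at a time."""
    def run_one(cmd):
        done = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return done.stdout.strip()

    # The threads only wait on child processes; the real work happens
    # in the separate processes, which the OS spreads across the CPUs.
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        return list(pool.map(run_one, commands))


def wall_clock_hours(n_runs, hours_per_run, n_cpus):
    """Elapsed time if equal-length runs are executed n_cpus at a time."""
    return math.ceil(n_runs / n_cpus) * hours_per_run


# Stand-in for the unmodified sequential program: one process per parameter.
commands = [[sys.executable, "-c", f"print({seed} * {seed})"] for seed in range(8)]
results = run_copies(commands, max_parallel=4)
print(results)
print(wall_clock_hours(1000, 1, 4), "hours on a four CPU machine")
```

The sequential program is never modified; only the way it is launched changes.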

The alternative is almost too awful to contemplate. Those who have written multithreaded or message passing programs know that it sounds great on the chalkboard, but the reality is much more complicated. Debugging tools are ineffective on parallel programs. Many existing libraries are not thread safe. You have to deal with synchronization problems, and an endless slew of tuning parameters. If you write a message passing program that requires eight hosts, then you need to wait until you have exactly eight hosts available for your exclusive use. It is all too likely that you will spend more time trying to correct the program than you actually will running it.

The funny part is, many people do not like this advice. But... that's not... parallel! Or, if they concede it's parallel, it's merely embarrassingly parallel, or even worse, shamefully parallel. (As if using 100 CPUs simultaneously with processes was somehow less parallel than using 8 CPUs with threads.) They were hoping to do some really difficult, macho programming, but now find there is a simple solution to the problem.

Now, I'm over-simplifying a little bit. There are certainly cases where it makes sense to take an existing program and parallelize it to use multiple processors. There are a few good reasons for doing so. First, if you really need one particular result as soon as possible, then it makes sense to parallelize. For example, if you are predicting tomorrow's weather, you need the result before tomorrow. Second, if your sequential program has fully consumed another resource on the machine, then it may make sense to parallelize. For example, if your simulation uses all available memory on a machine, then you cannot run two copies at once on the same machine. Third, if one program will run for longer than you can realistically keep a computer up without rebooting, then it may make sense to parallelize. However, none of these cases are as common as you might think, and it's usually best to avoid threads or message passing until the necessity has been proven.

A middle ground that we have been exploring in my lab is the use of abstractions to represent large parallel programs. In this approach, the user provides a sequential program that performs the key kernel of computation in which they specialize. Many invocations of the kernel are then combined together to run very large parallel programs with parallelism measured in hundreds of CPUs. You can read more about the BXGrid, Wavefront, All-Pairs, and Classify abstractions.

Monday, December 29, 2008

BXGrid: The Biometrics Research Grid

One of our graduate students, Hoang Bui, presented a poster on the Biometrics Research Grid (BXGrid) at the IEEE e-Science conference in Indianapolis a few weeks ago. BXGrid is a large data repository that we have built both to support production research in biometrics and to provide a platform for research in large scale data intensive computing. It provides another example of the idea of abstractions for distributed computing. This project is a collaboration between Dr. Patrick Flynn and my research group at Notre Dame.

The Computer Vision Research Lab studies methods for identifying people via biometrics such as fingerprints, iris scans, and surveillance videos. The group collects hundreds of thousands of images and movies from hundreds of volunteers on campus, and uses them to test clever new identification algorithms. For example, here is an atlas of photos from one particular subject (our department chair):



Each image is annotated with metadata that describes who the subject is, what camera took the picture, what the conditions were, and so forth. In BXGrid, you can see all the metadata for a given image like this:



Before BXGrid, all of this data was stored in an ordinary file system as big directories of images. This worked acceptably, but required an enormous amount of error prone scripting in order to answer interesting research questions. For example, a user might want to locate all close up face images taken in low light with a given camera, using only data for subjects with more than twenty images. You can do this in a filesystem, but it isn't easy, and it certainly isn't fast.

So, we designed BXGrid to be a filesystem-database hybrid that can store large amounts of data reliably, but enable new modes of exploration. The system consists of one central database that indexes all of the metadata, and sixteen active storage servers that provide storage that scales in both capacity and performance. Each item in the system is replicated three times across the cluster for reliability, so you can continue to operate even with several storage servers offline. A nice web interface on the front makes it easy to search, download, and process data from your desktop.

What's more, the database really simplifies tasks that were previously arduous. For example, when ingesting new data into the system, a human needs to manually verify that each image really is of the intended person. BXGrid can simply pop up a screen that shows newly ingested images alongside a selection of known good images of the same subject, and the user can quickly scan them and press a button if there is a problem. What used to be a Sisyphean task for one poor graduate student can now be accomplished by ten people working together in a few hours. Here is what it looks like:



The next step is to automate research tasks using abstractions. Many research problems in biometrics can be answered using the following formula:

  1. Select a number of images according to some criterion.
  2. Transform those images by a standard function.
  3. Compare all of those images to each other with another function.

To formalize it a little bit, we call this the BXGrid abstraction:

  1. S = Select( R );
  2. T = Transform( S, F(s) );
  3. M = AllPairs( T, G(x,y) );
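On a small in-memory dataset, the three steps can be sketched in Python as follows. This is only meant to show the shape of the abstraction; the record fields, the toy "transform" (summing pixel values), and the toy comparison function are all invented for the example and have nothing to do with real biometric algorithms or with how BXGrid is implemented.

```python
def select(records, criterion):
    """S = Select(R): keep the records that satisfy the criterion."""
    return [r for r in records if criterion(r)]


def transform(selected, f):
    """T = Transform(S, F): apply F to every selected item."""
    return [f(s) for s in selected]


def all_pairs(items, g):
    """M = AllPairs(T, G): compare every item with every item."""
    return [[g(x, y) for y in items] for x in items]


# Hypothetical toy records standing in for annotated images.
records = [
    {"subject": "s1", "light": "low", "pixels": [1, 2, 3]},
    {"subject": "s2", "light": "bright", "pixels": [9, 9, 9]},
    {"subject": "s3", "light": "low", "pixels": [4, 4, 4]},
]

S = select(records, lambda r: r["light"] == "low")
T = transform(S, lambda r: sum(r["pixels"]))
M = all_pairs(T, lambda x, y: abs(x - y))
print(M)
```

Each step is a one-liner here only because the data is tiny and local; the same three calls hide a database query, a parallel pass over storage, and a grid-scale comparison in the real system.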

Although easily stated, each of these steps is computationally expensive to perform on a large amount of data. On a single machine, a workload could take months just to compute.

However, BXGrid can be used to dramatically accelerate discovery. The database facilitates fast Select operations by virtue of indexing, the active storage cluster accelerates Transform by virtue of parallel storage, and our computing grid provides the All-Pairs capability on hundreds of processors. Once results are generated, they are sent back to the database where they can be shared with other users.

Thursday, December 11, 2008

Abstractions, Grids, and Clouds at IEEE e-Science 2008

I just attended the IEEE conference on e-Science in Indianapolis, and gave this talk on harnessing distributed systems with high level abstractions.

Another highlight of the conference was Rich Wolski's talk on Eucalyptus, an open source toolkit for cloud computing. Like Nimbus, it is API compatible with Amazon's EC2. That is, if your code works with Amazon, you can install Nimbus or Eucalyptus on your own cluster and run your own cloud.

Rich also spoke on the distinction between clouds and grids, which to many are the same idea. However, he pointed out a very important distinction:

- Clouds provide a resource allocation service. You ask for a certain number of machines, you get them for a certain amount of time, and you can choose to use them however you like. This gives you guaranteed service, which is great if you are running a web server, but can lead to underutilization if your goal is to run a large number of simulations.

- Grids provide a task execution service. You submit work to be executed, and the system decides where and when to execute the tasks, perhaps preempting or migrating them along the way. You have no guarantees of service, but the system will get very high utilization, making it good for high throughput computing.

As such, you can combine the two ideas together. For example, Cycle Computing provides a value added web service that employs both Amazon and Condor. You request a certain number of CPUs to execute a certain number of tasks. Cycle allocates the CPUs using Amazon, installs Condor on the nodes, and then runs the jobs. The result is a grid running on a cloud.

Thursday, November 13, 2008

The Wavefront Abstraction

This is the third in a series of posts on the idea of abstractions for distributed computing on clusters, clouds, and grids. An abstraction is a simple interface that allows you to scale up well-structured problems to run on hundreds or thousands of computers at once.

The Wavefront abstraction came up in a discussion with several economists. You want to compute a recurrence relation where each result depends on one or more previous results. The user provides initial conditions along the edges of a matrix, and then you can compute F at position (1,1). Once you do that, then you can compute F at (1,2) and (2,1), and so on. The work progresses like a wave across the matrix, hence the name Wavefront. Here is what it looks like:



We have a first implementation of this abstraction that can run on a Condor pool of multicore machines. You simply run it by stating the function, size of the matrix, and providing some files that state the initial conditions:


wavefront func.exe 100 100



This abstraction is interesting for several reasons. First, it needs a variable number of CPUs over time. Even if you had an infinite number of CPUs, it can only use one in the first step, two in the second, and so on until the wavefront reaches the diagonal of the matrix, after which it decreases again. So, it would be impossible to program this efficiently in a system like MPI where you have to choose a fixed number of CPUs. Instead, you want to allocate more CPUs over time. For example, here is a timeline of a Wavefront run on a 64-CPU cluster. The red line shows the number of CPUs in use, and the green line shows the percent of the problem completed:



Second, the problem has a certain degree of asynchrony built in. You do not need to run each diagonal slice of the system in lock step. Instead, each cell can be computed as soon as its neighbors are done. Because of this, different parts of the problem can be delegated to different nodes, allowing them to run and finish at their own pace. If each function is fast, then you can delegate an entire square chunk of the task to a remote processor, and allow it to complete independently.
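Here is a small sequential Python sketch of the dependency rule, to show both the varying parallelism and the "ready as soon as the neighbors are done" idea. It assumes each cell depends on the three neighbors to its left, below it, and diagonally behind it, which is one common form of the recurrence; the function name wavefront and the toy function f are made up for the example, and the sketch does not distribute any work.

```python
def wavefront(n, f, edge_value=0):
    """Fill an (n+1) x (n+1) matrix whose row 0 and column 0 are initial conditions.

    Returns the matrix and, for each step, how many cells were ready at once,
    i.e. how many CPUs that step could have kept busy.
    """
    m = [[edge_value] * (n + 1) for _ in range(n + 1)]
    done = {(i, j) for i in range(n + 1) for j in range(n + 1) if i == 0 or j == 0}
    pending = {(i, j) for i in range(1, n + 1) for j in range(1, n + 1)}
    widths = []
    while pending:
        # A cell is ready once all three of its neighbours have been computed.
        ready = [(i, j) for (i, j) in pending
                 if {(i - 1, j), (i, j - 1), (i - 1, j - 1)} <= done]
        for i, j in ready:
            m[i][j] = f(m[i - 1][j], m[i][j - 1], m[i - 1][j - 1])
        done.update(ready)
        pending.difference_update(ready)
        widths.append(len(ready))
    return m, widths


# Toy recurrence: one more than the largest neighbour.
matrix, widths = wavefront(4, lambda x, y, d: max(x, y, d) + 1)
print(widths)
```

The list of widths grows by one per step until the wave reaches the diagonal and then shrinks again, which is the variable CPU demand described above. A distributed implementation would hand the ready cells to workers instead of looping over them.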

You can see this in the progress images generated by our implementation of Wavefront. These images show the state of a workload. Green boxes indicate completed cells, blue indicate cells currently running, yellow are ready to run, and red are not able to run. This is an example of a 10x10 Wavefront running on only five processors:

You can read more about the Wavefront abstraction at the Cooperative Computing Lab.

Friday, October 31, 2008

An Abstraction for Ensemble Classifiers

In the last post, I presented the idea of abstractions for distributed computing, and explained the All-Pairs abstraction, which represents a very large Cartesian product. Of course, a single abstraction is meant to address a very focused kind of workload. If you have a different category of problem, then you need another abstraction.

We discovered another abstraction while working with Nitesh Chawla's data mining group, also at Notre Dame. A common construction in data mining is the ensemble classifier. A single classifier examines the properties of a large number of objects and divides them up into groups that are roughly similar. You may be familiar with algorithms such as K-Nearest-Neighbors or Decision Trees. Improvements upon these algorithms continue to be an active area of research.

For any given classifier, you can often improve the runtime or the accuracy of the classification by breaking the data into pieces, running the classifier on each piece, and then collecting all of the results, using a majority vote to determine the final classification. For very large datasets, you may even need to use multiple processors and disks to complete the job in a reasonable amount of time.

To address this problem, our students Chris Moretti and Karsten Steinhauser created the Classify abstraction:

Classify( T, R, P, N, F ):
T - The testing set of data to classify.
R - The training set used to train each classifier.
P - The method of partitioning the data.
N - The number of classifiers to employ.
F - The classifier function to use.
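A toy, single-machine Python sketch of this signature might look like the following. It reflects one plausible reading of the abstraction, in which the training set R is split into N pieces, one classifier is trained per piece, and each test item gets the majority label; the round-robin partitioner and the one-dimensional nearest-neighbor trainer are invented stand-ins, not the classifiers used in the actual work.

```python
from collections import Counter


def classify(T, R, P, N, F):
    """Label every item of T by majority vote over N classifiers.

    T: items to classify          R: labeled training items
    P: partitioner, P(R, N) -> N pieces of R
    N: number of classifiers      F: trainer, F(piece) -> classifier function
    """
    classifiers = [F(piece) for piece in P(R, N)]
    labels = []
    for item in T:
        votes = Counter(c(item) for c in classifiers)
        labels.append(votes.most_common(1)[0][0])
    return labels


def round_robin(R, N):
    """Hypothetical partitioner: deal the training items out like cards."""
    return [R[i::N] for i in range(N)]


def nearest_neighbor(piece):
    """Hypothetical trainer: 1-nearest-neighbor on plain numbers."""
    def predict(x):
        value, label = min(piece, key=lambda pair: abs(pair[0] - x))
        return label
    return predict


training = [(1, "a"), (2, "a"), (3, "a"), (10, "b"), (11, "b"), (12, "b")]
testing = [0, 13, 4]
labels = classify(testing, training, round_robin, 3, nearest_neighbor)
print(labels)
```

In a distributed setting, each call to F and each classifier's pass over T is an independent job, which is what makes one CPU and one disk per classifier a natural fit.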

Here is a schematic of the Classify abstraction:

This abstraction was also implemented on our local Condor pool and Chirp storage cluster, using one CPU and disk for each classifier function. With this implementation, Chris and Karsten were able to evaluate a number of classifier functions on multi-gigabyte datasets, using up to 128 CPUs simultaneously. In a week, they were able to accomplish what might have taken years to organize and execute by hand. You can read more about the technical details in our paper which will be presented at the International Conference on Data Mining.

The observant reader may notice that the Classify abstraction looks a lot like the Map-Reduce abstraction implemented in Hadoop. In the next post, I'll discuss this similarity and explain the important difference between the two abstractions.

Wednesday, October 22, 2008

Abstractions for Distributed Computing

My current research revolves around the idea of abstractions for distributed computing. An abstraction is a way of simplifying a workload that runs on thousands of machines, in much the same way that a high level language simplifies the tiresome process of programming in assembly language. Let me explain a little more.

Real assembly language has operations like this:

  • MOV memory to register
  • PUSH register to stack
  • CALL procedure

As you may know, programming in assembly language stinks. The programmer has to keep track of the limited number of registers in use, the current state of the stack, and the meaning of external memory locations. If that's not enough, you have to worry about the behavior of instructions that have wildly varying runtimes, unusual exceptions, and sometimes asynchronous behavior.

We find much the same story in distributed computing, where the operations are something like:

  • TransferFile( source, destination );
  • ExecuteJob( executable, input, output );
  • AllocateVM( cpu, mem, disk, time );

If this is your instruction set, then you have many of the same problems. You have to manage a limited amount of local and remote storage, carefully cleaning up when jobs complete. If that's not enough, you have to worry about the behavior of instructions that have wildly varying runtimes, unusual exceptions, and sometimes asynchronous behavior. Sound familiar?

So, we advocate that users should employ high level abstractions that hide many of these ugly details, allowing the user to focus on the big picture. An abstraction expresses a pattern of computation that is simple, but may be very large, and thus requires significant effort to implement correctly and efficiently.

Chris Moretti is working on one abstraction called AllPairs. This abstraction crops up in a variety of domains, including biology, bioinformatics, and data mining, to name a few. It is easily stated:

AllPairs( set A, set B, function F ) returns matrix M
where M[i,j] = F( A[i],B[j] );

Of course, AllPairs is easy to execute on a small problem: just write a nested loop. But, what if sets A and B have 10,000 elements of 1MB each, and the function F takes ten seconds to run? That is 100,000,000 invocations of F, so you would be looking at 100TB of I/O and about 11,574 days (more than 31 years) of computation.
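For reference, here is the nested loop in Python, along with a back-of-the-envelope cost estimate for the sequential case using the sizes given above. The helper name sequential_cost is made up, and the I/O estimate assumes that the current element of A stays in memory during the inner loop, so each call to F reads one element of B.

```python
def all_pairs(A, B, F):
    """M[i][j] = F(A[i], B[j]), computed with a plain nested loop."""
    return [[F(a, b) for b in B] for a in A]


def sequential_cost(n_a, n_b, seconds_per_call, mb_per_item):
    """Estimate calls, days of compute, and terabytes read on one machine."""
    calls = n_a * n_b
    days = calls * seconds_per_call / 86400          # 86400 seconds per day
    terabytes = calls * mb_per_item / 1_000_000      # one item of B read per call
    return calls, days, terabytes


# Small problem: trivial.
M = all_pairs([1, 2], [10, 20], lambda a, b: a + b)
print(M)

# Large problem from the text: 10,000 x 10,000 items of 1MB, ten seconds per call.
calls, days, terabytes = sequential_cost(10_000, 10_000, 10, 1)
print(calls, round(days), terabytes)
```

The loop itself never changes; what changes at scale is everything around it, which is the part the abstraction is meant to take over.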

Instead, you need to harness a distributed system such as Condor to split up the computation across hundreds of machines. But you can't just submit 100,000,000 jobs: the queueing system would fall over. You can't just blindly have each job access the data over the network: the file server would fall over. Without some careful tuning, your task will run even more slowly than the sequential version. The larger the problem gets, the more you have to worry about.

Fortunately, if your program fits in the All-Pairs abstraction, then we have solved the problem for you. If your "sets" are a bunch of files in a directory and your "function" is a Unix program that compares two objects, then you can simply invoke this at the command line:

allpairs A B Func

In the background, the All-Pairs implementation will measure the size of the objects, test the runtime of the function, choose the resources to use, distribute the data to the nodes, deal with failures, and then clean up the system. The user only has to worry about the problem to solve, not the method of achieving it.

In short, by using an abstraction, we can guide our users to appropriate ways of solving problems, and get them results in a reasonable time. You can read more about this in our paper on All-Pairs presented at IPDPS in the spring.

In future posts, I'll elaborate on other abstractions that we are designing and implementing.