Dealing with large sequence data
External merge sort
This approach is used when the file is too big to be held in memory during sorting. The algorithm minimizes the number of disk accesses and improves sorting performance.
External sorting is required when the data being sorted does not fit into the main memory of a computing device (usually RAM) and must instead reside in slower external memory (usually a hard drive). External sorting typically uses a sort-merge strategy: in the sorting phase, chunks of data small enough to fit in main memory are read, sorted, and written out to temporary files; in the merge phase, the sorted subfiles are combined into a single larger file.
In rough terms:
Let’s say your data set has one million elements and you can only fit 100,000 in memory at a time. Here’s what I’d do:
- read in the first 100,000, sort them and write that sorted list back out.
- do this for each group of 100,000.
- run a merge operation on the 10 groups.
In other words, once each of your 10 groups is sorted internally, grab the first entry from each group.
Then write the lowest of those 10 (which is the lowest of the whole million) to the output file and read the next entry from that group in its place.
Then just continue selecting the lowest of the 10, writing it out, and replacing it from the same group. In that way, the final output is the entire sorted list of a million entries.
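The merge step just described can be sketched with a priority queue that always holds the current head of each sorted group. The following is a minimal in-memory illustration (the class name is made up, and plain Lists stand in for the on-disk groups; the full implementation further below works on files):

import java.util.*;

// Minimal in-memory sketch of the k-way merge step: repeatedly take the
// smallest head element among the sorted groups and advance that group.
public class KWayMergeSketch {
    public static List<Integer> merge(List<List<Integer>> sortedGroups) {
        // each queue entry is {value, groupIndex, positionInGroup}
        PriorityQueue<int[]> heads =
                new PriorityQueue<int[]>(Comparator.comparingInt((int[] e) -> e[0]));
        for (int g = 0; g < sortedGroups.size(); g++)
            if (!sortedGroups.get(g).isEmpty())
                heads.add(new int[]{sortedGroups.get(g).get(0), g, 0});
        List<Integer> out = new ArrayList<Integer>();
        while (!heads.isEmpty()) {
            int[] e = heads.poll();                 // lowest of all group heads
            out.add(e[0]);                          // "write it to the output"
            int g = e[1], next = e[2] + 1;
            if (next < sortedGroups.get(g).size())  // read the next entry from that group
                heads.add(new int[]{sortedGroups.get(g).get(next), g, next});
        }
        return out;
    }
}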
Steps:
1) Split the original file into two equal-sized run files.
2) Read one block from each run file into input buffers.
3) Take the first record from each input buffer, and write a run of length two to an output buffer in sorted order.
4) Take the next record from each input buffer, and write a run of length two to a second output buffer in sorted order.
5) Repeat until finished, alternating output between the two output run buffers. Whenever the end of an input block is reached, read the next block from the appropriate input file. When an output buffer is full, write it to the appropriate output file.
6) Repeat steps 2 through 5, using the original output files as input files. On the second pass, the first two records of each input run file are already in sorted order. Thus, these two runs may be merged and output as a single run of four elements.
7) Each pass through the run files provides larger and larger runs until only one run remains.
This algorithm can easily take advantage of double buffering. Note that the various passes read the input run files sequentially and write the output run files sequentially. For sequential processing and double buffering to be effective, however, it is necessary that there be a separate I/O head available for each file. This typically means that each of the input and output files must be on separate disk drives, requiring a total of four disk drives for maximum efficiency.
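To make the pass structure concrete, here is a minimal in-memory sketch of a single merge pass: runs of length runLen from the two input run files (plain arrays here) are merged into runs of length 2*runLen, alternating between the two outputs. The names are made up for the example, and the block reading, output buffering, and double buffering described above are omitted:

import java.util.*;

// Sketch of one pass of the two-way merge described above. runA and runB are
// assumed to be equal-sized "run files" whose runs of length runLen are already
// sorted; the merged runs of length 2*runLen go alternately to outA and outB.
public class TwoWayMergePassSketch {
    public static void mergePass(int[] runA, int[] runB, int runLen,
                                 List<Integer> outA, List<Integer> outB) {
        boolean toA = true;
        for (int start = 0; start < runA.length; start += runLen) {
            List<Integer> out = toA ? outA : outB;
            int i = start, j = start;
            int endA = Math.min(start + runLen, runA.length);
            int endB = Math.min(start + runLen, runB.length);
            // standard two-way merge of one run from each input file
            while (i < endA && j < endB)
                out.add(runA[i] <= runB[j] ? runA[i++] : runB[j++]);
            while (i < endA) out.add(runA[i++]);
            while (j < endB) out.add(runB[j++]);
            toA = !toA;   // alternate output between the two output run files
        }
    }
}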
Improving Performance
One example of external sorting is the external merge sort algorithm, which sorts chunks that each fit in RAM, then merges the sorted chunks together. For example, for sorting 900 megabytes of data using only 100 megabytes of RAM:
1) Read 100 MB of the data into main memory and sort it by some conventional method, such as quicksort.
2) Write the sorted data to disk.
3) Repeat steps 1 and 2 until all of the data is in sorted 100 MB chunks (there are 900MB / 100MB = 9 chunks), which now need to be merged into one single output file.
4) Read the first 10 MB (= 100MB / (9 chunks + 1)) of each sorted chunk into input buffers in main memory and allocate the remaining 10 MB for an output buffer. (In practice, it might provide better performance to make the output buffer larger and the input buffers slightly smaller.)
5) Perform a 9-way merge and store the result in the output buffer. Whenever the output buffer fills, write it to the final sorted file and empty it. Whenever any of the 9 input buffers becomes empty, fill it with the next 10 MB of its associated 100 MB sorted chunk, until no more data from that chunk is available.
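The buffer arithmetic in steps 4 and 5 generalizes: with M bytes of memory and k sorted chunks, each buffer gets roughly M / (k + 1) bytes. A small sketch of that calculation with the figures from the example (the class name is made up):

// Buffer sizing for the 9-way merge example: memory is divided into
// k input buffers plus one output buffer.
public class BufferSizing {
    public static void main(String[] args) {
        long dataBytes = 900L * 1024 * 1024;    // 900 MB of data
        long memoryBytes = 100L * 1024 * 1024;  // 100 MB of RAM
        long chunks = dataBytes / memoryBytes;        // 9 sorted chunks
        long perBuffer = memoryBytes / (chunks + 1);  // ~10 MB per buffer
        System.out.println(chunks + " chunks, " + perBuffer + " bytes per buffer");
    }
}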
Optimizing a little with additional passes –
For sorting, say, 50 GB in 100 MB of RAM, using a single merge pass isn't efficient: the disk seeks required to fill the input buffers with data from each of the 500 chunks (we read 100MB / 501 ≈ 200KB from each chunk at a time) take up most of the sort time. Using two merge passes solves the problem. The sorting process then looks like this:
1) Run the initial chunk-sorting pass as before.
2) Run a first merge pass combining 25 chunks at a time, resulting in 20 larger sorted chunks.
3) Run a second merge pass to merge the 20 larger sorted chunks.
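The gain comes from how much each pass can read per seek. A rough back-of-the-envelope sketch of the numbers quoted above (again, the class name is made up):

// Why two passes help for 50 GB in 100 MB of RAM: fewer chunks per merge
// means larger input buffers, so each seek fetches far more data.
public class MergePassArithmetic {
    public static void main(String[] args) {
        long memory = 100L * 1024 * 1024;            // 100 MB of RAM
        long chunks = 50_000 / 100;                  // 50 GB / 100 MB = 500 chunks
        long singlePass = memory / (chunks + 1);     // ~200 KB read per chunk per refill
        long firstPass  = memory / (25 + 1);         // merging 25 chunks at a time: ~3.8 MB
        long secondPass = memory / (20 + 1);         // merging the 20 larger chunks: ~4.8 MB
        System.out.println(singlePass + " vs " + firstPass + " and " + secondPass);
    }
}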
Implementation –
Sort a 4 GB file in 10 MB of memory (or on a system in which many processes run and the total memory of the system is x MB).
Java code:
package main.java.algo.sorting;

import java.util.*;
import java.io.*;

// Goal: offer a generic external-memory sorting program in Java.
//
// It must be:
// - hackable (easy to adapt)
// - scalable to large files
// - sensibly efficient.
// This software is in the public domain.
public class ExternalSort {

    // We divide the file into small blocks. If the blocks are too small,
    // we shall create too many temporary files. If they are too big,
    // we shall be using too much memory.
    public static long estimateBestSizeOfBlocks(File filetobesorted) {
        long sizeoffile = filetobesorted.length();
        // we don't want to open up much more than 1024 temporary files, better run
        // out of memory first. (Even 1024 is stretching it.)
        final int MAXTEMPFILES = 1024;
        long blocksize = sizeoffile / MAXTEMPFILES;
        // on the other hand, we don't want to create many temporary files
        // for naught. If blocksize is smaller than half the free memory, grow it.
        long freemem = Runtime.getRuntime().freeMemory();
        if (blocksize < freemem / 2) {
            blocksize = freemem / 2;
        } else {
            if (blocksize >= freemem)
                System.err.println("We expect to run out of memory. ");
        }
        return blocksize;
    }

    // This will simply load the file by blocks of rows, then sort them
    // in-memory, and write the result to a bunch of temporary files
    // that have to be merged later.
    //
    // @param file some flat file
    // @return a list of temporary flat files
    public static List<File> sortInBatch(File file, Comparator<String> cmp)
            throws IOException {
        List<File> files = new ArrayList<File>();
        BufferedReader fbr = new BufferedReader(new FileReader(file));
        long blocksize = estimateBestSizeOfBlocks(file); // in bytes
        try {
            List<String> tmplist = new ArrayList<String>();
            String line = "";
            try {
                while (line != null) {
                    long currentblocksize = 0; // in bytes
                    while ((currentblocksize < blocksize)
                            && ((line = fbr.readLine()) != null)) {
                        tmplist.add(line);
                        // java uses 16 bits per character + 40 bytes of overhead (estimated)
                        currentblocksize += line.length() * 2 + 40;
                    }
                    files.add(sortAndSave(tmplist, cmp));
                    tmplist.clear();
                }
            } catch (EOFException oef) {
                if (tmplist.size() > 0) {
                    files.add(sortAndSave(tmplist, cmp));
                    tmplist.clear();
                }
            }
        } finally {
            fbr.close();
        }
        return files;
    }

    public static File sortAndSave(List<String> tmplist, Comparator<String> cmp)
            throws IOException {
        Collections.sort(tmplist, cmp);
        File newtmpfile = File.createTempFile("sortInBatch", "flatfile");
        newtmpfile.deleteOnExit();
        BufferedWriter fbw = new BufferedWriter(new FileWriter(newtmpfile));
        try {
            for (String r : tmplist) {
                fbw.write(r);
                fbw.newLine();
            }
        } finally {
            fbw.close();
        }
        return newtmpfile;
    }

    // This merges a bunch of temporary flat files.
    // @param files the sorted temporary files
    // @param outputfile the output file
    // @return The number of lines sorted. (P. Beaudoin)
    public static int mergeSortedFiles(List<File> files, File outputfile,
            final Comparator<String> cmp) throws IOException {
        PriorityQueue<BinaryFileBuffer> pq = new PriorityQueue<BinaryFileBuffer>(11,
                new Comparator<BinaryFileBuffer>() {
                    public int compare(BinaryFileBuffer i, BinaryFileBuffer j) {
                        return cmp.compare(i.peek(), j.peek());
                    }
                });
        for (File f : files) {
            BinaryFileBuffer bfb = new BinaryFileBuffer(f);
            pq.add(bfb);
        }
        BufferedWriter fbw = new BufferedWriter(new FileWriter(outputfile));
        int rowcounter = 0;
        try {
            while (pq.size() > 0) {
                BinaryFileBuffer bfb = pq.poll();
                String r = bfb.pop();
                fbw.write(r);
                fbw.newLine();
                ++rowcounter;
                if (bfb.empty()) {
                    bfb.fbr.close();
                    bfb.originalfile.delete(); // we don't need it anymore
                } else {
                    pq.add(bfb); // add it back
                }
            }
        } finally {
            fbw.close();
            for (BinaryFileBuffer bfb : pq) bfb.close();
        }
        return rowcounter;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("please provide input and output file names");
            return;
        }
        String inputfile = args[0];
        String outputfile = args[1];
        Comparator<String> comparator = new Comparator<String>() {
            public int compare(String r1, String r2) {
                return r1.compareTo(r2);
            }
        };
        List<File> l = sortInBatch(new File(inputfile), comparator);
        mergeSortedFiles(l, new File(outputfile), comparator);
    }
}

class BinaryFileBuffer {
    public static int BUFFERSIZE = 2048;
    public BufferedReader fbr;
    public File originalfile;
    private String cache;
    private boolean empty;

    public BinaryFileBuffer(File f) throws IOException {
        originalfile = f;
        fbr = new BufferedReader(new FileReader(f), BUFFERSIZE);
        reload();
    }

    public boolean empty() {
        return empty;
    }

    private void reload() throws IOException {
        try {
            if ((this.cache = fbr.readLine()) == null) {
                empty = true;
                cache = null;
            } else {
                empty = false;
            }
        } catch (EOFException oef) {
            empty = true;
            cache = null;
        }
    }

    public void close() throws IOException {
        fbr.close();
    }

    public String peek() {
        if (empty())
            return null;
        return cache.toString();
    }

    public String pop() throws IOException {
        String answer = peek();
        reload();
        return answer;
    }
}
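Besides running the program from the command line with an input and output file name, the two public methods can also be called directly from other code. A small sketch (the file names are made up, and the class is assumed to sit in the same package as ExternalSort):

package main.java.algo.sorting;

import java.io.*;
import java.util.*;

// Sketch of invoking ExternalSort programmatically: sortInBatch() produces the
// sorted temporary files, mergeSortedFiles() performs the k-way merge.
public class ExternalSortUsage {
    public static void main(String[] args) throws IOException {
        Comparator<String> cmp = new Comparator<String>() {
            public int compare(String a, String b) { return a.compareTo(b); }
        };
        List<File> batches = ExternalSort.sortInBatch(new File("big-input.txt"), cmp);
        int lines = ExternalSort.mergeSortedFiles(batches, new File("sorted-output.txt"), cmp);
        System.out.println(lines + " lines written");
    }
}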
Given an infinite stream of integers with only k distinct integers, find those distinct integers. How would you modify your solution if k is also very large and a hash table cannot be used?
Keep as many elements in the hash_map as you can. Once the map's capacity is exhausted, sort all the elements in the hash_map, write them to disk, and build a Bloom filter over them. Keep this Bloom filter in memory, empty the hash_map, and for every new number in the stream check the Bloom filter: if it says the number does not exist, this is a new number, so insert it into the map. If it says the number does exist, that may be a false positive, so first check the map; if it is not found there, it could be on disk.
Use a binary search on the file (this takes O(log n) disk seeks) to verify whether it is present. If present, ignore it; otherwise add it to the map. Keep repeating the process of flushing the map to disk whenever the map is full. To flush, you can either use any external sorting method or just write out a new file each time. With separate files, you will need to binary search all of them in the case of a false positive.
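A rough sketch of this scheme is below; the capacity limit, the hash functions, and the existsOnDisk placeholder (which would be the binary search over the flushed file or files) are assumptions made for illustration:

import java.util.*;

// Sketch of the stream-processing idea above: an in-memory set plus a Bloom
// filter covering everything already flushed to disk.
public class DistinctStreamSketch {
    static final int CAPACITY = 1_000_000;            // how many ints fit in memory (assumed)
    static final BitSet bloom = new BitSet(8 * CAPACITY);
    static final Set<Integer> inMemory = new HashSet<Integer>();

    static void bloomAdd(int x) {                     // two cheap hash functions (assumed)
        bloom.set(Math.floorMod(x * 0x9E3779B1, bloom.size()));
        bloom.set(Math.floorMod(x * 0x85EBCA6B + 17, bloom.size()));
    }
    static boolean bloomMightContain(int x) {
        return bloom.get(Math.floorMod(x * 0x9E3779B1, bloom.size()))
            && bloom.get(Math.floorMod(x * 0x85EBCA6B + 17, bloom.size()));
    }
    static boolean existsOnDisk(int x) { return false; } // placeholder: binary search in the sorted file(s)

    static void process(int x) {
        if (!bloomMightContain(x)) {
            inMemory.add(x);                          // definitely never seen before
        } else if (!inMemory.contains(x) && !existsOnDisk(x)) {
            inMemory.add(x);                          // Bloom filter false positive
        }
        if (inMemory.size() >= CAPACITY) {
            // flush step: sort inMemory, write it to disk, add every value to
            // the Bloom filter, then clear the set
            for (int v : inMemory) bloomAdd(v);
            inMemory.clear();
        }
    }
}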
How would one sort 1 billion integers? I gave the external sorting answer but he wanted more. What should I say? He also asked how the problem would change if a total of 4 threads were given. How would you solve it?
I would suggest splitting the data into four pieces and having each thread do an external sort on one piece. Essentially that means breaking up the data into chunks which can be stored in memory, sorting them, and then merging the chunks back into one piece. After the 4 threads are done sorting their pieces, use two threads to merge the four pieces into two, and then use one thread to merge those two into the final output file.
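A minimal sketch of that thread layout, with in-memory int arrays standing in for the on-disk pieces (the class and method names are made up; a real version would run the external sort on each piece and merge temporary files rather than arrays):

import java.util.*;
import java.util.concurrent.*;

// Sketch of the 4-thread plan: sort four pieces in parallel, merge 4 -> 2 with
// two threads, then 2 -> 1 with a single final merge.
public class ParallelSortSketch {
    public static int[] sort(int[] data) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int n = data.length, quarter = (n + 3) / 4;
        List<Future<int[]>> sorted = new ArrayList<Future<int[]>>();
        // phase 1: four threads each sort one piece
        for (int i = 0; i < 4; i++) {
            int from = Math.min(i * quarter, n), to = Math.min(from + quarter, n);
            int[] piece = Arrays.copyOfRange(data, from, to);
            sorted.add(pool.submit(() -> { Arrays.sort(piece); return piece; }));
        }
        // phase 2: two threads merge the four sorted pieces into two
        Future<int[]> left  = mergeAsync(pool, sorted.get(0).get(), sorted.get(1).get());
        Future<int[]> right = mergeAsync(pool, sorted.get(2).get(), sorted.get(3).get());
        // phase 3: one final merge produces the output
        int[] result = merge(left.get(), right.get());
        pool.shutdown();
        return result;
    }

    static Future<int[]> mergeAsync(ExecutorService pool, int[] a, int[] b) {
        Callable<int[]> task = () -> merge(a, b);
        return pool.submit(task);
    }

    static int[] merge(int[] a, int[] b) {            // standard two-way merge
        int[] out = new int[a.length + b.length];
        int i = 0, j = 0, k = 0;
        while (i < a.length && j < b.length) out[k++] = a[i] <= b[j] ? a[i++] : b[j++];
        while (i < a.length) out[k++] = a[i++];
        while (j < b.length) out[k++] = b[j++];
        return out;
    }
}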