MapReduce: building Google's framework from the OSTEP spec
A C implementation of the Google MapReduce framework, scaled for a single machine. Partitioning, sorted bucket chains, and thread-local iteration.
The 2004 paper MapReduce: Simplified Data Processing on Large Clusters by Dean and Ghemawat is short and oddly humble for the size of the idea. The user writes two functions: map and reduce. The framework then deals with everything else — partitioning, scheduling, shuffling, fault tolerance — and the user gets to pretend they are processing one record at a time on one machine. The whole paper is the story of how much engineering hides behind that pretense.
I had just finished HiveParser and was looking for something that would make me earn the next concurrency idea. OSTEP has a project that maps almost exactly onto the paper, scaled down to fit one box: build the framework, then write wordcount against it. No distributed coordinator, no fault tolerance, no GFS. Just the API, the partitioner, and the threads.
Starting from the API
The first thing the OSTEP spec taught me was that the shape of the API is the entire project. Once you know what MR_Run looks like, the rest becomes careful pthreads.
void MR_Run(int argc, char *argv[],
            Mapper map, int num_mappers,
            Reducer reduce, int num_reducers,
            Partitioner partition);
void MR_Emit(char *key, char *value);
char *MR_Getter(char *key, int partition_number);
The user writes Map(file_name) and Reduce(key, get_next, partition). Inside Map, they call MR_Emit(key, value) for every key-value pair they see. Inside Reduce, they call get_next(key, partition), which the framework points at MR_Getter, to walk the values associated with that key, one at a time, until it returns NULL.
That last bit is the part of the design I like most. The framework hands the reducer a function pointer instead of a list. The reducer doesn’t know how the values are stored, doesn’t know how many there are, doesn’t know which partition owns them — it just calls get_next until it gets NULL. All the storage, all the locking, all the iteration state lives behind the getter. The reducer stays a one-liner.
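A minimal sketch of that contract, under stated assumptions: the four typedefs follow the OSTEP project header as I understand it, while toy_values, toy_cursor, toy_getter, and count_values are hypothetical names made up for illustration. The fixed array stands in for whatever storage the real framework keeps behind MR_Getter; it is not how this project stores values.

```c
#include <stddef.h>
#include <string.h>

/* Callback types, following the OSTEP project header. */
typedef char *(*Getter)(char *key, int partition_number);
typedef void (*Mapper)(char *file_name);
typedef void (*Reducer)(char *key, Getter get_next, int partition_number);
typedef unsigned long (*Partitioner)(char *key, int num_partitions);

/* Hypothetical stand-in storage: three values, all for the key "the". */
static char *toy_values[] = { "1", "1", "1" };
static size_t toy_cursor = 0;   /* iteration state hidden behind the getter */

/* Toy getter: returns the next value for "the", then NULL once exhausted.
   A real framework would keep one cursor per reducer, not one global. */
char *toy_getter(char *key, int partition_number) {
    (void)partition_number;     /* unused in this single-partition toy */
    if (strcmp(key, "the") != 0)
        return NULL;
    if (toy_cursor >= sizeof toy_values / sizeof toy_values[0])
        return NULL;
    return toy_values[toy_cursor++];
}

/* Reducer-style loop: counts values without knowing how they are stored. */
int count_values(char *key, Getter get_next, int partition_number) {
    int count = 0;
    while (get_next(key, partition_number) != NULL)
        count++;
    return count;
}
```

The caller sees only the Getter signature, so the storage behind it can change (array, bucket chain, file) without touching any reducer.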
Wordcount as a driver
The wordcount driver is forty lines. Half of it is main, and the core logic is beautifully sparse:
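#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void Map(char *file_name) {
    FILE *fp = fopen(file_name, "r");
    if (fp == NULL) {           /* skip unreadable files instead of crashing in getline */
        perror(file_name);
        return;
    }
    char *line = NULL;          /* getline allocates and grows this buffer */
    size_t size = 0;
    while (getline(&line, &size, fp) != -1) {
        char *token, *dummy = line;
        /* strsep yields empty tokens between adjacent delimiters; skip them */
        while ((token = strsep(&dummy, " \t\n\r")) != NULL) {
            if (*token == '\0') continue;
            MR_Emit(token, "1");
        }
    }
    free(line);
    fclose(fp);
}

void Reduce(char *key, Getter get_next, int partition) {
    int count = 0;
    /* each non-NULL return is one emitted "1" for this key */
    while (get_next(key, partition) != NULL) count++;
    printf("%s %d\n", key, count);
}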
Everything that makes this hard sits on the other side of MR_Emit.
Partitioning is just hashing twice
The framework owns N partitions, one per reducer. Each partition is its own hashtable with its own mutex. When MR_Emit is called, it hashes the key once to pick the partition, locks that partition’s mutex, hashes again to pick the bucket inside it, and inserts.
void MR_Emit(char *key, char *value) {
    /* first hash: which partition (and therefore which reducer) owns the key */
    int partition_num = partitioner(key, num_partitions);
    pthread_mutex_lock(&partitions[partition_num].partition_lock);
    /* second hash: which bucket inside that partition */
    int bucket_num = partitioner(key, partitions[partition_num].num_buckets);
    /* ... insert into the bucket chain ... */
    pthread_mutex_unlock(&partitions[partition_num].partition_lock);
}
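For reference, the default partitioner suggested in the OSTEP project description is a djb2-style string hash reduced modulo the partition count. The sketch below assumes that is roughly what partitioner points to here; the post itself does not say which hash it uses.

```c
/* djb2-style hash as given in the OSTEP project description:
   start at 5381 and fold in each byte with hash * 33 + c. */
unsigned long MR_DefaultHashPartition(char *key, int num_partitions) {
    unsigned long hash = 5381;
    int c;
    while ((c = *key++) != '\0')
        hash = hash * 33 + c;
    return hash % num_partitions;
}
```

One consequence worth keeping in mind if the same modulo hash is used for both levels: every key in partition p has the same hash remainder modulo num_partitions, so when num_buckets shares a factor with num_partitions only some of the buckets can ever be hit. Choosing a bucket count that is coprime to the partition count avoids that.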
The choice that mattered here was one mutex per partition instead of a global mutex over the whole hashtable. With ten partitions and a hash that spreads keys evenly, any two mappers emitting at the same instant want the same partition only about one time in ten. With ten mappers all emitting at once some pair will usually overlap, but a collision stalls only the mappers involved, not all ten, and they only block each other for as long as one bucket-chain insertion takes. One thing I learned from HiveParser: the locks you inherit cost more than the locks you write. So I wrote small locks, a whole lot of them, and let the contention spread out.
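A sketch of what a per-partition lock layout could look like. The field names partition_lock, buckets, and num_buckets come from the snippets in this post; the partition_t typedef and the partitions_create and partitions_destroy helpers are hypothetical, and the entry type is left opaque because its layout does not matter here.

```c
#include <pthread.h>
#include <stdlib.h>

struct entry;   /* bucket chain node; layout not needed for this sketch */

/* One partition: its own bucket array guarded by its own mutex. */
typedef struct {
    pthread_mutex_t partition_lock;
    struct entry **buckets;
    int num_buckets;
} partition_t;

/* Destroys the first n partitions and frees the table.
   Does not free chain entries; that is left to the caller. */
void partitions_destroy(partition_t *parts, int n) {
    for (int i = 0; i < n; i++) {
        pthread_mutex_destroy(&parts[i].partition_lock);
        free(parts[i].buckets);
    }
    free(parts);
}

/* Allocates n partitions with num_buckets empty buckets each.
   Returns NULL on allocation or mutex-init failure. */
partition_t *partitions_create(int n, int num_buckets) {
    partition_t *parts = calloc((size_t)n, sizeof *parts);
    if (parts == NULL)
        return NULL;
    for (int i = 0; i < n; i++) {
        parts[i].num_buckets = num_buckets;
        parts[i].buckets = calloc((size_t)num_buckets, sizeof *parts[i].buckets);
        if (parts[i].buckets == NULL ||
            pthread_mutex_init(&parts[i].partition_lock, NULL) != 0) {
            free(parts[i].buckets);         /* free(NULL) is a no-op */
            partitions_destroy(parts, i);   /* tear down the ones already built */
            return NULL;
        }
    }
    return parts;
}
```

Because each lock lives inside its partition, a mapper emitting into partition 3 never touches the mutex that guards partition 7.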
Sorted bucket chains and dispatch
The OSTEP spec adds a complication: it wants reducers to see keys in lexicographic order. There are two natural places to put the sort: at insertion time, or at reduce time. I ended up doing both.
Inside each bucket, the entry chain is kept sorted on insert. MR_Emit walks the chain comparing keys and breaks at the first key that compares greater. This makes MR_Getter slightly faster, since it can stop scanning a chain as soon as it reaches a key greater than the one it wants (cmp > 0). But the real reason was the reducer side. With sorted chains, if I want all keys in a partition in lex order, I just need to merge the buckets together, a k-way merge of chains that are already sorted. Or, as I did, collect entry pointers into a flat array and qsort it.
/* Pass 1: count entries across all buckets in this partition */
int count = 0;
for (int b = 0; b < partition->num_buckets; b++)
    for (entry_t *e = partition->buckets[b]; e != NULL; e = e->next)
        count++;

/* Pass 2: collect into a flat array */
entry_t **sorted = malloc(sizeof(entry_t *) * count);
int idx = 0;
for (int b = 0; b < partition->num_buckets; b++)
    for (entry_t *e = partition->buckets[b]; e != NULL; e = e->next)
        sorted[idx++] = e;

/* Sort the pointers by key; the entries themselves stay in their buckets */
qsort(sorted, count, sizeof(entry_t *), MR_EntryKeyCmp);
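The two pieces the snippets leave out, the sorted insert and the comparator, could look roughly like this. Everything here is an illustrative assumption: the entry_t layout, copy_string, bucket_insert_sorted, and entry_key_cmp are hypothetical names, and the real MR_EntryKeyCmp may differ. The detail that is easy to get wrong is that qsort hands the comparator pointers to the array elements, so with an array of entry_t * each argument is really an entry_t **.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical entry layout: one node per emitted key/value pair. */
typedef struct entry {
    char *key;
    char *value;
    struct entry *next;
} entry_t;

/* Heap-allocated copy of s, or NULL if allocation fails. */
static char *copy_string(const char *s) {
    size_t n = strlen(s) + 1;
    char *p = malloc(n);
    if (p != NULL)
        memcpy(p, s, n);
    return p;
}

/* Inserts a copy of (key, value) so the chain stays in ascending key order.
   Returns 0 on success, -1 if allocation fails. */
int bucket_insert_sorted(entry_t **head, const char *key, const char *value) {
    entry_t *node = malloc(sizeof *node);
    if (node == NULL)
        return -1;
    node->key = copy_string(key);
    node->value = copy_string(value);
    if (node->key == NULL || node->value == NULL) {
        free(node->key);
        free(node->value);
        free(node);
        return -1;
    }
    /* Walk a pointer-to-link; stop at the first key that compares greater,
       so equal keys keep their insertion order. */
    entry_t **link = head;
    while (*link != NULL && strcmp((*link)->key, key) <= 0)
        link = &(*link)->next;
    node->next = *link;
    *link = node;
    return 0;
}

/* qsort comparator for an array of entry_t *: each argument points at an
   array element, so it is dereferenced once to reach the entry. */
int entry_key_cmp(const void *a, const void *b) {
    const entry_t *ea = *(const entry_t *const *)a;
    const entry_t *eb = *(const entry_t *const *)b;
    return strcmp(ea->key, eb->key);
}
```

Walking a pointer to the link rather than a pointer to the node means insertion at the head, middle, and tail all go through the same two assignments.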
The data flow, end to end
The two phases are strictly sequential: mappers run in parallel with each other, but every mapper runs to completion before any reducer spins up. In the Google paper, there’s a heavy shuffle stage; here the shuffle is implicit because MR_Emit places data into the correct partition from the start.
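The barrier between the two phases can be sketched with nothing more than pthread_create and pthread_join. This is an illustration of the ordering only, not the project's MR_Run: run_phase, demo_t, demo_map, demo_reduce, and demo_run are all hypothetical names, and the workers just touch a counter instead of reading files.

```c
#include <pthread.h>
#include <stdlib.h>

/* Starts num_threads threads running worker(arg) and waits for all of them.
   Returns 0 on success, -1 if allocation or thread creation fails. */
int run_phase(int num_threads, void *(*worker)(void *), void *arg) {
    if (num_threads <= 0)
        return 0;
    pthread_t *tids = malloc(sizeof *tids * (size_t)num_threads);
    if (tids == NULL)
        return -1;
    int started = 0, status = 0;
    for (; started < num_threads; started++) {
        if (pthread_create(&tids[started], NULL, worker, arg) != 0) {
            status = -1;
            break;
        }
    }
    /* Join everything that did start: this is the barrier between phases. */
    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    free(tids);
    return status;
}

/* Demo state, only to make the ordering observable. */
typedef struct {
    pthread_mutex_t lock;
    int mapped;          /* incremented once by each map worker */
    int seen_by_reduce;  /* value of mapped as observed by a reduce worker */
} demo_t;

void *demo_map(void *arg) {
    demo_t *d = arg;
    pthread_mutex_lock(&d->lock);
    d->mapped++;
    pthread_mutex_unlock(&d->lock);
    return NULL;
}

void *demo_reduce(void *arg) {
    demo_t *d = arg;
    pthread_mutex_lock(&d->lock);
    d->seen_by_reduce = d->mapped;
    pthread_mutex_unlock(&d->lock);
    return NULL;
}

/* Runs the map phase, then the reduce phase; returns what reducers saw,
   or -1 if setup fails or no reducer ran. */
int demo_run(int mappers, int reducers) {
    demo_t d;
    d.mapped = 0;
    d.seen_by_reduce = -1;
    if (pthread_mutex_init(&d.lock, NULL) != 0)
        return -1;
    int ok = run_phase(mappers, demo_map, &d) == 0 &&
             run_phase(reducers, demo_reduce, &d) == 0;
    pthread_mutex_destroy(&d.lock);
    return ok ? d.seen_by_reduce : -1;
}
```

Because the first run_phase call does not return until every map worker has been joined, each reduce worker is guaranteed to observe the full count.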
What I left out
A lot. Reading back the Google paper after finishing this, the gap between the framework I wrote and the system they describe is mostly the things they had to solve at scale. There’s no master process here, no fault tolerance, no straggler detection, no locality-aware scheduling. There is no on-disk spill when partitions get big.
Each of those is a future project. The OSTEP spec deliberately strips them out so you can see the core idea: partitioning + sorted dispatch + a clean user API without the distributed-systems machinery on top.
Source on GitHub at Basliel25/MapReduce.