Jumbo's JobBuilder: part 2

This is the fifth article about Jumbo.

Previously, we looked at how Jumbo required you to create your own job configurations, and how I tried to alleviate that with helpers for common job structures.

I wanted a way to make this easier, and while I didn't want to use a separate language like Hadoop's Pig did, I was inspired by its simple, imperative nature: you just specify a sequence of operations, and it turns that into a sequence of MapReduce jobs.

The JobBuilder would end up doing the same thing for Jumbo, but instead you used a sequence of method calls in ordinary C# code. These would be 'compiled' into a job configuration.

This had two key goals:

  1. Let the JobBuilder make certain decisions about job structure, such as when to use a local aggregation step, or how many tasks to use in a channel-input stage.
  2. Allow the user to create tasks using simple methods, instead of having to write a whole class.

The first was accomplished by having the JobBuilder define helper functions for certain operations, such as AccumulateRecords, that knew how to create the kind of job structure appropriate for that operation. That may not seem much better than the old AccumulatorTask approach, but it had one key advantage: you could now incorporate this operation in a larger job structure, rather than that structure having to be your entire job.

It also meant the JobBuilder could apply certain heuristics, such as: if there is only one input task, you don't need a second aggregation stage; everything can be done locally. It's this kind of logic that lets WordCount in the quick start guide run in a single task if the input is small.
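As a rough illustration of that kind of heuristic (not Jumbo's actual code), the decision could be sketched like this. The `AggregationPlanner` class, the `PlanStages` method, and the stage descriptions are all hypothetical names made up for this example:

```csharp
using System;
using System.Collections.Generic;

// With a single input task, local aggregation already sees every record.
Console.WriteLine(string.Join(" -> ", AggregationPlanner.PlanStages(1, 4)));
// With several input tasks, a second stage has to merge the partial results.
Console.WriteLine(string.Join(" -> ", AggregationPlanner.PlanStages(8, 4)));

public static class AggregationPlanner
{
    // Hypothetical helper: returns a description of each stage that an
    // aggregation operation would need for the given number of input tasks.
    public static IReadOnlyList<string> PlanStages(int inputTaskCount, int aggregationTaskCount)
    {
        if (inputTaskCount < 1)
            throw new ArgumentOutOfRangeException(nameof(inputTaskCount));

        // Every input task always aggregates its own records locally.
        var stages = new List<string> { $"local aggregation x{inputTaskCount}" };

        // Only add the channel-input aggregation stage if there is more
        // than one input task whose partial results need to be combined.
        if (inputTaskCount > 1)
            stages.Add($"final aggregation x{aggregationTaskCount}");

        return stages;
    }
}
```

The point is only that the helper, rather than the user, decides whether the second stage exists.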

The second goal had some complications. I didn't really want to give Jumbo Jet's TaskExecutionUtility (the thing responsible for most of the logic of how to run tasks) the ability to invoke arbitrary functions. It would break too much of the logic of how tasks were constructed, and would require a lot of bespoke code for different types of task methods. No, I wanted to stick to ITask<TInput, TOutput> at that level, which meant JobBuilder had to translate between the two.

That meant dynamically generating MSIL to create task classes that invoked the methods the user specified. These classes would be generated at job submission time and saved to a generated assembly, which the TaskExecutionUtility could just load like a normal task type without being aware of any of this. It was a fun challenge to figure out how to do this, and it made it much easier to create custom tasks.
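To give an idea of what generating such a class involves, here is a minimal sketch using System.Reflection.Emit on modern .NET. It is not the JobBuilder's real generator: `ISimpleTask`, `UserMethods`, and `TaskGenerator` are hypothetical stand-ins (the real interface is ITask<TInput, TOutput>), and the sketch only keeps the generated type in memory instead of saving an assembly to disk.

```csharp
using System;
using System.Reflection;
using System.Reflection.Emit;

MethodInfo userMethod = typeof(UserMethods).GetMethod(nameof(UserMethods.Double))
    ?? throw new InvalidOperationException("Method not found.");

Type taskType = TaskGenerator.CreateTaskType(userMethod);
var task = (ISimpleTask)(Activator.CreateInstance(taskType)
    ?? throw new InvalidOperationException("Could not create task."));

Console.WriteLine(task.Run(21)); // prints 42

// Hypothetical, much simplified stand-in for a task interface.
public interface ISimpleTask
{
    int Run(int input);
}

public static class UserMethods
{
    // Public and static, so generated IL can call it directly.
    public static int Double(int value) => value * 2;
}

public static class TaskGenerator
{
    // Generates a class implementing ISimpleTask whose Run method
    // forwards to the given public static method.
    public static Type CreateTaskType(MethodInfo target)
    {
        AssemblyBuilder assembly = AssemblyBuilder.DefineDynamicAssembly(
            new AssemblyName("GeneratedTasks"), AssemblyBuilderAccess.Run);
        ModuleBuilder module = assembly.DefineDynamicModule("GeneratedTasks");
        TypeBuilder type = module.DefineType(
            target.Name + "Task", TypeAttributes.Public | TypeAttributes.Sealed);
        type.AddInterfaceImplementation(typeof(ISimpleTask));

        MethodBuilder run = type.DefineMethod(
            "Run",
            MethodAttributes.Public | MethodAttributes.Virtual | MethodAttributes.Final
                | MethodAttributes.HideBySig | MethodAttributes.NewSlot,
            typeof(int),
            new[] { typeof(int) });

        ILGenerator il = run.GetILGenerator();
        il.Emit(OpCodes.Ldarg_1);      // load 'input' (argument 0 is 'this')
        il.Emit(OpCodes.Call, target); // direct call to the user's method
        il.Emit(OpCodes.Ret);          // return its result

        MethodInfo interfaceRun = typeof(ISimpleTask).GetMethod("Run")
            ?? throw new InvalidOperationException("Interface method not found.");
        type.DefineMethodOverride(run, interfaceRun);

        // No constructor is defined, so a default one is supplied automatically.
        return type.CreateType();
    }
}
```

Whatever loads the generated type only ever sees a class implementing the interface, which is the property the paragraph above relies on.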

The only limitation is that if the method you used for your task was not public static, I couldn't emit a direct call in the generated MSIL. In that case, I had to serialize the delegate you pass in, and during task execution, deserialize it and call the method through that, which is much slower. Unfortunately, this meant that using lambdas (which never generate a public method) was possible, but not ideal.
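The check behind that limitation can be sketched with plain reflection. This is an illustration under assumptions, not the JobBuilder's actual test: `MethodChecks.CanCallDirectly` and `UserMethods` are hypothetical names.

```csharp
using System;
using System.Reflection;

Func<int, int> named = UserMethods.Double;
Func<int, int> lambda = x => x * 2;

Console.WriteLine(MethodChecks.CanCallDirectly(named.Method));  // prints True
Console.WriteLine(MethodChecks.CanCallDirectly(lambda.Method)); // prints False

public static class UserMethods
{
    public static int Double(int value) => value * 2;
}

public static class MethodChecks
{
    // A separately generated assembly can only emit a direct call to a
    // method that is public and static, on a type visible from outside
    // its own assembly. Anything else needs a fallback such as invoking
    // the delegate itself.
    public static bool CanCallDirectly(MethodInfo method)
    {
        return method.IsPublic
            && method.IsStatic
            && method.DeclaringType != null
            && method.DeclaringType.IsVisible;
    }
}
```

The lambda fails the check because the compiler turns it into a non-public method on a generated class, which is why lambdas always ended up on the slower path.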

The first iteration of the JobBuilder meant you could now write code like this to create a job:

var builder = new JobBuilder();
var input = builder.CreateRecordReader<Utf8StringWritable>(_inputPath, typeof(LineRecordReader));
var collector = new RecordCollector<KeyValuePairWritable<Utf8StringWritable, Int32Writable>>(null, null, _combinerTasks == 0 ? null : (int?)_combinerTasks);
builder.ProcessRecords(input, collector.CreateRecordWriter(), WordCount);

var output = builder.CreateRecordWriter<KeyValuePairWritable<Utf8StringWritable, Int32Writable>>(_outputPath,
    typeof(TextRecordWriter<KeyValuePairWritable<Utf8StringWritable, Int32Writable>>), BlockSize, ReplicationFactor);

builder.AccumulateRecords(collector.CreateRecordReader(), output, WordCountAccumulator);

...

[AllowRecordReuse]
public static void WordCount(RecordReader<Utf8StringWritable> input, RecordWriter<KeyValuePairWritable<Utf8StringWritable, Int32Writable>> output) { ... }

[AllowRecordReuse]
public static void WordCountAccumulator(Utf8StringWritable key, Int32Writable value, Int32Writable newValue) { ... }

That was better: no more task classes, no more explicitly creating child stages, and the job was defined by basically two function calls: ProcessRecords, and AccumulateRecords. The JobBuilder also kept track of all the assemblies used by each record reader, writer, partitioner, and task you used, and made sure they, and all their non-framework dependencies, would be uploaded to the DFS.

There were two things I didn't like about this, though: all the explicit generic type parameters, and the fact that you have to define your input, output, and channels (through the RecordCollector class) explicitly before calling processing functions. It kind of meant you had to think about your stages backwards, because you had to define their output before you invoked their processing functions.

I wasn't quite satisfied with this. It still wasn't as easy as I wanted it to be. I tried to improve things by just adding even more specific helper methods, so you could write WordCount like this:

var input = new DfsInput(_inputPath, typeof(WordRecordReader));
var output = CreateDfsOutput(_outputPath, typeof(TextRecordWriter<Pair<Utf8String, int>>));

StageBuilder[] stages = builder.Count<Utf8String>(input, output);

((Channel)stages[0].Output).PartitionCount = _combinerTasks;
stages[0].StageId = "WordCountStage";
stages[1].StageId = "WordCountSumStage";

Which was kind of cheating (since I hadn't solved the problem, I just hid it), and the way you customized things like the stage IDs and task count wasn't exactly great.

Eventually, I had an idea that completely changed how the job builder worked. Instead of having to define channels and outputs up front, each operation could directly serve as the input of the next operation, and if that created a channel, the resulting object could be used to customize that channel as well as the stage itself.

This new JobBuilder still had helpers for specific types of operations that could use custom logic (such as adding the local aggregation step), as well as helpers for custom channel types (sorting), and for complex operations like joins. And of course, you could still use custom task classes or manually define child stages if none of the helpers sufficed, combining them seamlessly with the helpers.

Finally, you could specify every step in a straightforward way:

var job = new JobBuilder();
var input = job.Read(InputPath, typeof(LineRecordReader));
var words = job.Map<Utf8String, Pair<Utf8String, int>>(input, MapWords);
var aggregated = job.GroupAggregate<Utf8String, int>(words, AggregateCounts);
job.Write(aggregated, OutputPath, typeof(TextRecordWriter<>));

This was the kind of simplicity I wanted. There were no more specific job helpers, and every step was explicit while still being easy to use. Even emulating MapReduce was now simple:

var job = new JobBuilder();
var input = job.Read(InputPath, typeof(LineRecordReader));
var mapped = job.Map<Utf8String, Pair<Utf8String, int>>(input, MapWords);
var sorted = job.SpillSortCombine<Utf8String, int>(mapped, ReduceWordCount);
var reduced = job.Reduce<Utf8String, int, Pair<Utf8String, int>>(sorted, ReduceWordCount);
job.Write(reduced, OutputPath, typeof(TextRecordWriter<>));

Here, SpillSortCombine configures the channel in between the map and reduce, without the user needing to know that's what it does. And despite it being an operation that has no stages of its own, you can just apply it directly to DFS input and output; the JobBuilder makes sure a valid job structure is created.

The only limitation I could never get away from is the need for explicit generic type arguments. I was able to make things a bit easier; everywhere you specify a type directly, whether record readers, writers, partitioners, or even tasks (if not using methods), you can specify the "open" generic type (like typeof(TextRecordWriter<>) above), and the JobBuilder will instantiate it with the correct type for the records.
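Instantiating an open generic type with the record type is something reflection supports directly through Type.MakeGenericType. A minimal sketch, where `TextWriterSketch<T>` and `GenericTypeHelper.CloseForRecord` are hypothetical names standing in for a record writer and for whatever the JobBuilder does internally:

```csharp
using System;

Type closed = GenericTypeHelper.CloseForRecord(typeof(TextWriterSketch<>), typeof(int));
Console.WriteLine(closed == typeof(TextWriterSketch<int>)); // prints True

// A type that is already closed is returned unchanged.
Type same = GenericTypeHelper.CloseForRecord(typeof(TextWriterSketch<string>), typeof(int));
Console.WriteLine(same == typeof(TextWriterSketch<string>)); // prints True

// Hypothetical stand-in for a generic record writer.
public sealed class TextWriterSketch<T> { }

public static class GenericTypeHelper
{
    // If the caller passed an open generic type such as typeof(Foo<>),
    // close it over the record type; otherwise use the type as given.
    public static Type CloseForRecord(Type type, Type recordType)
    {
        return type.IsGenericTypeDefinition
            ? type.MakeGenericType(recordType)
            : type;
    }
}
```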

However, for the actual processing methods, where they use delegates, it was always necessary to specify both the input and output types explicitly, because C# isn't able to determine the type arguments from the arguments of a delegate target (only from the return type). LINQ doesn't suffer from this problem because the type of the input items is known from the IEnumerable<T> or IQueryable<T> you pass in, whereas the types of input, words, etc. above (all implementing IStageInput) are not generic, so they can't be used to deduce the type.
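The difference is easy to reproduce outside Jumbo. In the sketch below, all types are simplified, hypothetical stand-ins: the delegates are plain Func<TIn, TOut> rather than the JobBuilder's real delegate types, and `IStageInput<T>` is an imagined generic variant of the interface.

```csharp
using System;

// Non-generic input: nothing tells the compiler what TIn is, so both
// type arguments have to be written out.
IStageInput untyped = new StageInput<string>();
IStageInput lengths = Sketch.Map<string, int>(untyped, Methods.WordLength);
// Sketch.Map(untyped, Methods.WordLength); // does not compile: type arguments cannot be inferred

// Generic input: TIn is inferred from the argument, and TOut then
// follows from the return type of the method group.
IStageInput<string> typed = new StageInput<string>();
IStageInput<int> typedLengths = Sketch.MapTyped(typed, Methods.WordLength);

Console.WriteLine(lengths.RecordType.Name);      // prints Int32
Console.WriteLine(typedLengths.RecordType.Name); // prints Int32

public interface IStageInput { Type RecordType { get; } }
public interface IStageInput<T> : IStageInput { }

public sealed class StageInput<T> : IStageInput<T>
{
    public Type RecordType => typeof(T);
}

public static class Methods
{
    public static int WordLength(string word) => word.Length;
}

public static class Sketch
{
    // Mirrors the non-generic design: the input carries no static type.
    public static IStageInput Map<TIn, TOut>(IStageInput input, Func<TIn, TOut> mapper)
        => new StageInput<TOut>();

    // Imagined generic design: the input's type parameter drives inference.
    public static IStageInput<TOut> MapTyped<TIn, TOut>(IStageInput<TIn> input, Func<TIn, TOut> mapper)
        => new StageInput<TOut>();
}
```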

The only improvement I made was shortening a bunch of type names. Utf8StringWritable became Utf8String, KeyValuePairWritable became Pair, and Int32Writable (and a bunch of similar wrappers) went away in favor of being able to directly use types like int.

So, why not make IStageInput generic? Now the delegates work, but you have to specify explicit types in other places, such as Read above, and when calling Process with a custom task class. There is no way to derive the record type for job.Read(InputPath, typeof(LineRecordReader)). I thought of using a constraint like this:

public IStageInput<TRecord> Read<TRecordReader, TRecord>(string inputPath)
    where TRecordReader : RecordReader<TRecord>

But, you still have to explicitly specify both generic arguments in that case.

Maybe I could do better now, if I gave this some more thought. I can't immediately think of anything that wouldn't have equal drawbacks, though. And, since I'm not developing Jumbo anymore, this is how it will stay.

Still, despite that little annoyance, I accomplished most of what I wanted with the final version of the JobBuilder. It feels like a pretty natural way of defining jobs, and even if you want to customize settings for stages and channels (for example, see advanced WordCount), it works pretty well.

It's probably my favorite aspect of Jumbo, something that's really different from anything in Hadoop (at least at the time), and I'm pretty proud of it. And, before I released Jumbo, nobody really knew about it. Since it had no real value to my research, I never published about it. It was only mentioned on one page in my dissertation, but that was it.

That's why I wanted to release Jumbo. It's why I wanted to write these articles about it. Even if nobody cares, the work I did on this is at least preserved now. And that's worth something, even if only to me.

Categories: Software, Programming
Posted on: 2023-02-07 18:54 UTC.
