Map-Side Joins in MapReduce

In a previous post, I wrote about reduce-side joins in MapReduce. Here is an example of one in action:

    from mrjob.job import MRJob
    from mrjob.step import MRStep


    class ReduceSideJoin(MRJob):

        def mapper(self, _, line):
            fields = line.split(',')
            if len(fields) == 3:  # User database
                user_id = fields[0].strip()
                data = ('user', fields[1], fields[2])
            else:  # User activity log
                user_id = fields[0].split()[1].strip()
                data = ('activity', ' '.join(fields[1:]))
            yield user_id, data

        def reducer(self, key, values):
            user_data = None
            activities = []

            for value in values:
                if value[0] == 'user':
                    user_data = value
                else:
                    activities.append(value)

            # Only emit joined records for users we actually have profile data for
            if user_data:
                for activity in activities:
                    yield key, (user_data[1], user_data[2], activity[1])

        def steps(self):
            return [
                MRStep(mapper=self.mapper,
                       reducer=self.reducer)
            ]


    if __name__ == '__main__':
        ReduceSideJoin.run()
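
As a point of reference for how both datasets reach the same mapper: a job like this is typically launched by passing both inputs on the command line, and mrjob then feeds every line from both files through the mapper. The file names below are just placeholders:

    python reduce_side_join.py users.csv activity_log.txt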
        

A major advantage of reduce-side joins is that they make no assumptions about the format of the input data. It is the mapper's job to take the data in whatever format it arrives and prepare it for the reducer, which performs the actual join. The cost, however, is speed: a large amount of data has to be shuffled to get it in front of the right reducer (sorting, copying to reducers and merging reducer inputs), which makes reduce-side joins slow.

If we know a bit about the input data and choose an appropriate joining method, we can significantly speed up MapReduce jobs involving joins.

For instance, building on the example from the reduce-side joins post (https://neil-gibbons.uk/article4.html): if we know that the full user database can fit in memory as a hash table, we can “broadcast” a copy of it from the distributed filesystem to each mapper in the MapReduce cluster. The mapper can then scan over each user activity event and look up the user ID in the hash table. This way, we avoid the need for a reduce phase entirely.

Here is an example of this technique, known as a broadcast join:

    import csv

    from mrjob.job import MRJob


    class MapSideJoin(MRJob):

        def configure_args(self):
            # Ship the user database file to every task and expose its local
            # path as self.options.user_db
            super().configure_args()
            self.add_file_arg('--user-db')

        def load_user_db(self):
            self.user_db = {}
            with open(self.options.user_db, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    user_id, email, dob = row
                    self.user_db[user_id.strip()] = (email, dob)

        def mapper_init(self):
            self.load_user_db()

        def mapper(self, _, line):
            fields = line.split(',')
            if len(fields) != 3:  # User activity log
                user_id = fields[0].split()[1].strip()
                activity = ' '.join(fields[1:])
                if user_id in self.user_db:
                    email, dob = self.user_db[user_id]
                    yield user_id, (email, dob, activity)


    if __name__ == '__main__':
        MapSideJoin.run()
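
Assuming the job above is saved as map_side_join.py and the user database sits in a file called users.csv (both names are placeholders), it could be launched with the activity log as the job's input and the user database passed via the file argument:

    python map_side_join.py --user-db users.csv activity_log.txt

mrjob copies any file passed with `add_file_arg` into each task's working directory, which is what lets every mapper rebuild the look-up table locally.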
        

One limitation of the broadcast join is that every mapper needs a copy of the complete user database, so the user data must be small enough to fit in memory. However, if the inputs to the map-side join are partitioned in the same way, we only need to fit a single partition into memory. For example, when merging the user profile and user activity data, if we create 10 partitions based on the last character of the user ID, each mapper only needs to hold roughly 10% of the user database in memory while still benefiting from the speed of a hash-table look-up.

Here is what the updated `mapper` could look like, together with a `mapper_init` that sets up the partition state (note that `load_user_db` now loads only the given partition); this approach is known as a partition join:

    def mapper_init(self):
        # No partition key yet; it is discovered from the first line this
        # mapper reads, at which point only that slice of the user database
        # is loaded
        self.partition_key = None

    def mapper(self, _, line):
        if self.partition_key is None:
            # Determine the partition key from the first line
            fields = line.split(',')
            if len(fields) == 3:  # User database line
                user_id = fields[0].strip()
            else:  # User activity log line
                user_id = fields[0].split()[1].strip()
            self.partition_key = user_id[-1]
            self.load_user_db(self.partition_key)

        fields = line.split(',')
        if len(fields) == 3:  # User database line
            return  # Already loaded into the hash table, so skip it here
        else:  # User activity log line
            user_id = fields[0].split()[1].strip()
            activity = ' '.join(fields[1:])
            if user_id in self.user_db:
                email, dob = self.user_db[user_id]
                yield user_id, (email, dob, activity)

One final assumption lets us handle even more data without expensive shuffling: that the input data is not only partitioned in the same way, but also sorted by the same key. In that case, a mapper can perform the full task normally done by a reducer: it reads both the activity and user data in ascending key order and matches records with the same key. You keep the speed benefit of not shuffling data, while no longer being constrained by how much data fits in memory. This is known as a map-side merge join.
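
To make this concrete, here is a minimal sketch of the merge logic such a mapper could run over one partition. It assumes each mapper is handed one user file and one activity file for the same partition, both sorted in ascending order of user ID; the file layout used here (`user_id,email,dob` and `user_id,activity`) is purely illustrative:

    import csv

    def merge_join(user_path, activity_path):
        # Walk both sorted files in lock-step, advancing whichever side has
        # the smaller key; when the keys match, emit a joined record and
        # advance the activity side so a user with several activities is
        # matched repeatedly.
        with open(user_path) as uf, open(activity_path) as af:
            users = csv.reader(uf)
            activities = csv.reader(af)
            user = next(users, None)
            activity = next(activities, None)
            while user is not None and activity is not None:
                if user[0] < activity[0]:
                    user = next(users, None)
                elif user[0] > activity[0]:
                    activity = next(activities, None)
                else:
                    yield activity[0], (user[1], user[2], activity[1])
                    activity = next(activities, None)

Because only the current record from each file is held at a time, memory use stays constant no matter how large the partition grows.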