The use-once join (or matching in bipartite graphs with Spark)

A bipartite graph is a graph whose vertices can be divided into two sets such that no two vertices in the same set are connected by an edge. A matching is a subset of the edges in the graph such that no two edges in the subset share an endpoint.

Bipartite graphs arise implicitly when working with dataframes. Suppose we are given two dataframes and we want to compute a join between them for some join condition. Then the rows of the two dataframes are the two disjoint sets of vertices, and there is an edge between two vertices if the join condition is satisfied for the two corresponding rows.

Two dataframes and a join condition induce a bipartite graph, and the join type determines how the result of the join is derived from that graph. For example, the result of an inner join has exactly as many rows as there are edges in the implicit graph. Similarly, the result of a left join has one row for each edge in the graph plus one row for each vertex in the left set with degree 0.
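To make the row counts concrete, here is a minimal sketch in plain Scala (no Spark) that builds the implicit graph for an equality join condition. The sample keys and all names are made up for illustration.

```scala
// Rows of two hypothetical dataframes, reduced to their join key.
val left = Seq(1, 1, 2, 3)
val right = Seq(1, 2, 2, 4)

// Edges of the implicit bipartite graph: pairs of row indices
// whose keys satisfy the join condition (here: equality).
val edges = for {
  (l, i) <- left.zipWithIndex
  (r, j) <- right.zipWithIndex
  if l == r
} yield (i, j)

// Left vertices with degree 0, i.e. left rows without any join partner.
val isolatedLeft = left.indices.filterNot(i => edges.exists(_._1 == i))

val innerJoinRows = edges.size                    // one row per edge
val leftJoinRows = edges.size + isolatedLeft.size // plus unmatched left rows

println(innerJoinRows) // 4
println(leftJoinRows)  // 5
```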

So what type of join is equivalent to a bipartite matching? It corresponds to a join where each row can be used only once in the join. In this post we show how to compute a join corresponding to a matching in a bipartite graph in Spark and describe a use case for this.

Matching in bipartite graphs

We need to ensure that each row of each dataframe occurs at most once in the join. To achieve this we group the rows by the join condition, i.e., each row in a group has the same values for the columns affected by the join condition. Using a window function, we can do this without actually aggregating the dataframes: we number the rows within each group and additionally require the row numbers to be equal in the join.

The following example demonstrates how this might look in Spark. Suppose that we are given two dataframes dfA and dfB, both with a column val, and we want to join them on dfA("val") === dfB("val"). Then we may compute a matching as follows.

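import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._ // assumes a SparkSession named spark

// Number the rows within each group of equal val. row_number requires an
// ordered window; the order within a group is irrelevant here.
val window = Window.partitionBy($"val").orderBy($"val")
val matching = dfA.withColumn("_rn", row_number().over(window)).alias("a")
    .join(
        dfB.withColumn("_rn", row_number().over(window)).alias("b"),
        // Qualify the columns; $"val" === $"val" would be ambiguous.
        $"a.val" === $"b.val" && $"a._rn" === $"b._rn",
        "inner"
    )
    .drop("_rn") // removes the helper column from both sides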

Notice that there may be several matchings and the above method finds one of them. The above is in fact a greedy matching algorithm: if there is more than one choice of edge for a vertex, then we pick the first edge (the one connecting the vertex with a vertex with the same _rn). A greedy matching is always maximal, i.e., no further edge can be added to it. For simple join conditions like dfA("val") === dfB("val"), each group of equal values forms a complete bipartite graph, so the matching is also maximum: the method produces a result using as many rows from each dataframe as possible. For more elaborate join conditions, this does not necessarily hold true.
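The pairing by row number can be mimicked on plain Scala collections, which makes it easy to see why an equality condition uses as many rows as possible. The following is only a sketch: useOnceJoin is a hypothetical helper, not part of Spark, and the sample data is made up.

```scala
// Hypothetical helper: pair up equal keys, using each element at most once.
def useOnceJoin[K](left: Seq[K], right: Seq[K]): Seq[(K, K)] = {
  val rightGroups = right.groupBy(identity)
  left.groupBy(identity).toSeq.flatMap { case (key, leftGroup) =>
    // zip pairs the i-th left row with the i-th right row of the same group,
    // which mirrors requiring equal _rn values in the Spark join.
    leftGroup.zip(rightGroups.getOrElse(key, Seq.empty))
  }
}

val matched = useOnceJoin(Seq(1, 1, 1, 2, 3), Seq(1, 1, 2, 2, 4))
println(matched.size) // 3
```

Each group contributes the smaller of its two sizes: two pairs for the key 1, one pair for the key 2, and none for the keys 3 and 4. A plain inner join on the same data would produce 3 * 2 + 1 * 2 = 8 rows.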

Matching entries in log files

We will now describe a use case for the matching. Suppose we are given an event log with the columns user_id, timestamp, and event_type. There are two event types: A and B. We would like to find all the events of type A that are immediately followed by an event of type B generated by the same user.

To do this, we apply the matching method to a self join of the event log.

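import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._ // assumes a SparkSession named spark

// Number each user's events in order of occurrence.
val window = Window.partitionBy($"user_id").orderBy($"timestamp".asc)
val groupedLogs = logs.withColumn("_rn", row_number().over(window))
val matching = groupedLogs.alias("left")
    .join(
        groupedLogs.alias("right"),
        $"left.user_id" === $"right.user_id" &&   // same user
          $"left.event_type" === "A" &&           // left event has type A
          $"right.event_type" === "B" &&          // right event has type B
          $"left._rn" === $"right._rn" - 1,       // B directly follows A
        "inner"
    )
    .drop("_rn") // removes the helper column from both sides

First we order the events generated by each user by the time of occurrence and number them within each user's group. In the join we specify that we want to match each event of type A (the condition on left.event_type) with an event of type B (the condition on right.event_type) generated by the same user (the condition on user_id) if the two events occur with no other events between them (the condition on _rn, which requires the row numbers to differ by exactly one).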
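For a quick sanity check on a small sample, the same adjacency rule can be written on plain Scala collections. This is a hypothetical cross-check rather than Spark code: the Event case class and the sample log are invented, and sliding(2) stands in for the condition that the row numbers differ by one.

```scala
// Hypothetical in-memory representation of a log entry.
case class Event(userId: Int, timestamp: Long, eventType: String)

val sampleLogs = Seq(
  Event(1, 10L, "A"), Event(1, 20L, "B"), Event(1, 30L, "A"),
  Event(1, 40L, "A"), Event(1, 50L, "B"),
  Event(2, 15L, "B"), Event(2, 25L, "A")
)

// Per user: sort by time, look at each pair of consecutive events,
// and keep the pairs where an A is directly followed by a B.
val pairs = sampleLogs.groupBy(_.userId).toSeq.flatMap { case (_, events) =>
  events.sortBy(_.timestamp).sliding(2).collect {
    case Seq(a, b) if a.eventType == "A" && b.eventType == "B" => (a, b)
  }.toList
}

println(pairs.size) // 2
```

For user 1 the pairs (10, 20) and (40, 50) are found; the A event at timestamp 30 is followed by another A and is therefore not matched. User 2 has no A event followed by a B event.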


This post first appeared May 19, 2019 on Medium.