Given two huge lists of values, I am trying to compute the Jaccard similarity between them in Spark using Scala.
Assume colHashed1 contains the first list of values and colHashed2 contains the second list.
Approach 1 (trivial approach):
val jSimilarity = colHashed1.intersection(colHashed2).distinct.count/(colHashed1.union(colHashed2).distinct.count.toDouble)
Approach 2 (using minHashing):
I have used the approach explained here.
    import java.util.zip.CRC32

    def getCRC32 (s : String) : Int = {
      val crc = new CRC32
      crc.update(s.getBytes)
      return crc.getValue.toInt & 0xffffffff
    }

    val maxShingleID = Math.pow(2,32)-1

    def pickRandomCoeffs(kIn : Int) : Array[Int] = {
      var k = kIn
      val randList = Array.fill(k){0}
      while(k > 0) {
        // Get a random shingle ID.
        var randIndex = (Math.random()*maxShingleID).toInt
        // Ensure that each random number is unique.
        while(randList.contains(randIndex)) {
          randIndex = (Math.random()*maxShingleID).toInt
        }
        // Add the random number to the list.
        k = k - 1
        randList(k) = randIndex
      }
      return randList
    }

    val colHashed1 = list1Values.map(a => getCRC32(a))
    val colHashed2 = list2Values.map(a => getCRC32(a))

    val nextPrime = 4294967311L
    val numHashes = 10
    val coeffA = pickRandomCoeffs(numHashes)
    val coeffB = pickRandomCoeffs(numHashes)

    var signature1 = Array.fill(numHashes){0}
    for (i <- 0 to numHashes-1) {
      // Evaluate the hash function.
      val hashCodeRDD = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))
      // Track the lowest hash code seen.
      signature1(i) = hashCodeRDD.min.toInt
    }

    var signature2 = Array.fill(numHashes){0}
    for (i <- 0 to numHashes-1) {
      // Evaluate the hash function.
      val hashCodeRDD = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))
      // Track the lowest hash code seen.
      signature2(i) = hashCodeRDD.min.toInt
    }

    var count = 0
    // Count the number of positions in the minhash signature which are equal.
    for(k <- 0 to numHashes-1) {
      if(signature1(k) == signature2(k))
        count = count + 1
    }
    val jSimilarity = count/numHashes.toDouble
Approach 1 always seems to outperform Approach 2 in terms of time. When I analyzed the code, the min() call on the RDD in Approach 2 takes significant time, and it is called once per hash function for each list, so the cost grows with the number of hash functions used.
The intersection and union operations used in Approach 1 seem to run faster than the repeated min() calls.
I don't understand why minHashing does not help here. I expected minHashing to be faster than the trivial approach. Is there anything I am doing wrong here?
Sample data can be viewed here
2 Answers
Answer 1
JaccardSimilarity with MinHash is not giving consistent results:
    import java.util.zip.CRC32

    object Jaccard {
      def getCRC32(s: String): Int = {
        val crc = new CRC32
        crc.update(s.getBytes)
        return crc.getValue.toInt & 0xffffffff
      }

      def pickRandomCoeffs(kIn: Int, maxShingleID: Double): Array[Int] = {
        var k = kIn
        val randList = Array.ofDim[Int](k)
        while (k > 0) {
          // Get a random shingle ID.
          var randIndex = (Math.random() * maxShingleID).toInt
          // Ensure that each random number is unique.
          while (randList.contains(randIndex)) {
            randIndex = (Math.random() * maxShingleID).toInt
          }
          // Add the random number to the list.
          k = k - 1
          randList(k) = randIndex
        }
        return randList
      }

      def approach2(list1Values: List[String], list2Values: List[String]) = {
        val maxShingleID = Math.pow(2, 32) - 1
        val colHashed1 = list1Values.map(a => getCRC32(a))
        val colHashed2 = list2Values.map(a => getCRC32(a))
        val nextPrime = 4294967311L
        val numHashes = 10
        val coeffA = pickRandomCoeffs(numHashes, maxShingleID)
        val coeffB = pickRandomCoeffs(numHashes, maxShingleID)

        val signature1 = for (i <- 0 until numHashes) yield {
          val hashCodeRDD = colHashed1.map(ele => (coeffA(i) * ele + coeffB(i)) % nextPrime)
          hashCodeRDD.min.toInt // Track the lowest hash code seen.
        }
        val signature2 = for (i <- 0 until numHashes) yield {
          val hashCodeRDD = colHashed2.map(ele => (coeffA(i) * ele + coeffB(i)) % nextPrime)
          hashCodeRDD.min.toInt // Track the lowest hash code seen
        }

        val count = (0 until numHashes)
          .map(k => if (signature1(k) == signature2(k)) 1 else 0)
          .fold(0)(_ + _)
        val jSimilarity = count / numHashes.toDouble
        jSimilarity
      }

      // def approach1(list1Values: List[String], list2Values: List[String]) = {
      //   val colHashed1 = list1Values.toSet
      //   val colHashed2 = list2Values.toSet
      //
      //   val jSimilarity = colHashed1.intersection(colHashed2).distinct.count / (colHashed1.union(colHashed2).distinct.count.toDouble)
      //   jSimilarity
      // }

      def approach1(list1Values: List[String], list2Values: List[String]) = {
        val colHashed1 = list1Values.toSet
        val colHashed2 = list2Values.toSet
        val jSimilarity = (colHashed1 & colHashed2).size / (colHashed1 ++ colHashed2).size.toDouble
        jSimilarity
      }

      def main(args: Array[String]) {
        val list1Values = List("a", "b", "c")
        val list2Values = List("a", "b", "d")
        for (i <- 0 until 5) {
          println(s"Iteration ${i}")
          println(s" - Approach 1: ${approach1(list1Values, list2Values)}")
          println(s" - Approach 2: ${approach2(list1Values, list2Values)}")
        }
      }
    }
OUTPUT:
    Iteration 0
     - Approach 1: 0.5
     - Approach 2: 0.5
    Iteration 1
     - Approach 1: 0.5
     - Approach 2: 0.5
    Iteration 2
     - Approach 1: 0.5
     - Approach 2: 0.8
    Iteration 3
     - Approach 1: 0.5
     - Approach 2: 0.8
    Iteration 4
     - Approach 1: 0.5
     - Approach 2: 0.4
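A rough way to read the spread in the Approach 2 values: assuming ideal, independent hash functions (an assumption the code above may not fully satisfy), each of the k signature positions matches with probability equal to the true Jaccard similarity J, so the estimate behaves like a binomial proportion. A back-of-the-envelope calculation for J = 0.5 and k = 10:

```latex
\hat{J} = \frac{1}{k}\sum_{i=1}^{k} \mathbf{1}\!\left[\min h_i(A) = \min h_i(B)\right],
\qquad
\operatorname{Var}(\hat{J}) = \frac{J(1-J)}{k},
\qquad
\sqrt{\frac{0.5 \cdot 0.5}{10}} \approx 0.158
```

Under that assumption, values between 0.4 and 0.8 lie within about two standard errors of 0.5, and the spread should shrink only as numHashes grows.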
Why are you using it?
Answer 2
It seems to me that the overhead of the minHashing approach simply outweighs its benefit in Spark, especially as numHashes increases. Here are some observations about your code:
First, the while (randList.contains(randIndex)) check will slow down your process as numHashes (which, by the way, is equal to the size of randList) increases.
Second, you can save some time if you rewrite this code:
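As an illustration of the first point, the uniqueness check could be done against a hash set instead of scanning the array. This is only a minimal sketch: the object name UniqueCoeffs and the rng parameter are hypothetical, and the coefficients are drawn from [0, Int.MaxValue) rather than from the range used in the question. Whether the difference matters in practice depends on how large numHashes is.

```scala
import scala.collection.mutable
import scala.util.Random

object UniqueCoeffs {
  // Hypothetical helper (not from the question): draws k distinct
  // non-negative Int coefficients. Set membership is O(1) on average,
  // whereas Array.contains scans the whole array on every draw.
  def pickRandomCoeffs(k: Int, rng: Random = new Random()): Array[Int] = {
    val seen = mutable.LinkedHashSet.empty[Int]
    while (seen.size < k) {
      // Duplicates are ignored by the set, so the loop simply draws again.
      seen += rng.nextInt(Int.MaxValue) // uniform in [0, Int.MaxValue)
    }
    seen.toArray
  }
}
```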
    var signature1 = Array.fill(numHashes){0}
    for (i <- 0 to numHashes-1) {
      // Evaluate the hash function.
      val hashCodeRDD = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))
      // Track the lowest hash code seen.
      signature1(i) = hashCodeRDD.min.toInt
    }

    var signature2 = Array.fill(numHashes){0}
    for (i <- 0 to numHashes-1) {
      // Evaluate the hash function.
      val hashCodeRDD = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))
      // Track the lowest hash code seen.
      signature2(i) = hashCodeRDD.min.toInt
    }

    var count = 0
    // Count the number of positions in the minhash signature which are equal.
    for(k <- 0 to numHashes-1) {
      if(signature1(k) == signature2(k))
        count = count + 1
    }
into
    var count = 0
    for (i <- 0 to numHashes - 1) {
      val hashCodeRDD1 = colHashed1.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))
      val hashCodeRDD2 = colHashed2.map(ele => ((coeffA(i) * ele + coeffB(i)) % nextPrime))
      val sig1 = hashCodeRDD1.min.toInt
      val sig2 = hashCodeRDD2.min.toInt
      if (sig1 == sig2) {
        count = count + 1
      }
    }
This method simplifies the three loops into one. However, I am not sure if that would give a huge boost in computational time.
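Note that the merged loop still calls min() once per hash function for each list. A further option is to compute every minimum in a single traversal of the data. The sketch below shows the idea on a plain Scala Seq so that it runs without Spark; the names SinglePassMinHash, signature, and estimate are hypothetical. It also assumes coefficients in [0, 2^31) and hashed values in [0, 2^32) held as Long, so that a * x + b does not overflow. On an RDD, the same fold could presumably be expressed with aggregate, using an element-wise minimum to combine partial results.

```scala
object SinglePassMinHash {
  val nextPrime = 4294967311L // same modulus as in the question

  // Hypothetical helper: computes all signature minima in one pass.
  // Assumes 0 <= coefficient < 2^31 and 0 <= x < 2^32, so that
  // coeffA(i) * x + coeffB(i) stays below 2^63 and fits in a Long.
  def signature(hashed: Seq[Long], coeffA: Array[Long], coeffB: Array[Long]): Array[Long] = {
    val zero = Array.fill(coeffA.length)(Long.MaxValue)
    hashed.foldLeft(zero) { (mins, x) =>
      var i = 0
      while (i < mins.length) {
        val h = (coeffA(i) * x + coeffB(i)) % nextPrime
        if (h < mins(i)) mins(i) = h // keep the lowest hash code seen so far
        i += 1
      }
      mins
    }
  }

  // Fraction of signature positions on which the two signatures agree.
  def estimate(sig1: Array[Long], sig2: Array[Long]): Double =
    sig1.zip(sig2).count { case (a, b) => a == b } / sig1.length.toDouble
}
```

With this shape, each list is traversed once regardless of numHashes, at the cost of doing numHashes hash evaluations per element inside the traversal.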
One other suggestion, assuming that the first approach still turns out to be much faster, is to use a property of sets to modify the first approach:
    val colHashed1_dist = colHashed1.distinct
    val colHashed2_dist = colHashed2.distinct
    val intersect_cnt = colHashed1_dist.intersection(colHashed2_dist).distinct.count
    val jSimilarity = intersect_cnt/(colHashed1_dist.count + colHashed2_dist.count - intersect_cnt).toDouble
With that, instead of computing the union, you can just reuse the size of the intersection.
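The property in question is inclusion-exclusion: |A ∪ B| = |A| + |B| - |A ∩ B|. A minimal local sketch on plain Scala sets, with no Spark involved, is shown here; the name JaccardBySizes is hypothetical, and returning 0.0 for two empty sets is an arbitrary choice made for the example.

```scala
object JaccardBySizes {
  // Uses |A ∪ B| = |A| + |B| - |A ∩ B|, so the union is never materialized.
  def jaccard[T](a: Set[T], b: Set[T]): Double = {
    val inter = (a & b).size
    val unionSize = a.size + b.size - inter
    // Assumption for this sketch: two empty sets are given similarity 0.0.
    if (unionSize == 0) 0.0 else inter / unionSize.toDouble
  }
}
```

For the lists used in Answer 1, List("a", "b", "c") and List("a", "b", "d"), the intersection has 2 elements and the union has 3 + 3 - 2 = 4, which gives 0.5.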