Tuesday, May 3, 2016

Spark program gives odd results when run on standalone cluster


I have this Spark program, and I'll try to limit it to just the pertinent parts:

# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()

# Start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids

# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
    tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)

convergence = False

ncm = NCM()

while(not convergence):
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m)))
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value)))

    # Memberships
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)))
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))

    # Components of new centroids
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
    #print "wTm = " + str(wTm.collect())
    print "at first reduce"
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
    #print "sumwTm = " + str(sumwTm.collect())
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
    print "adding to cnumerator list"
    #print wTmx.collect()
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
    print "collected cnumerator, now printing"
    #print "cnumerator = " + str(cnumerator.collect())
    #print str(sumwTm.collect())

    # Calculate the new centroids
    sumwTmCollection = sumwTm.collect()[0][1]
    cnumeratorCollection = cnumerator.collect()
    #print "sumwTmCollection = " + str(sumwTmCollection)
    #cnumeratorCollection = cnumerator.collectAsMap().get(0).items
    print "cnumeratorCollection = " + str(cnumeratorCollection)
    for i in range(len(newCentroids)):
        newCentroids[i] = scalarMult(1 / sumwTmCollection[i], [cnumeratorCollection[i]])
    centroids = newCentroids

    # Test for convergence
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)
    #convergence = True

    # Replace our old centroids with the newly found centroids and repeat if convergence not met
    # Clear out space for a new set of centroids
    newCentroids = [[] for k in range(K)]

This program works pretty well on my local machine; however, it does not behave as expected when run on a standalone cluster. It doesn't necessarily throw an error, but it gives different output than what I get when running on my local machine. The cluster and its 3 nodes seem to be working fine. I have a feeling the problem is that I keep updating centroids, which is a Python list that changes each time through the while-loop. Is it possible that each node may not have the most recent copy of that list? I think so, so I tried using a broadcast variable, but those are read-only and can't be updated (the usual per-iteration re-broadcast pattern is sketched below). I also tried using an accumulator, but those are just for accumulations. I also tried saving the Python lists to a file on HDFS so each node could access them, but this didn't work well. Do you think I'm understanding the problem correctly? Is something else likely going on here? How can code work fine on my local machine but not on a cluster?
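(For context on the broadcast attempt mentioned above: the pattern usually suggested for driver-side state that changes between iterations is not to mutate a single broadcast variable but to create a fresh broadcast at the top of each pass and unpersist the old one. Below is a minimal sketch of that pattern against the loop above; computeNewCentroids is a hypothetical placeholder for the driver-side centroid update.)

# A minimal sketch of per-iteration re-broadcasting (not the original code).
# computeNewCentroids is a hypothetical stand-in for the driver-side update
# performed inside the while-loop above.
centroids = points.takeSample(False, K, 34)
convergence = False
while not convergence:
    bcCentroids = sc.broadcast(centroids)  # ship the current centroids to every executor
    memberships = points.map(lambda p: (p, getMemberships([p[N]], bcCentroids.value, m)))
    newCentroids = computeNewCentroids(memberships)  # collect/reduce on the driver
    convergence = ncm.test(centroids, newCentroids, epsilon)
    bcCentroids.unpersist()  # release the stale broadcast before the next pass
    centroids = newCentroids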

1 Answer

Answer 1

Thank you for all of the time and attention given to this problem, especially since it sounds like I could have posted more information to make your job easier. The problem here is in

centroids = points.takeSample(False, K, 34) 

I didn't realize this, but after a short experiment I found that this function returns the same output each and every time, despite being what I thought was a random sample. As long as you use the same seed (34 in this case), you will get the same sample in return. For some reason, the sample on my cluster was different from the one returned on my local machine, but in any case, since it was the same sample each time, my output never changed. The problem with these particular "random" centroids is that they gave rise to something like a saddle point in mathematics, where no convergence of the centroids would be found. That part of the answer is mathematical rather than a programming one, so I won't mention it further. My real hope at this point is that others are helped by the knowledge that if you want

centroids = points.takeSample(False, K, 34) 

to produce different samples each time it is called, you should change your seed each time, e.g., to a random number.
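For instance, here is a minimal sketch of this behavior (assuming a live SparkContext named sc):

# takeSample with a fixed seed is deterministic for the same RDD and setup.
data = sc.parallelize(range(100))

sampleA = data.takeSample(False, 5, 34)  # fixed seed
sampleB = data.takeSample(False, 5, 34)  # same seed -> identical sample
print(sampleA == sampleB)  # True

# Varying the seed gives a different sample on each call.
import random
sampleC = data.takeSample(False, 5, random.randint(0, 2 ** 32 - 1))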

I hope this all helps. As far as I can remember, I've never spent so much time on a solution before.

Thanks again.
