#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy

from numpy import array, dot, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    LinearModel, _linear_predictor_typecheck
from math import exp, log

class LogisticRegressionModel(LinearModel):
    """A linear binary classification model derived from logistic regression.

    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
    >>> lrm.predict(array([1.0])) > 0
    True
    """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        # Raw linear margin: w . x + b
        margin = dot(x, self._coeff) + self._intercept
        # The logistic (sigmoid) function maps the margin to P(y = 1 | x)
        prob = 1 / (1 + exp(-margin))
        return 1 if prob > 0.5 else 0
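    # Worked example (sketch, hypothetical weights): with _coeff = [2.0] and
    # _intercept = 0.0, predict(array([1.0])) gives margin = 2.0 and
    # prob = 1 / (1 + exp(-2.0)) ~= 0.88 > 0.5, so the predicted class is 1.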

class LogisticRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a logistic regression model on the given data."""
        sc = data.context
        # Delegate the actual optimization to the JVM-side MLlib
        # implementation through the Py4J gateway
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
                        iterations, step, miniBatchFraction, i),
                LogisticRegressionModel, data, initialWeights)
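    # Example usage (sketch): the hyperparameter values here are illustrative
    # only, assuming `sc` is an active SparkContext and `data` uses the
    # label-first layout shown in the doctest above:
    #
    #     lrm = LogisticRegressionWithSGD.train(sc.parallelize(data),
    #                                           iterations=200, step=0.1,
    #                                           miniBatchFraction=0.5)
    #     lrm.predict(array([2.0]))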

class SVMModel(LinearModel):
    """A support vector machine.

    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
    >>> svm = SVMWithSGD.train(sc.parallelize(data))
    >>> svm.predict(array([1.0])) > 0
    True
    """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = dot(x, self._coeff) + self._intercept
        # The sign of the margin determines which side of the separating
        # hyperplane x falls on
        return 1 if margin >= 0 else 0

class SVMWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a support vector machine on the given data."""
        sc = data.context
        # Delegate the actual optimization to the JVM-side MLlib
        # implementation through the Py4J gateway
        return _regression_train_wrapper(sc, lambda d, i:
                sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
                        iterations, step, regParam, miniBatchFraction, i),
                SVMModel, data, initialWeights)
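    # Example usage (sketch): `regParam` controls the strength of the
    # regularization term; the values below are illustrative only, assuming
    # `sc` and `data` as in the doctest above:
    #
    #     svm = SVMWithSGD.train(sc.parallelize(data),
    #                            iterations=200, step=1.0, regParam=0.01)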

class NaiveBayesModel(object):
    """
    Model for Naive Bayes classifiers.

    Contains two parameters:
    - pi: vector of logs of class priors (dimension C)
    - theta: matrix of logs of class conditional probabilities (CxD)

    >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
    >>> model = NaiveBayes.train(sc.parallelize(data))
    >>> model.predict(array([0.0, 1.0]))
    0
    >>> model.predict(array([1.0, 0.0]))
    1
    """

    def __init__(self, pi, theta):
        self.pi = pi
        self.theta = theta

    def predict(self, x):
        """Return the most likely class for a data vector x"""
        # log P(c | x) = log P(c) + x . log P(features | c) up to a shared
        # normalizer, so the argmax over classes is the MAP prediction
        return numpy.argmax(self.pi + dot(x, self.theta))

class NaiveBayes(object):
    @classmethod
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features) vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
        handle all kinds of discrete data.  For example, by converting
        documents into TF-IDF vectors, it can be used for document
        classification.  By making every vector a 0-1 vector, it can also be
        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).

        @param data: RDD of NumPy vectors, one per element, where the first
               coordinate is the label and the rest is the feature vector
               (e.g. a count vector).
        @param lambda_: The smoothing parameter
        """
        sc = data.context
        # Serialize the vector RDD and hand it to the JVM-side trainer
        dataBytes = _get_unmangled_double_vector_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
        # The JVM call returns the serialized (pi, theta) pair
        return NaiveBayesModel(
            _deserialize_double_vector(ans[0]),
            _deserialize_double_matrix(ans[1]))
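    # Example usage (sketch): for Bernoulli-style NB, make each feature vector
    # 0-1 valued, as noted in the docstring above; `sc` is assumed to be an
    # active SparkContext and the first coordinate of each row is the label:
    #
    #     data = array([0.0, 1.0, 0.0,
    #                   1.0, 0.0, 1.0]).reshape(2, 3)
    #     model = NaiveBayes.train(sc.parallelize(data), lambda_=0.5)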


def _test():
    import doctest
    globs = globals().copy()
    # The doctests above expect a SparkContext bound to the name `sc`
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs,
            optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()