荆州做网站的公司,中小微企业建设网站,邢台网红打卡地,短视频seo询盘获客源码2019独角兽企业重金招聘Python工程师标准 在提取文本特征时#xff0c;经常用到TF-IDF算法。Spark Mlib实现了该算法。下面是Spark Mlib中#xff0c;TF_IDF算法调用的一个实例#xff1a; def main(args:Array[String]){val sc: SparkContext null … 2019独角兽企业重金招聘Python工程师标准 在提取文本特征时经常用到TF-IDF算法。Spark Mlib实现了该算法。下面是Spark Mlib中TF_IDF算法调用的一个实例 def main(args:Array[String]){val sc: SparkContext null // Load documents (one per line).val documents: RDD[Seq[String]] sc.textFile(...).map(_.split( ).toSeq)val hashingTF new HashingTF()//计算tf val tf: RDD[Vector] hashingTF.transform(documents)tf.cache()//得到idfModel对象 val idf new IDF().fit(tf)//得到tf-idf值val tfidf: RDD[Vector] idf.transform(tf) 要求输入数据 必须是一行一篇文章切过词的Spark Mlib中没有提供切词的工具但给出了建议使用的切词工具 Stanford NLP Group and scalanlp/chalk 1、TF源码详读 在调用的代码中我们找到 val hashingTF new HashingTF()
//计算tf
val tf: RDD[Vector] hashingTF.transform(documents) 获取TF主要是通过HashingTF类的 transform方法跟踪该方法 /*** Transforms the input document to term frequency vectors.*/Since(1.1.0)def transform[D : Iterable[_]](dataset: RDD[D]): RDD[Vector] {dataset.map(this.transform)} SparkMlib是基于RDD的所以在看源码前必须要对RDD熟悉。再看 dataset.map(this.transform)中的transform方法 /*** Transforms the input document into a sparse term frequency vector.*/Since(1.1.0)def transform(document: Iterable[_]): Vector {//定义词频的mapval termFrequencies mutable.HashMap.empty[Int, Double]//循环每篇文章里的每个词document.foreach { term //获取词项term对应的向量位置val i indexOf(term)//i即代表这个词统计次数放入termFrequenciestermFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) 1.0)}//将词特征映射到一个很大维度的向量中去 稀疏向量 numFeatures是类HashingTF的成员变量 可以在调用HashingTF传入如果没有传入默认为2的20次方Vectors.sparse(numFeatures, termFrequencies.toSeq)} transform方法对每一行即每篇文章都会执行一次主要是计算每篇文章里的词的词频转存入一个维度很大的稀疏向量中每个词在该向量中对应的位置就是 Since(1.1.0)def indexOf(term: Any): Int Utils.nonNegativeMod(term.##, numFeatures) term.##相当于hashcode()得到每个词的hash值然后对numFeatures 取模是个Int型的值 到此为止TF就计算完了最终的结果是一个存放词的位置以及该词对应词频的 向量即SparseVector(size, indices, values) 2、IDF源码详读 //得到idfModel对象 输入的tf类型是SparseVector(size, indices, values)val idf new IDF().fit(tf)//得到tf-idf值val tfidf: RDD[Vector] idf.transform(tf) IDF实现主要通过两步 第一步 val idf new IDF().fit(tf) /*** Computes the inverse document frequency.* param dataset an RDD of term frequency vectors*/Since(1.1.0)def fit(dataset: RDD[Vector]): IDFModel {//返回 IDF向量 类型是DenseVector(values)val idf dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(minDocFreq minDocFreq))(///minDocFreq是词最小出现频率,不填是默认0seqOp (df,v) df.add(v),//计算combOp (df1, df2) df1.merge(df2)//合并).idf()new IDFModel(idf)} 上面treeAggregate方法原型是def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) U, combOp: (U, U) U, depth: Int 2): U treeAggregate是使用mapPartition进行计算的需定义两个操作符一个用来计算一个用来合并结果 seqOp 用来计算分区结果的操作符 (an operator used to accumulate results within a partition) combOp 用来组合来自不同分区结果的关联操作符 an associative operator used to combine results from different partitions 该方法的调用返回new IDF.DocumentFrequencyAggregator对象,接着又调用DocumentFrequencyAggregator的idf方法,返回idf向量然后又通过new IDFModel(idf)返回IDFModel对象 下面是 DocumentFrequencyAggregator 类的方法即一个addseqOp一个mergecombOp private object IDF {/** Document frequency aggregator. */class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {/** number of documents 文档总数量*/ private var m 0L/** document frequency vector df向量词在出现过的文档个数*/private var df: BDV[Long] _def this() this(0) //构造方法如果minDocFreq没有传入的话默认值为0/** Adds a new document. 这个地方就是执行的每个分区里的计算操作 输入是tf向量*/def add(doc: Vector): this.type {if (isEmpty) {df BDV.zeros(doc.size)}doc match {//tf向量是 SparseVector 所以会走这个casecase SparseVector(size, indices, values) val nnz indices.sizevar k 0while (k nnz) {if (values(k) 0) {df(indices(k)) 1L //如果词在文章中出的频率大于0则该词的df1}k 1}case DenseVector(values) val n values.sizevar j 0while (j n) {if (values(j) 0.0) {df(j) 1L}j 1}case other throw new UnsupportedOperationException(sOnly sparse and dense vectors are supported but got ${other.getClass}.)}m 1Lthis}/** Merges another. 这个地方就是执行所有分区的合并操作*/def merge(other: DocumentFrequencyAggregator): this.type {if (!other.isEmpty) {m other.m //总文档数合并if (df null) {df other.df.copy} else {df other.df //df向量合并}}this}private def isEmpty: Boolean m 0L/** Returns the current IDF vector. 计算idf向量的方法 */def idf(): Vector {if (isEmpty) {throw new IllegalStateException(Havent seen any document yet.)}val n df.lengthval inv new Array[Double](n)var j 0while (j n) {/** If the term is not present in the minimum* number of documents, set IDF to 0. This* will cause multiplication in IDFModel to* set TF-IDF to 0.** Since arrays are initialized to 0 by default,* we just omit changing those entries.*/if (df(j) minDocFreq) { //如果df大于设定的值就计算idf的值如果不大于的话就直接设置为0inv(j) math.log((m 1.0) / (df(j) 1.0))}j 1}Vectors.dense(inv) //返回idf 密集向量}}
} 第二步通过上面的计算得到idf向量剩下的工作就是计算 tf*idf了会用到IDFMode类中的transform方法 val tfidf: RDD[Vector] idf.transform(tf) private object IDFModel {/*** Transforms a term frequency (TF) vector to a TF-IDF vector with a IDF vector** param idf an IDF vector* param v a term frequence vector* return a TF-IDF vector*/def transform(idf: Vector, v: Vector): Vector {val n v.sizev match {//会进入这个casecase SparseVector(size, indices, values) val nnz indices.sizeval newValues new Array[Double](nnz)var k 0while (k nnz) {newValues(k) values(k) * idf(indices(k)) //计算tf*idfk 1}Vectors.sparse(n, indices, newValues) //TFIDF向量case DenseVector(values) val newValues new Array[Double](n)var j 0while (j n) {newValues(j) values(j) * idf(j)j 1}Vectors.dense(newValues)case other throw new UnsupportedOperationException(sOnly sparse and dense vectors are supported but got ${other.getClass}.)}}
} 以上就是整个TFIDF的计算过程用到Spark Mlib 的密集向量DenseVector和稀疏向量(SparseVector 、RDD的聚合操作 主要相关的类有三个HashingTF 、IDF、IDFModel 还有就是利用spark Mlib 的TFIDF生成的TFIDF向量位置信息存是词hash后和向量维度取模后的值而不是该词在后面做一些分类或者文本推荐的时候如果需要用到词本身还需要做调整 转载于:https://my.oschina.net/xiaoluobutou/blog/670367