mapreduce的典型案例 mapreduce项目实战案例

2025-01-10 00:59

高级大数据开发课程大纲那个

魔据条件不错,基础教育不错,有经验真正做到为学生负责到底,其它的,说实在的真的不敢保证。未来一定是大数据时代,现在选择还不迟,只要努力一定会有更好的发展前景,希望你能为有一个好的前程。

mapreduce的典型案例 mapreduce项目实战案例mapreduce的典型案例 mapreduce项目实战案例


mapreduce的典型案例 mapreduce项目实战案例


mapreduce是谁的一个基础组件

hadoop。MapReduce是hadoop的核心组件之一,所以是hadoop的一个基础组件。MapReduce主要提供的是计算模型,比较典型的应用案例就词频统计。

hive 什么样的operator会导致生成一个mapreduce任务

Hive中在做多表关联时,由于Hive的SQL优化引擎还不够强大,表的关联顺序不同往往导致产生不同数量的MapReduce作业数。这时就需要通过分析执行对SQL进行调整,以获得最少的MapReduce作业数。举一个例子(案例基于Hive 0.6.0):

create table ljn1(

k1 bigint,

k2 String,

v1 int

);

create table ljn2(

k1 bigint,

v2 int

);

create table ljn3(

k1 bigint,

v3 int

);

create table ljn4(

k1 bigint,

v4 int

);

create table ljn5(

k1 bigint,

v5 int

);

create table ljn6(

k2 string,

v6 int

);

然后看一下下面这个SQL的执行:

explain

select a.v1

from

ljn1 a

left outer join ljn2 b on (a.k1 = b.k1)

left outer join ljn3 c on (a.k1 = c.k1)

left outer join ljn4 d on (a.k1 = d.k1)

left outer join ljn6 e on (a.k2 = e.k2)

left outer join ljn5 f on (a.k1 = f.k1);

STAGE DEPENDENCIES:

Stage-5 is a root stage

Stage-1 depends on stages: Stage-5

Stage-2 depends on stages: Stage-1

Stage-0 is a root stage

STAGE PLANS:

Stage: Stage-5

Map Reduce

Alias -> Map Operator Tree:

aTableScan

alias: a

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 0

value expressions:

expr: k1

type: bigint

expr: k2

type: string

expr: v1

type: int

bTableScan

alias: b

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 1

Reduce Operator Tree:

Join Operator

condition map:

Left Outer Join0 to 1

condition expressions:

0 {VALUE._col0} {VALUE._col1} {VALUE._col2}

1handleSkewJoin: false

outputColumnNames: _col0, _col1, _col2

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-1

Map Reduce

Alias -> Map Operator Tree:

$INTNAME

Reduce Output Operator

key expressions:

expr: _col0

type: bigint

sort order: +

Map-reduce partition columns:

expr: _col0

type: bigint

tag: 0

value expressions:

expr: _col1

type: string

expr: _col2

type: int

cTableScan

alias: c

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 1

dTableScan

alias: d

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 2

fTableScan

alias: f

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 3

Reduce Operator Tree:

Join Operator

condition map:

Left Outer Join0 to 1

Left Outer Join0 to 2

Left Outer Join0 to 3

condition expressions:

0 {VALUE._col3} {VALUE._col4}

12

3handleSkewJoin: false

outputColumnNames: _col3, _col4

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-2

Map Reduce

Alias -> Map Operator Tree:

$INTNAME

Reduce Output Operator

key expressions:

expr: _col3

type: string

sort order: +

Map-reduce partition columns:

expr: _col3

type: string

tag: 0

value expressions:

expr: _col4

type: int

eTableScan

alias: e

Reduce Output Operator

key expressions:

expr: k2

type: string

sort order: +

Map-reduce partition columns:

expr: k2

type: string

tag: 1

Reduce Operator Tree:

Join Operator

condition map:

Left Outer Join0 to 1

condition expressions:

0 {VALUE._col10}

1handleSkewJoin: false

outputColumnNames: _col10

Select Operator

expressions:

expr: _col10

type: int

outputColumnNames: _col0

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.TextInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Stage: Stage-0

Fetch Operator

limit: -1

常规来讲,这个SQL非常简单,a表是主表,与其他表左外关联用到了k1和k2两个关联键,使用两个MapReduce作业完全可以搞定。但是这个SQL的执行却给出了3个作业:(Stage-0用做数据的最终展示,该作业可以忽略不计)第1个作业(Stage-5)是a表与b表关联;第2个作业(Stage-1)是第1个作业的中间结果再与c、d、f三表关联;第3个作业(Stage-2)是第2个作业的中间结果再与e表关联。

有点搞不懂了吧,第1和第2个作业明明可以合并在一起来完成的呀!其实我也搞不懂,从执行中看不出原由。而且如果这个SQL去掉c或者e其中的一个关联表,第1和第2个作业就可以合并在一起!很奇妙,我没有深入探究,应该是Hive的规则优化器还不够完美。

总之,遇到这种多表关联的情况一定要记得看一下执行,看看Hive是不是生成了多余的作业。如果Hive真的犯傻生成了多余的作业,就要尝试改变一下SQL的写法。通常是将关联键相同的表放在一起,如果还不行就再引入子查询。例如上面这个例子改为如下SQL就可以只生成2个作业了:

explain

select t.v1

from

(select a.k2,a.v1

from

ljn1 a

left outer join ljn2 b on (a.k1 = b.k1)

left outer join ljn3 c on (a.k1 = c.k1)

left outer join ljn4 d on (a.k1 = d.k1)

left outer join ljn5 f on (a.k1 = f.k1)

) t

left outer join ljn6 e on (t.k2 = e.k2)

;STAGE DEPENDENCIES:

Stage-1 is a root stage

Stage-2 depends on stages: Stage-1

Stage-0 is a root stage

STAGE PLANS:

Stage: Stage-1

Map Reduce

Alias -> Map Operator Tree:

t:a

TableScan

alias: a

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 0

value expressions:

expr: k2

type: string

expr: v1

type: int

t:b

TableScan

alias: b

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 1

t:c

TableScan

alias: c

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 2

t:d

TableScan

alias: d

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 3

t:f

TableScan

alias: f

Reduce Output Operator

key expressions:

expr: k1

type: bigint

sort order: +

Map-reduce partition columns:

expr: k1

type: bigint

tag: 4

Reduce Operator Tree:

Join Operator

condition map:

Left Outer Join0 to 1

Left Outer Join0 to 2

Left Outer Join0 to 3

Left Outer Join0 to 4

condition expressions:

0 {VALUE._col1} {VALUE._col2}

12

34

handleSkewJoin: false

outputColumnNames: _col1, _col2

Select Operator

expressions:

expr: _col1

type: string

expr: _col2

type: int

outputColumnNames: _col0, _col1

File Output Operator

compressed: true

GlobalTableId: 0

table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat

output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-2

Map Reduce

Alias -> Map Operator Tree:

$INTNAME

Reduce Output Operator

key expressions:

expr: _col0

type: string

sort order: +

Map-reduce partition columns:

expr: _col0

type: string

tag: 0

value expressions:

expr: _col1

type: int

eTableScan

alias: e

Reduce Output Operator

key expressions:

expr: k2

hadoop的mapreduce常见算法案例有几种

基本MapReduce模式

计数与求和

问题陈述:

有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。

解决方案:

让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的然后把他们的频次加和。

1 class Mapper

2 mod Map(docid id, doc d)

3 for all term t in doc d do

4 Emit(term t, count 1)

56 class Reducer

7 mod Reduce(term t, counts [c1, c2,...])

8 sum = 0

9 for all count c in [c1, c2,...] do

10 sum = sum + c

11 Emit(term t, count sum)

这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Reducer的数据量:

1 class Mapper

2 mod Map(docid id, doc d)

3 H = new AssociativeArray

4 for all term t in doc d do

5 H{t} = H{t} + 1

6 for all term t in H do

7 Emit(term t, count H{t})

如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper处理的所有文档,那就要用到Combiner了:

1 class Mapper

2 mod Map(docid id, doc d)

3 for all term t in doc d do

4 Emit(term t, count 1)

56 class Combiner

7 mod Combine(term t, [c1, c2,...])

8 sum = 0

9 for all count c in [c1, c2,...] do

10 sum = sum + c

11 Emit(term t, count sum)

12

13 class Reducer

14 mod Reduce(term t, counts [c1, c2,...])

15 sum = 0

16 for all count c in [c1, c2,...] do

17 sum = sum + c

18 Emit(term t, count sum)

应用:Log 分析, 数据查询

整理归类

问题陈述:

有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。

解决方案:

解决方案很简单。 在 Mapper 中以每个条目的所需属性值作为 key,其本身作为值传递给 Reducer。 Reducer 取得按照属性值分组的条目,然后可以处理或者保存。如果是在构建倒排索引,那么 每个条目相当于一个词而属性值就是词所在的文档ID。

应用:倒排索引, ETL

过滤 (文本查找),解析和校验

问题陈述:

设有很多条记录,需要从其中找出满足某个条件的所有记录,或者将每条记录传换成另外一种形式(转换作相对于各条记录,即对一条记录的作与其他记录无关)。像文本解析、特定值抽取、格式转换等都属于后一种用例。

解决方案:

非常简单,在Mapper 里逐条进行作,输出需要的值或转换后的形式。

应用:日志分析,数据查询,ETL,数据校验

分布式任务执行

问题陈述:

大型计算可以分解为多个部分分别进行然后合并各个计算的结果以获得最终结果。

解决方案: 将数据切分成多份作为每个 Mapper 的输入,每个Mapper处理一份数据,执行同样的运算,产生结果,Reducer把多个Mapper的结果组合成一个。

案例研究: 数字通信系统模拟

像 WiMAX 这样的数字通信模拟软件通过系统模型来传输大量的随机数据,然后计算传输中的错误几率。 每个 Mapper 处理样本 1/N 的数据,计算出这部分数据的错误率,然后在 Reducer 里计算平均错误率。

应用:工程模拟,数字分析,性能测试

排序

问题陈述:

有许多条记录,需要按照某种规则将所有记录排序或是按照顺序来处理记录。

解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之所以被称为MapReduce 核心的原因(“核心”是说排序?因为证明Hadoop计算能力的实验是大数据排序?还是说Hadoop的处理过程中对key排序的环节?)。在实践中,常用组合键来实现二次排序和分组。

MapReduce 最初只能够对键排序, 但是也有技术利用可以利用Hadoop 的特性来实现按值排序。想了解的话可以看这篇博客。

按照BigTable的概念,使用 MapReduce来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。

应用:ETL,数据分析

非基本 MapReduce 模式

迭代消息传递 (图处理)

问题陈述:

设一个实体网络,实体之间存在着关系。 需要按照与它比邻的其他实体的属性计算出一个状态。这个状态可以表现为它和其它之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。

解决方案:

网络存储为系列的结合,每个包含有其所有邻接点ID的列表。按照这个概念,MapReduce 迭代进行,每次迭代中每个都发消息给它的邻接点。邻接点根据接收到的信息更新自己的状态。当满足了某些条件的时候迭代停止,如达到了迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每个邻接点的ID为键发出信息,所有的信息都会按照接受分组,reducer 就能够重算各的状态然后更新那些状态改变了的。下面展示了这个算法:

1 class Mapper

2 mod Map(id n, object N)

3 Emit(id n, object N)

4 for all id m in N.OutgoingRelations do

5 Emit(id m, message getMessage(N))

67 class Reducer

8 mod Reduce(id m, [s1, s2,...])

9 M = null

10 messages = []

11 for all s in [s1, s2,...] do

12 if IsObject(s) then

13 M = s

14 else // s is a message

15 messages.add(s)

16 M.State = calculateState(messages)

17 Emit(id m, M)

一个的状态可以迅速的沿着网络传全网,那些被感染了的又去感染它们的邻居,整个过程就像下面的图示一样:

案例研究: 沿分类树的有效性传递

问题陈述:

这个问题来自于真实的电子商务应用。将各种货物分类,这些类别可以组成一个树形结构,比较大的分类(像男人、女人、儿童)可以再分出小分类(像男裤或女装),直到不能再分为止(像男式蓝色牛仔裤)。这些不能再分的基层类别可以是有效(这个类别包含有货品)或者已无效的(没有属于这个分类的货品)。如果一个分类至少含有一个有效的子分类那么认为这个分类也是有效的。我们需要在已知一些基层分类有效的情况下找出分类树上所有有效的分类。

解决方案:

这个问题可以用上一节提到的框架来解决。我们咋下面定义了名为 getMessage和 calculateState 的方法:

1 class N

2 State in {True = 2, False = 1, null = 0},

3 initialized 1 or 2 for end-of-line categories, 0 otherwise

4 mod getMessage(object N)

5 return N.State

6 mod calculateState(state s, data [d1, d2,...])

7 return max( [d1, d2,...] )

案例研究:广度优先搜索

问题陈述:需要计算出一个图结构中某一个到其它所有的距离。

解决方案: Source源给所有邻接点发出值为0的信号,邻接点把收到的信号再转发给自己的邻接点,每转发一次就对信号值加1:

1 class N

2 State is distance,

3 initialized 0 for source node, INFINITY for all other nodes

4 mod getMessage(N)

5 return N.State + 1

6 mod calculateState(state s, data [d1, d2,...])

7 min( [d1, d2,...] )

案例研究:网页排名和 Mapper 端数据聚合

这个算法由Google提出,使用权威的PageRank算法,通过连接到一个网页的其他网页来计算网页的相关性。真实算法是相当复杂的,但是核心思想是权重可以传播,也即通过一个的各联接的权重的均值来计算自身的权重。

1 class N

2 State is PageRank

3 mod getMessage(object N)

4 return N.State / N.OutgoingRelations.size()

5 mod calculateState(state s, data [d1, d2,...])

6 return ( sum([d1, d2,...]) )

要指出的是上面用一个数值来作为评分实际上是一种简化,在实际情况下,我们需要在Mapper端来进行聚合计算得出这个值。下面的代码片段展示了这个改变后的逻辑 (针对于 PageRank 算法):

1 class Mapper

2 mod Initialize

3 H = new AssociativeArray

4 mod Map(id n, object N)

5 p = N.PageRank / N.OutgoingRelations.size()

6 Emit(id n, object N)

7 for all id m in N.OutgoingRelations do

8 H{m} = H{m} + p

9 mod Close

10 for all id n in H do

11 Emit(id n, value H{n})

12

13 class Reducer

14 mod Reduce(id m, [s1, s2,...])

15 M = null

16 p = 0

17 for all s in [s1, s2,...] do

18 if IsObject(s) then

19 M = s

20 else

21 p = p + s

22 M.PageRank = p

23 Emit(id m, M)

应用:图分析,网页索引

值去重 (对项计数)

问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不同的F值的数目 (相当于按照 G分组).

这个问题可以推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)

Record 1: F=1, G={a, b}

Record 2: F=2, G={a, d, e}

Record 3: F=1, G={b}

Record 4: F=3, G={a, b}

Result:

a -> 3 // F=1, F=2, F=3

b -> 2 // F=1, F=3

d -> 1 // F=2

e -> 1 // F=2

解决方案 I:

种方法是分两个阶段来解决这个问题。阶段在Mapper中使用F和G组成一个复合值对,然后在Reducer中输出每个值对,目的是为了保证F值的性。在第二阶段,再将值对按照G值来分组计算每组中的条目数。

阶段:

1 class Mapper

2 mod Map(null, record [value f, categories [g1, g2,...]])

3 for all category g in [g1, g2,...]

4 Emit(record [g, f], count 1)

56 class Reducer

7 mod Reduce(record [g, f], counts [n1, n2, ...])

8 Emit(record [g, f], null )

第二阶段:

1 class Mapper

2 mod Map(record [f, g], null)

3 Emit(value g, count 1)

45 class Reducer

6 mod Reduce(value g, counts [n1, n2,...])

7 Emit(value g, sum( [n1, n2,...] ) )

解决方案 II:

第二种方法只需要一次MapReduce 即可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Reducer里为每个值对应的分类去重然后给每个所属的分类计数加1,再在Reducer结束后将所有计数加和。这种方法适用于只有有限个分类,而且拥有相同F值的记录不是很多的情况。例如网络日志处理和用户分类,用户的总数很多,但是每个用户的是有限的,以此分类得到的类别也是有限的。值得一提的是在这种模式下可以在数据传输到Reducer之前使用Combiner来去除分类的重复值。

1 class Mapper

2 mod Map(null, record [value f, categories [g1, g2,...] )

3 for all category g in [g1, g2,...]

4 Emit(value f, category g)

56 class Reducer

7 mod Initialize

8 H = new AssociativeArray : category -> count

9 mod Reduce(value f, categories [g1, g2,...])

10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )

11 for all category g in [g1', g2',...]

12 H{g} = H{g} + 1

13 mod Close

14 for all category g in H do

15 Emit(category g, count H{g})

应用:日志分析,用户计数

互相关

问题陈述:有多个各由若干项构成的组,计算项两两共同出现于一个组中的次数。如项数是N,那么应该计算NN。

这种情况常见于文本分析(条目是单词而元组是句子),市场分析(购买了此物的客户还可能购买什么)。如果NN小到可以容纳于一台机器的内存,实现起来就比较简单了。

配对法

种方法是在Mapper中给所有条目配对,然后在Reducer中将同一条目对的计数加和。但这种做法也有缺点:

使用 combiners 带来的的好处有限,因为很可能所有项对都是的

不能有效利用内存

1 class Mapper

2 mod Map(null, s [i1, i2,...] )

3 for all i in [i1, i2,...]

4 for all j in [i1, i2,...]

5 Emit(pair [i j], count 1)

67 class Reducer

8 mod Reduce(pair [i j], counts [c1, c2,...])

9 s = sum([c1, c2,...])

10 Emit(pair[i j], count s)

Stripes Approach(条方法?不知道这个名字怎么理解)

第二种方法是将数据按照pair中的项来分组,并维护一个关联数组,数组中存储的是所有关联项的计数。The second approach is to group data by the first in pair and maintain an associative array (“stripe”) where counters for all adjacent s are accumulated. Reducer receives all stripes for leading i, merges them, and emits the same result as in the Pairs approach.

中间结果的键数量相对较少,因此减少了排序消耗。

可以有效利用 combiners。

可在内存中执行,不过如果没有正确执行的话也会带来问题。

实现起来比较复杂。

一般来说, “stripes” 比 “pairs” 更快

1 class Mapper

2 mod Map(null, s [i1, i2,...] )

3 for all i in [i1, i2,...]

4 H = new AssociativeArray : -> counter

5 for all j in [i1, i2,...]

6 H{j} = H{j} + 1

7 Emit( i, stripe H)

89 class Reducer

10 mod Reduce( i, stripes [H1, H2,...])

11 H = new AssociativeArray : -> counter

12 H = merge-sum( [H1, H2,...] )

13 for all j in H.keys()

14 Emit(pair [i j], H{j})

应用:文本分析,市场分析

参考资料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce

用MapReduce 表达关系模式

在这部分我们会讨论一下怎么使用MapReduce来进行主要的关系作。

筛选(Selection)

1 class Mapper

2 mod Map(rowkey key, tuple t)

3 if t satisfies the predicate

4 Emit(tuple t, null)

投影(Projection)

投影只比筛选稍微复杂一点,在这种情况下我们可以用Reducer来消除可能的重复值。

1 class Mapper

2 mod Map(rowkey key, tuple t)

3 tuple g = project(t) // extract required fields to tuple g

4 Emit(tuple g, null)

56 class Reducer

MapReduce之金庸的江湖人物分析项目

通过一个综合数据分析案例:”金庸的江湖——金庸武侠中的人物关系挖掘“,来学习和掌握MapReduce程序设计。通过本项目的学习,可以体会如何使用MapReduce完成一个综合性的数据挖掘任务,包括全流程的数据预处理、数据分析、数据后处理等。

1 任务1 数据预处理

1.1 任务描述

从原始的金庸文本中,抽取出与人物互动相关的数据,而屏蔽掉与人物关系无关的文本内容,为后面的基于人物共现的分析做准备。

1.2 关键问题

1.2.1 中文分词和人名提取

使用开源的Ansj_seg进行分词。Ansj_seg不仅支持中文分词,还允许用户自定义词典,在分词前,将人名列表到添加用户自定义的词典,可以识别金庸武侠中的人名。

但实际测试的时候发现,Ansj_seg分词会出现的歧义问题,比如“汉子”属于人名列表中的人名(nr),但Ansj_seg可能会错误地将它分类为名词(n)。因此,如果根据词性提取人名,会导致提取的人名太少。解决方法是在提取人名的时候,需要在将人名加入用户自定义词典的同时,构造一个包含所有人名的字典,对分词的结果逐个进行测试,如果在字典里,就是人名。

1.2.2 文件传输

使用HDFS传递数据。考虑到人名列表文件已经存放在了HDFS里,所以使用HDFS的方式不需要移动人名列表文件,只需要在Configuration中设置文件在HDFS文件系统中的路径,然后在Mapper的setup()函数里调用HDFS的函数获取文件内容即可。

1.2.3 单词同现算法

两个单词近邻关系的定义:实验要求中已经说明,同现关系为一个段落。

段落划分:非常庆幸的是,原文中一个段落就是一行,因此,不需要自己定义FileInputFormat和RecordReader。

1.3 MapReduce设计

1.3.1 Mapper

1.3.2 Reducer

1.3.3 Driver

2 任务2 特征抽取:人物同现统计

2.1 任务描述

完成基于单词同现算法的人物同现统计。在人物同现分析中,如果两个人在原文的同一段落中出现,则认为两个人发生了一次同现关系。我们需要对人物之间的同现关系次数进行统计,同现关系次数越多,则说明两人的关系越密切。

2.2 关键问题

2.2.1 人名冗余

在同一段中,人名可能多次出现,任务一只负责提取出所有的人名,没有剔除多余的人名,任务必须在输出同现次数之前处理冗余人名。我的做法是在Mapper中创建一个,把所有人名放入中,会自动剔除冗余的人名。

2.2.2 同现次数统计

两个人物之间应该输出两个键值对,如“狄云”和“戚芳”,应该输出“<狄云,戚芳> 1”和“<戚芳,狄云> 1”。多个段落中允许输出相同的键值对,因此,Reducer中需要整合具有相同键的输出,输出总的同现次数。

2.3 MapReduce设计

2.3.1 Mapper

2.3.2 Reducer

3 任务3 特征处理:人物关系图构建与特征归一化

3.1 任务描述

根据任务2人物之间的共现关系,生物之间的关系图。人物关系使用邻接表的形式表示,人物是顶点,人物之间关系是边,两个人的关系的密切程度由共现次数体现,共现次数越高,边权重越高。另外需要对共现次数进行归一化处理,确保某个顶点的出边权重和为1。

3.2 关键问题

3.2.1 确保人物的所有邻居输出到相同结点处理

在Mapper结点将输入的键值对“<狄云,戚芳> 1”拆分,输出新的键值对“<狄云> 戚芳:1”,“狄云”的所有邻居会被分配给同一个Reducer结点处理。

3.2.2 归一化

在Reducer结点首先统计该人物与所有邻居同现的次数和sum,每个邻居的的同现次数除以sum就得到共现概率。为了提高效率,在次遍历邻居的时候,可以把名字和共现次数保存在链表里,避免重复处理字符串。

3.3 MapReduce设计

3.3.1 Mapper

3.3.2 Reducer

4.1 任务描述

经过数据预处理并获得任务的关系图之后,就可以对人物关系图作数据分析,其中一个典型的分析任务是:PageRank 值计算。通过计算 PageRank,我们就可以定量地获知金庸武侠江湖中的“主角”们是哪些。

4.2 PageRank原理

PageRank算法由Google的两位创始人佩奇和布林在研究网页排序问题时提出,其核心思想是:如果一个网页被很多其它网页链接到,说明这个网页很重要,它的PageRank值也会相应较高;如果一个PageRank值很高的网页链接到另外某个网页,那么那个网页的PageRank值也会相应地提高。

相应地,PageRank算法应用到人物关系图上可以这么理解:如果一个人物与多个人物存在关系连接,说明这个人物是重要的,其PageRank值响应也会较高;如果一个PageRank值很高的人物与另外一个人物之间有关系连接,那么那个人物的PageRank值也会相应地提高。一个人物的PageRank值越高,他就越可能是中的主角。

PageRank有两个比较常用的模型:简单模型和随机浏览模型。由于本次设计考虑的是人物关系而不是网页跳转,因此简单模型比较合适。简单模型的计算公式如下,其中Bi为所有连接到人物i的,Lj为认为人物j对外连接边的总数:

在本次设计的任务3中,已经对每个人物的边权值进行归一化处理,边的权值可以看做是对应连接的人物占总边数的比例。设表示人物i在人物j所有边中所占的权重,则PageRank计算公式可以改写为:

4.3.2 PageRanklter类

GraphBuilder将数据处理成可供迭代的格式,PageRank的迭代过程由PageRanklter类实现,包含一个Map和Reduce过程。Map过程产生两种类型的:<人物名,PageRrank值>,<人物名,关系链表>。个人物名是关系链表中的各个链出人物名,其PR值由计算得到;第二个人物名是本身人物名,目的是为了保存该人物的链出关系,以保证完成迭代过程。以上面的输出为例,则Map过程产生的键值对为<完颜萍, 1.0 0.005037>,<小龙女, 1.0 0.017632>,……,<一灯, #完颜萍:0.005037783;……>。

Reduce过程将同一人物名的汇聚在一起,如果value是PR值,则累加到sum变量;如果value是关系链表则保存为List。遍历完迭代器里所有的元素后输出键值对<人物名,sum#List>,这样就完成了一次迭代过程。

PR值排名不变的比例随迭代次数变化的关系图如下,由于我们考虑的是找出中的主角,所以只要关心PR值前100名的人物的排名的变化情况,可以看到迭代次数在10以后,PR值排名不变的比例已经趋于稳定了,所以基于效率考虑,选取10作为PR的迭代次数。

4.3.3 PageRankViewer类

当所有迭代都完成后,我们就可以对所有人物的PageRank值进行排序,该过程由PageRankViewer类完成,包含一个Map和Reduce过程。Map过程只提取迭代过程输出结果中的人物名以及对应的PageRank值,并以PageRank值作为key,人物名作为value输出。为了实现PageRank值从大到小排序,需要实现DescFloatComparator类来重写compare方法以达成逆序排序。由于可能存在PageRank值相同的情况,所以还需要一个reduce过程来把因PageRank值相同而汇聚到一起的人物名拆开并输出。

PageRankMapper

PageRankReducer

Driver类

5.1 任务描述

标签传播(Label Propagation)是一种半监督的图分析算法,他能为图上的顶点打标签,进行图顶点的聚类分析,从而在一张类似社交网络图中完成社区发现。在人物关系图中,通过标签传播算法可以将关联度比较大的人物分到同一标签,可以直观地分析人物间的关系。

5.2 标签传播算法原理

标签传播算法(Label Propagation Algorithm,后面简称LPA)是由Zhu等人于2002年提出,它是一种基于图的半监督学习方法,其基本思路是用已标记的标签信息去预测未标记的标签信息。LPA基本过程为:(1)每个结点初始化一个特定的标签值;(2)逐轮更新所有的标签,直到所有的标签不再发生变化为止。对于每一轮刷新,标签的刷新规则如下:对于某一个,考察其所有邻居的标签,并进行统计,将出现个数最多的那个标签赋值给当前。当个数最多的标签不时,随机选择一个标签赋值给当前。

LPA与PageRank算法相似,同样需要通过迭代过程来完成。在标签传播算法中,的标签更新通常有同步更新和异步更新两种方法。同步更新是指,x在t时刻的更新是基于邻接在t-1时刻的标签。异步更新是指,x在t时刻更新时,其部分邻接是t时刻更新的标签,还有部分的邻接是t-1时刻更新的标签。若LPA算法在标签传播过程中采用的是同步更新,则在二分结构网络中,容易出现标签震荡的现象。在本次设计中,我们考虑到了两种更新方法,并进行了比较。

5.3 标签传播算法在mapreduce上的实现细节

5.3.1 LPAInit类

为实现LPA的迭代过程,需要先给每个人物赋予一个独特标签,标签初始化由LPAInit类完成,仅包含一个Map过程。标签由数字表示,Map过程由1开始,为每一个人物名赋予一个独特的标签。为了便于后面的可视化分析,我们需要把PageRank值和标签整合在一起,所以LPAInit的输入文件直接采用PageRank过程的输出文件,格式如下:

5.3.2 LPAIteration类

LPAIteration类完成标签的更新过程,其格式与LPAInit的输出格式一致,包含一个Map和Reduce过程。Map过程对输入的每一行进行切割,输出四种格式的:<人物名,关系链表>,<人物名,PageRank值>,<人物名,标签>,<链出人物名,标签#起点人物名>。第四种格式个键值对是为了将该的标签传给其所有邻居。

Reduce过程对value值进行识别,识别可以通过Map过程把预先定义好的特殊字符如‘#’、‘@’来实现前缀到value上来实现。由于人物关系图中的各个边都是有权重的,并且代表两个人物的相关程度,所以标签更新过程不是用边数最多的标签而是权重标签来更新,我们可以预先把权重的若干个保存到一个链表中,如果存在多个权重相同的标签,则随机选取一个作为该人名新的标签。异步方法更新标签需要使用一个哈希表来存储已经更新标签的人物名和它们的新标签,并且在更新标签时使用该哈希表里面的标签。同步方法更新标签则不需要存储已更新的标签。

本次设计中比较了同步和异步更新两种方法,下图为标签不变的比例随迭代次数的变化。可以发现,异步收敛速度更快,只要6次迭代即可完全收敛,且标签不变的比例可达。而同步更新方法则不能达到,说明人物关系图中存在子图是二部子图。

5.3.3 LPAReorganize类

LPA算法迭代收敛后,所有人物名的标签不再变化,但是此时的标签排列是散乱的,需要把同一标签的人物名整合在一起。该过程由LPAReorganize类完成,包含一个Map和Reduce过程。Map过程对输入的每一行进行切割,以<标签,人物名#PageRank值#关系链表>格式输出。Reduce过程中,同一标签的人物名汇聚在一起,然后根据每个标签人物的大小从大到小排序,重新赋予标签(从1开始)。这样输出文件中同一标签的人物名就会聚集在一起。的输出格式如下:

5.3.2 LPAMapper类

LPAIteration类完成标签的更新过程,其格式与LPAInit的输出格式一致,包含一个Map和Reduce过程。Map过程对输入的每一行进行切割,输出四种格式的:<人物名,关系链表>,<人物名,PageRank值>,<人物名,标签>,<链出人物名,标签#起点人物名>。第四种格式个键值对是为了将该的标签传给其所有邻居。

5.3.2 LPAReducer类

Reduce过程对value值进行识别,识别可以通过Map过程把预先定义好的特殊字符如‘#’、‘@’来实现前缀到value上来实现。由于人物关系图中的各个边都是有权重的,并且代表两个人物的相关程度,所以标签更新过程不是用边数最多的标签而是权重标签来更新,我们可以预先把权重的若干个保存到一个链表中,如果存在多个权重相同的标签,则随机选取一个作为该人名新的标签。异步方法更新标签需要使用一个哈希表来存储已经更新标签的人物名和它们的新标签,并且在更新标签时使用该哈希表里面的标签。同步方法更新标签则不需要存储已更新的标签。

Driver类

6.1 可视化工具Gephi

Gephi是一款开源的跨平台的基于JVM的复杂网络分析软件。把PageRank和LPA的结果,转化为gexf格式,在Gephi中绘制图像并分析大数据实验结果,更加直观、易于理解。

gexf实际上是一种特殊的XML文件,python的gexf库提供了接口方便我们编辑和生成gexf文件,因此我们选择使用python处理PageRank和LPA的结果。顶点有两种属性,LPA生成的标签和PageRank计算的PR值,每条边的权重是PageRank计算出的值。在可视化的时候,标签决定顶点显示的颜色,PR值决定标签的

6.2 可视化预处理

编写一个python程序transform2xml.py,将数据分析部分得到的PR值,标签以及点连接关系处理成一个可供Gephi读取的gexf文件。

6.3 可视化结果

7 输出结果截图

7.2 同现次数统计

7.4 PageRank

发现公司里的大数据开发挣得很多,想转行,

您好:

大数据技术前景我们是毋庸置疑的,而对于学习更是争先恐后。在这些人中,不乏有已经在IT圈混迹好几年的程序员,自然也有初出茅庐的零基础小白。说实话,大数据不比编程学习,还是需要一定的基础的,时间起码需要半年左右。

想要成为一个的大数据人才并不容易,你不仅需要系统的学习理论知识,熟练掌握技能技巧,还需要具备一定的开发经验,而这些仅靠自学是远远不够的,比较好的方式就是参加专业学习。希望可以帮到你。

理工程师是职称,不需要考试,只要工作年限到了就可以评,大专要两年吧。 这两个都有用啊,是都弄上。

大数据的开发也是需要一定基础的,你可以试试。

建议就是如果决定要学Hadoop开发一定要坚持下去,不能半途而废。大数据人才现在确实很缺,我们公司就有2个空缺一直招不到人。八斗学院的课程设置还不错,公司的要求里提到的技术他们都讲到了,挺适合学习转型的。

如何快速地编写和运行一个属于自己的MapReduce例子程序

大数据的时代, 到处张嘴闭嘴都是Hadoop, MapReduce, 不跟上时代怎么行? 可是对一个hadoop的新手, 写一个属于自己的MapReduce程序还是小有点难度的, 需要建立一个men项目, 还要搞清楚各种库的依赖, 再加上编译运行, 基本上头大两圈了吧。 这也使得很多只是想简单了解一下MapReduce的人望而却步。

下一篇:明日方舟芙蓉_明日方舟芙蓉皮肤
上一篇:
相关文章
返回顶部小火箭