1.1. Introduction
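We compute the number of requests per CDN IP in four steps:
Mapper: parse each log line into a pair of the form key=cdn_ip, value=1
Shuffle: after the shuffle, each key (in sorted key order) comes with an iterator over its values
Result, e.g.: cdn_ip [1, 1, 1 ... 1, 1]
Reduce 1: here we sum the values to get the number of requests for each cdn_ip
Output, e.g.: None [sum([1, 1, 1 ... 1, 1]), key] (the key is None so that every pair reaches the same reducer in the next step)
Reduce 2: order the pairs by sum([1, 1, 1 ... 1, 1]) and output the TOP 100
Output, e.g.: 31943 "140.205.127.2"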
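The four steps can be traced without Hadoop or mrjob. The following is a minimal sketch in plain Python, not the job itself: the helper names (`shuffle`, `top_cdn_ips`) are made up for illustration, and the input is assumed to be a list of already extracted cdn_ip strings, since log parsing is handled elsewhere.

```python
from collections import defaultdict
import heapq


def mapper(cdn_ip):
    """Mapper: emit (cdn_ip, 1) for one request."""
    yield cdn_ip, 1


def shuffle(pairs):
    """Shuffle: group the values by key and visit the keys in sorted order."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    for key in sorted(groups):
        yield key, groups[key]


def reducer_sum(key, values):
    """Reduce 1: total requests per cdn_ip, re-keyed under None."""
    yield None, [sum(values), key]


def reducer_top(_, values, n=100):
    """Reduce 2: the n pairs with the highest counts, in descending order."""
    for cnt, ip in heapq.nlargest(n, values):
        yield cnt, ip


def top_cdn_ips(cdn_ips, n=100):
    """Run mapper -> shuffle -> reduce 1 -> shuffle -> reduce 2 in memory."""
    mapped = [pair for ip in cdn_ips for pair in mapper(ip)]
    summed = [out for key, values in shuffle(mapped)
              for out in reducer_sum(key, values)]
    results = []
    # Every pair now has the key None, so this shuffle yields a single group.
    for key, values in shuffle(summed):
        results.extend(reducer_top(key, values, n))
    return results
```

Calling `top_cdn_ips(["a", "b", "a", "c", "a", "b"], n=2)` returns `[(3, "a"), (2, "b")]`.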
1.2. Code
cat mr_uv_cdn_ip.py
# -*- coding: utf-8 -*-

from mrjob.job import MRJob
from mrjob.step import MRStep
from ng_line_parser import NgLineParser

import heapq


class MRUVCdnIp(MRJob):

    ng_line_parser = NgLineParser()

    def mapper(self, _, line):
        # Parse one log line, then emit (cdn_ip, 1)
        self.ng_line_parser.parse(line)
        yield self.ng_line_parser.cdn_ip, 1

    def reducer_sum(self, key, values):
        """Sum the requests for each cdn_ip"""
        # Key None sends every [count, cdn_ip] pair to one reducer
        yield None, [sum(values), key]

    def reducer_top100(self, _, values):
        """Top 100 by request count, descending"""
        for cnt, ip in heapq.nlargest(100, values):
            yield cnt, ip

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer_sum),
            MRStep(reducer=self.reducer_top100)
        ]


def main():
    MRUVCdnIp.run()


if __name__ == '__main__':
    main()
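The second reducer relies on how `heapq.nlargest` compares `[count, ip]` pairs. Here is a small sketch of that behaviour on its own; the sample addresses are invented for illustration, and `top_n` is a hypothetical helper rather than part of the job.

```python
import heapq


def top_n(pairs, n):
    """Return the n largest [count, ip] pairs, highest count first.

    Lists compare element by element, so the count decides the order and
    the ip string only breaks ties (as text, not as a numeric address).
    """
    return heapq.nlargest(n, pairs)


# Made-up sample data, not taken from the log analysed in this article.
sample = [[5, "10.0.0.2"], [9, "10.0.0.1"], [5, "10.0.0.3"], [1, "10.0.0.4"]]
top3 = top_n(sample, 3)
```

`heapq.nlargest` accepts any iterable, including a generator, so the pairs can be consumed one at a time while only the current top n are kept.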
Running the job and its output
python mr_uv_cdn_ip.py < www.ttmark.com.access.log
No configs found; falling back on auto-configuration
Creating temp directory /tmp/mr_uv_cdn_ip.root.20160924.153339.597058
Running step 1 of 2...
reading from STDIN
Running step 2 of 2...
Streaming final output from /tmp/mr_uv_cdn_ip.root.20160924.153339.597058/output...
31943	"140.205.127.2"
26306	"101.200.101.203"
24667	"101.200.101.214"
......
4065	"140.205.253.155"
4048	"140.205.253.174"
3972	"140.205.253.131"
Removing temp directory /tmp/mr_uv_cdn_ip.root.20160924.153339.597058...
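To post-process the result lines, each one can be read back into Python values. This is a sketch under the assumption that the job uses mrjob's default output protocol, which writes the key and the value as JSON separated by a tab; `parse_output_line` is a hypothetical helper name.

```python
import json


def parse_output_line(line):
    """Split one result line into (count, cdn_ip).

    Assumes a JSON-encoded key and value separated by a single tab.
    """
    key_text, value_text = line.rstrip("\n").split("\t", 1)
    return json.loads(key_text), json.loads(value_text)


# One line in the assumed format, using a value from the output above.
count, cdn_ip = parse_output_line('31943\t"140.205.127.2"\n')
```

If the job sets a different output protocol, the parsing would need to change accordingly.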
Author: HH
When reposting, please credit: 成长的对话 » CDN_IP请求数-MRJob-Python数据分析(12)