1.1. 前言
由于我们这边没有使用 M/R join的方法来实现 IP对应的区域,我们是使用M/R结合Pandas来实现。
我们计算每日PV经过这四个步骤:
Mapper: 将每行数据解析成 key=real_ip value=1 的形式
Shuffle: 通过Shuffle后的结果会生成以 key 的值排序的 value迭代器
结果如: real_ip [1, 1, 1 ... 1, 1]
Reduce 1: 在这边我们计算出 real_ip 的访问量
输出如: None [sum([1, 1, 1 ... 1, 1]), key]
Reduce 2:
- 初始化 area_ip pandas 数据
- 对sum([1, 1, 1 ... 1, 1]) 进行排序并输出 TOP 100
输出如: 31943 140.205.127.2 浙江省杭州市
1.2. 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
cat
mr_uv_real_ip_addr
.
py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from
mrjob
.
job
import
MRJob
from
mrjob
.
step
import
MRStep
from
mrjob
.
protocol
import
RawProtocol
from
ng_line_parser
import
NgLineParser
import
pandas
as
pd
import
heapq
import
socket
import
struct
import
sys
# Python 2 only: make UTF-8 the process-wide default codec so implicit
# str/unicode conversions of the non-ASCII address text do not raise.
# reload(sys) is needed because site.py deletes sys.setdefaultencoding at
# interpreter start-up.  Python 3 already defaults to UTF-8 and provides
# neither the reload() builtin nor sys.setdefaultencoding(), so the hack is
# skipped there instead of failing with NameError.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821 - builtin on Python 2 only
    sys.setdefaultencoding('utf-8')
class MRUVRrealIpAddr(MRJob):
    """Rank client IPs by hit count and tag the top 100 with their region.

    The job runs in two steps:

    1. ``mapper`` emits ``(real_ip, 1)`` per input line and ``reducer_sum``
       adds the ones up, re-keying every result under ``None`` so that all
       totals reach a single reducer call in step 2.
    2. ``reducer_top100`` keeps the 100 largest totals and appends the
       ``addr`` value found for each IP in the table loaded by
       ``init_ip_addr_df``.
    """

    OUTPUT_PROTOCOL = RawProtocol

    # One parser instance stored on the class and reused for every line.
    ng_line_parser = NgLineParser()

    def mapper(self, _, line):
        """Parse one input line and emit ``(real_ip, 1)``."""
        self.ng_line_parser.parse(line)
        yield self.ng_line_parser.real_ip, 1

    def reducer_sum(self, key, values):
        """Emit ``None, [total_as_str, ip]`` for one IP.

        ``key`` is the IP and ``values`` the ones emitted by ``mapper``.
        """
        yield None, [str(sum(values)), key]

    def init_ip_addr_df(self):
        """Load the IP-range table into ``self.ip_addr_df``.

        The file is read as tab-separated text with the column names given
        below and is indexed by ``id``.
        """
        cols = [
            'id',
            'ip_start_num',
            'ip_end_num',
            'ip_start',
            'ip_end',
            'addr',
            'operator',
        ]
        area_ip_path = '/root/script/nginx_log_parse/area_ip.csv'
        self.ip_addr_df = pd.read_csv(
            area_ip_path, sep='\t', names=cols, index_col='id')

    def _lookup_addr(self, ip):
        """Return the ``addr`` of the first range containing ``ip``.

        Returns ``None`` when ``ip`` is not a valid dotted-quad address or
        when no row satisfies ``ip_start_num <= ip <= ip_end_num``.
        """
        try:
            # inet_aton yields 4 bytes in network order; "!I" decodes them
            # directly, giving the same number as native "I" + ntohl().
            ip_num = struct.unpack("!I", socket.inet_aton(str(ip)))[0]
            table = self.ip_addr_df
            in_range = ((table.ip_start_num <= ip_num)
                        & (ip_num <= table.ip_end_num))
            matches = table[in_range]
            if matches.empty:
                return None
            return matches['addr'].iloc[0]
        except (socket.error, struct.error, TypeError, ValueError,
                KeyError, IndexError):
            # Best effort: an IP that cannot be resolved is reported
            # without a region rather than aborting the job.
            return None

    def reducer_top100(self, _, values):
        """Yield the 100 most frequent IPs, highest count first.

        ``values`` iterates over the ``[count_as_str, ip]`` pairs produced
        by ``reducer_sum``.  Each result is ``(count, "<ip> <addr>")``, or
        ``(count, ip)`` when no address could be determined.
        """
        top = heapq.nlargest(100, values, key=lambda pair: int(pair[0]))
        for cnt, ip in top:
            addr = self._lookup_addr(ip)
            # The yields sit outside any try block so that closing the
            # generator (GeneratorExit) is never intercepted.
            if addr is None:
                yield cnt, ip
            else:
                yield cnt, '{ip} {addr}'.format(ip=ip, addr=addr)

    def steps(self):
        """Return the two-step pipeline: count per IP, then top 100."""
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer_sum),
            MRStep(reducer_init=self.init_ip_addr_df,
                   reducer=self.reducer_top100),
        ]
def main():
    """Start the MRUVRrealIpAddr job through its ``run()`` entry point."""
    job_class = MRUVRrealIpAddr
    job_class.run()
# Start the job only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|
运行统计和输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
python
mr_uv_real_ip_addr
.
py
<
www
.
ttmark
.
com
.
access
.
log
No
configs
found
;
falling
back
on
auto
-
configuration
Creating
temp
directory
/
tmp
/
mr_uv_cdn_ip_addr
.
root
.
20160925.023837.331013
Running
step
1
of
2...
reading
from
STDIN
Running
step
2
of
2...
Streaming
final
output
from
/
tmp
/
mr_uv_cdn_ip_addr
.
root
.
20160925.023837.331013
/
output
.
.
.
31943
140.205.127.2
浙江省杭州市
26306
101.200.101.203
24667
101.200.101.214
.
.
.
.
.
.
4065
140.205.253.155
浙江省杭州市
4048
140.205.253.174
浙江省杭州市
3972
140.205.253.131
浙江省杭州市
Removing
temp
directory
/
tmp
/
mr_uv_cdn_ip_addr
.
root
.
20160925.023837.331013...
|
昵称: HH
QQ: 275258836
ttlsa群交流沟通(QQ群②: 6690706 QQ群③: 168085569 QQ群④: 415230207(新) 微信公众号: ttlsacom)
感觉本文内容不错,读后有收获?
逛逛衣服店,鼓励作者写出更好文章。
收 藏
转载请注明:成长的对话 » 真实IP对应区域-MRJob-Python数据分析(19)