MySQL Application Architecture Optimization - Real-Time Data Processing (2)
22.4.7. Initializing Storm
We deploy Storm on three machines: 10.10.10.21 (storm_1), 10.10.10.22 (storm_2), and 10.10.10.23 (storm_3).
- Download Storm from the official website (apache-storm-0.9.6.zip).
- Extract it to /usr/local/; run the same commands on all three machines:
```
[root@storm_1 wordcount]# unzip apache-storm-0.9.6.zip
[root@storm_1 wordcount]# mv apache-storm-0.9.6 /usr/local/
```
- Set up the YAML configuration file:
```
[root@storm_2 wordcount]# cat /usr/local/apache-storm-0.9.6/conf/storm.yaml
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
# storm.zookeeper.servers:
#     - "server1"
#     - "server2"
#
# nimbus.host: "nimbus"
#
#
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"

## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"

storm.zookeeper.servers:
    - "storm_1"
    - "storm_2"
    - "storm_3"

nimbus.host: "storm_1"

storm.local.dir: "/u01/storm/status"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
```
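YAML is indentation-sensitive, and a malformed storm.yaml is a common reason the daemons fail to start. As a quick sanity check, you can load the file back in Python; a minimal sketch, assuming PyYAML is installed (pip install pyyaml):

```python
from __future__ import print_function
import yaml  # assumes PyYAML is installed

with open("/usr/local/apache-storm-0.9.6/conf/storm.yaml") as f:
    conf = yaml.safe_load(f)

# The settings every node in this cluster must agree on.
assert conf["storm.zookeeper.servers"] == ["storm_1", "storm_2", "storm_3"]
assert conf["nimbus.host"] == "storm_1"
assert conf["storm.local.dir"] == "/u01/storm/status"
# Four slot ports means up to four worker JVMs per supervisor.
print(conf["supervisor.slots.ports"])
```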
- Create the Storm runtime directory:
```
[root@storm_1 wordcount]# mkdir -p /u01/storm/status
```
- Start Storm:
```
# Node1: start the Storm UI
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm ui > /dev/null 2>&1 &
# Node1: start storm nimbus
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm nimbus > /dev/null 2>&1 &
# Node2: start supervisor
[root@storm_2 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &
# Node3: start supervisor
[root@storm_3 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &
# Run jps on each node to check service status
[root@storm_1 wordcount]# jps
2151 core
2097 QuorumPeerMain
3969 Jps
2191 nimbus
```
- Open the Storm UI in a web browser.
If you can see the UI page, our Storm deployment is complete.
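If you prefer a scripted check over eyeballing the page, the Storm UI in the 0.9.x line also exposes a REST API; a sketch, assuming the UI listens on the default port 8080 on storm_1:

```python
from __future__ import print_function
import json
import urllib2  # Python 2, like the rest of this walkthrough

# Default UI port 8080 is an assumption; adjust if ui.port was changed.
resp = urllib2.urlopen("http://storm_1:8080/api/v1/cluster/summary")
summary = json.loads(resp.read())
# Expect 2 supervisors (storm_2, storm_3) and 8 slots (4 ports x 2 nodes).
print("supervisors: %s, total slots: %s"
      % (summary["supervisors"], summary["slotsTotal"]))
```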
22.4.8. Building streamparse (a Python framework for Storm)
streamparse is a Python framework for Storm: it packages your Python code into a JAR that runs on Storm.
Official docs: http://streamparse.readthedocs.io/en/master/quickstart.html.
(Note: topology definitions have changed in streamparse 3 and later. Having communicated with the author, the change was made so that streamparse can run more independently, decoupled from the Storm environment.)
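For reference, under streamparse 3+ a topology is declared as a Python class rather than a Clojure file. A minimal sketch, reusing the WordSpout/WordCounter class names that appear later in this section; this walkthrough itself stays on the older, .clj-based layout:

```python
# Illustrative streamparse 3+ topology definition (not used in this
# walkthrough, which relies on topologies/wordcount.clj instead).
from streamparse import Topology

from bolts.wordcount import WordCounter
from spouts.words import WordSpout


class WordCount(Topology):
    word_spout = WordSpout.spec()
    count_bolt = WordCounter.spec(inputs=[word_spout], par=2)
```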
- Set up mutual trust among the three machines. Generate an SSH key pair on each of the three machines by running:
```
[root@storm_1 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
1e:20:62:da:f5:fb:69:32:da:ac:09:ef:7c:35:a5:01 root@storm_3
The key's randomart image is:
+--[ RSA 2048]----+
|                 |
|E                |
|o o . .          |
|+ o o . . .      |
|. . . S +        |
|o + .            |
|. . . . .        |
|+ ++ . . .       |
|. B + o + o      |
+-----------------+
```
After running the command above, an id_rsa.pub file is generated under ~/.ssh/ on each host. Copy the public keys of all three machines into a single authorized_keys file, and make its contents identical on all three machines:
```
# storm_1 node
[root@storm_1 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
# storm_2 node
[root@storm_2 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
# storm_3 node
[root@storm_3 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
# authorized_keys file contents on every node
[root@storm_1 ~]# cat ~/.ssh/authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
```
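To verify that the trust actually works before moving on, you can loop over the three hosts from any node; a sketch, assuming the keys above are in place:

```python
from __future__ import print_function
import subprocess

for host in ("storm_1", "storm_2", "storm_3"):
    # BatchMode=yes makes ssh fail instead of prompting for a password.
    rc = subprocess.call(["ssh", "-o", "BatchMode=yes", host, "hostname"])
    print("%s: %s" % (host, "OK" if rc == 0 else "FAILED"))
```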
- Create the SSH config file (run on all three machines):
```
[root@storm_1 wordcount]# touch /root/.ssh/config
```
- Download the lein script into /usr/local/bin and make it executable (run on all three machines):
```
[root@storm_1 wordcount]# wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein
[root@storm_1 wordcount]# mv lein /usr/local/bin/
[root@storm_1 wordcount]# chmod 755 /usr/local/bin/lein
```
- Install streamparse (run on all three machines):
```
[root@storm_1 wordcount]# pip install streamparse
```
- Create the storm_project directory and start a simple Storm project (on storm_2). Do not create it on the node where Storm's Nimbus runs, because running the Storm project there would cause port conflicts.
```
[root@storm_2 ~]# mkdir -p /u01/storm_project
[root@storm_2 ~]# cd /u01/storm_project/
[root@storm_2 storm_project]# pwd
/u01/storm_project
[root@storm_2 ~]# sparse quickstart wordcount
Creating your wordcount streamparse project...
    create  wordcount
    create  wordcount/.gitignore
    create  wordcount/config.json
    create  wordcount/fabfile.py
    create  wordcount/project.clj
    create  wordcount/README.md
    create  wordcount/src
    create  wordcount/src/bolts/
    create  wordcount/src/bolts/__init__.py
    create  wordcount/src/bolts/wordcount.py
    create  wordcount/src/spouts/
    create  wordcount/src/spouts/__init__.py
    create  wordcount/src/spouts/words.py
    create  wordcount/topologies
    create  wordcount/topologies/wordcount.py
    create  wordcount/virtualenvs
    create  wordcount/virtualenvs/wordcount.txt
Done.
Try running your topology locally with:
    cd wordcount
    sparse run
```
- Set up the JSON configuration file (on storm_2):
```
[root@storm_2 wordcount]# cat /u01/storm_project/wordcount/config.json
{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "root",
            "nimbus": "storm_1",
            "workers": ["storm_1", "storm_2", "storm_3"],
            "log": {
                "path": "/tmp/storm/stream/log",
                "file": "pystorm_{topolopy_name}_{component_name}_{task_id}_{pid}.log",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "use_ssh_for_nimbus": true,
            "virtualenv_root": "/tmp/storm/stream/virtualenvs"
        }
    }
}
```
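A quick way to catch JSON typos before submitting is to load the file back and check the fields that sparse submit relies on; a minimal sketch:

```python
from __future__ import print_function
import json

with open("/u01/storm_project/wordcount/config.json") as f:
    cfg = json.load(f)

prod = cfg["envs"]["prod"]
assert prod["nimbus"] == "storm_1"
assert set(prod["workers"]) == {"storm_1", "storm_2", "storm_3"}
# Both paths must exist on every worker; they are created in the next step.
print(prod["log"]["path"])
print(prod["virtualenv_root"])
```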
- Create the required directories (run on all three machines):
```
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/log
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/virtualenvs
```
- Submit the wordcount program to the Storm cluster (on storm_2):
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse submit
[storm_1] Executing task '_create_or_update_virtualenv'
[storm_2] Executing task '_create_or_update_virtualenv'
... omit ...
[storm_1] run: rm /tmp/streamparse_requirements-oD8qdm4We.txt
[storm_3] out:
[storm_3] run: rm /tmp/streamparse_requirements-5greXfqjW.txt
Cleaning from prior builds...
# Press Enter here
Creating topology Uber-JAR...
# Press Enter here
Uber-JAR created: /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar
Deploying "wordcount" topology...
ssh tunnel to Nimbus storm_1:6627 established.
Routing Python logging to /tmp/storm/stream/log.
Running lein command to submit topology to nimbus:
lein run -m streamparse.commands.submit_topology/-main topologies/wordcount.clj --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'topology.python.path="/tmp/storm/stream/virtualenvs/wordcount/bin/python"' --option 'streamparse.log.path="/tmp/storm/stream/log"' --option 'streamparse.log.max_bytes=1000000' --option 'streamparse.log.backup_count=10' --option 'streamparse.log.level="info"'
WARNING: You're currently running as root; probably by accident. Press control-C to abort or Enter to continue as root. Set LEIN_ROOT to disable this warning.
# Press Enter here
{:option {streamparse.log.level info, streamparse.log.backup_count 10, streamparse.log.max_bytes 1000000, streamparse.log.path /tmp/storm/stream/log, topology.python.path /tmp/storm/stream/virtualenvs/wordcount/bin/python, topology.acker.executors 2, topology.workers 2}, :debug false, :port 6627, :host localhost, :help false}
1604 [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
1620 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3853 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3854 [main] INFO  backtype.storm.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"streamparse.log.backup_count":10,"streamparse.log.path":"\/tmp\/storm\/stream\/log","topology.python.path":"\/tmp\/storm\/stream\/virtualenvs\/wordcount\/bin\/python","topology.debug":false,"nimbus.thrift.port":6627,"topology.max.spout.pending":5000,"nimbus.host":"localhost","topology.workers":2,"topology.acker.executors":2,"streamparse.log.max_bytes":1000000,"streamparse.log.level":"info","topology.message.timeout.secs":60}
4487 [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: wordcount
```
If the output looks like the above, the deployment is complete.
- Confirm that the wordcount program has been deployed to Storm; a scripted check is sketched below.
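Besides looking at the Storm UI page, the deployment can be confirmed through the UI's REST API; a sketch, assuming the UI from the deployment above listens on storm_1:8080:

```python
from __future__ import print_function
import json
import urllib2

resp = urllib2.urlopen("http://storm_1:8080/api/v1/topology/summary")
for t in json.loads(resp.read())["topologies"]:
    # A freshly submitted topology should show up as ACTIVE.
    print("%s %s workers=%s" % (t["name"], t["status"], t["workersTotal"]))
```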
- Stop the wordcount program running in Storm:
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse kill -n wordcount
WARNING: You're currently running as root; probably by accident. Press control-C to abort or Enter to continue as root. Set LEIN_ROOT to disable this warning.
5180 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
Killed topology: wordcount
```
The output above means the wordcount program has been stopped and removed from the Storm cluster.
22.4.9. Writing the streamparse code
Since this is a sample program, we modify the code inside the wordcount project created earlier. Only the spout and bolt code needs to change.
We also need the Python Kafka and Python MongoDB modules. Install them with the following commands:
```
# Install into the OS's own Python; this is what sparse run invokes
pip install pykafka
pip install pymongo
# Install into the streamparse Storm Python virtualenv (used by sparse submit)
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pykafka
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pymongo
```
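To confirm both interpreters can actually see the modules (the virtualenv exists once a first sparse submit has created it); a minimal sketch:

```python
from __future__ import print_function
import subprocess

interpreters = (
    "python",  # system Python, used by sparse run
    "/tmp/storm/stream/virtualenvs/wordcount/bin/python",  # used by sparse submit
)
for python in interpreters:
    rc = subprocess.call([python, "-c", "import pykafka, pymongo"])
    print("%s: %s" % (python, "OK" if rc == 0 else "missing modules"))
```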
- words.py (the spout)
words.py continuously consumes the messages produced by Kafka and emits each one to the next receiver (the bolt).
```
[root@storm_2 spouts]# pwd
/u01/storm_project/wordcount/src/spouts
[root@storm_2 spouts]# cat words.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import itertools

from streamparse.spout import Spout
from pykafka import KafkaClient
import simplejson as json

import sys
reload(sys)
sys.setdefaultencoding('utf-8')


class WordSpout(Spout):

    def initialize(self, stormconf, context):
        # self.words = itertools.cycle(['dog', 'cat',
        #                               'zebra', 'elephant'])
        client = KafkaClient(hosts="10.10.10.11:9092")
        topic = client.topics[b"test"]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="storm_1:2181,storm_2:2181,storm_3:2181"
        )

    def next_tuple(self):
        # word = next(self.words)
        # self.emit([word])
        message = self.balanced_consumer.consume()
        # Parse the Logstash JSON string into a dict
        log_info = json.loads(message.value)
        word = log_info["message"]
        with open("/tmp/storm.log", "a") as f:
            f.write(word)
        self.emit([word])
```
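Before wiring the spout into Storm, it can help to exercise the same consumption path as a standalone script. A sketch under the same broker/ZooKeeper addresses; the test_check consumer group is hypothetical, chosen so the spout's committed offsets stay untouched:

```python
from pykafka import KafkaClient
import simplejson as json

client = KafkaClient(hosts="10.10.10.11:9092")
# A separate consumer group so the spout's committed offsets are untouched.
consumer = client.topics[b"test"].get_balanced_consumer(
    consumer_group=b"test_check",
    auto_commit_enable=True,
    zookeeper_connect="storm_1:2181,storm_2:2181,storm_3:2181")

message = consumer.consume()  # blocks until a message arrives
print(json.loads(message.value)["message"])
```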
- wordcount.py (the bolt)
wordcount.py receives the messages (JSON strings) emitted by words.py, parses each into a Python dict, and stores the aggregated order statistics in MongoDB (10.10.10.12).
```
[root@storm_2 bolts]# pwd
/u01/storm_project/wordcount/src/bolts
[root@storm_2 bolts]# cat wordcount.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from collections import Counter

from streamparse.bolt import Bolt
import simplejson as json
from pymongo import MongoClient

import sys
reload(sys)
sys.setdefaultencoding('utf-8')


class WordCounter(Bolt):

    def initialize(self, conf, ctx):
        # self.counts = Counter()
        client = MongoClient(
            b"10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
            replicaset="rs_12")
        # Get the database that holds the order_stat collection
        self.db = client.shop

    def process(self, tup):
        # Get the string passed in from the spout
        word = tup.values[0]
        # self.counts[word] += 1
        # self.emit([word, self.counts[word]])
        # self.log('%s: %d' % (word, self.counts[word]))

        # Parse the string from the spout into a dict
        order_info = json.loads(word)
        # Look up the user's statistics by the user_name passed in via Kafka
        condition = {"user_name": order_info["user_name"]}
        order_stat_info = self.db.order_stat.find_one(condition)

        ## Insert if order_stat_info does not exist, otherwise update it
        # 1. No existing document
        if not order_stat_info:
            order_stat_info_new = {
                "user_name": order_info.get("user_name", "Unknow"),
                "order_num": 1,
                "total_price": order_info.get("price", 0.00),
                "min_order_price": order_info.get("price", 0.00),
                "max_order_price": order_info.get("price", 0.00),
                "min_order": order_info.get("order_id", 0),
                "max_order": order_info.get("order_id", 0),
            }
            self.db.order_stat.insert_one(order_stat_info_new)
        # 2. Existing document
        else:
            min_order_price = min(order_stat_info["min_order_price"],
                                  order_info.get("price", 0.00))
            max_order_price = max(order_stat_info["max_order_price"],
                                  order_info.get("price", 0.00))
            min_order = order_stat_info["min_order"]
            max_order = order_stat_info["max_order"]
            # Record the order id of the cheapest order
            if min_order_price == order_info.get("price", 0.00):
                min_order = order_info.get("order_id", min_order)
            # Record the order id of the most expensive order
            if max_order_price == order_info.get("price", 0.00):
                max_order = order_info.get("order_id", max_order)
            # Build the updated statistics
            order_stat_info_new = {
                "order_num": order_stat_info["order_num"] + 1,
                "total_price": order_stat_info["total_price"] + order_info.get("price", 0.00),
                "min_order_price": min_order_price,
                "max_order_price": max_order_price,
                "min_order": min_order,
                "max_order": max_order
            }
            # Update the document
            self.db.order_stat.update_one(
                {"_id": order_stat_info["_id"]},
                {"$set": order_stat_info_new})
```
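The insert-or-update rule above is easy to get subtly wrong, so here is the same aggregation extracted as a pure function that can be tested without MongoDB; a sketch that mirrors the two branches of process():

```python
# Pure-Python mirror of the bolt's aggregation rule, for offline testing.
def update_stat(stat, order):
    price = order.get("price", 0.00)
    oid = order.get("order_id", 0)
    if stat is None:
        # First order for this user: all stats start from this order.
        return {"user_name": order.get("user_name", "Unknow"),
                "order_num": 1, "total_price": price,
                "min_order_price": price, "max_order_price": price,
                "min_order": oid, "max_order": oid}
    stat = dict(stat)
    stat["order_num"] += 1
    stat["total_price"] += price
    if price <= stat["min_order_price"]:
        stat["min_order_price"], stat["min_order"] = price, oid
    if price >= stat["max_order_price"]:
        stat["max_order_price"], stat["max_order"] = price, oid
    return stat

stat = None
for price, order_id in [(20, 1), (120, 2), (1120, 3), (11120, 4)]:
    stat = update_stat(stat, {"user_name": "Bob",
                              "price": price, "order_id": order_id})
print(stat)  # order_num 4, total_price 12380, min 20 (#1), max 11120 (#4)
```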
Once the code above is written, we need to test how it runs.
- Run streamparse to test
Since we don't yet know whether the code is correct, debug it with sparse run instead of submitting it straight to the Storm environment with sparse submit.
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse run
... Omit ...
8653 [Thread-15-count-bolt] INFO  backtype.storm.task.ShellBolt - Launched subprocess with pid 3719
8703 [Thread-16-word-spout] INFO  backtype.storm.spout.ShellSpout - Launched subprocess with pid 3717
8706 [Thread-13-count-bolt] INFO  backtype.storm.task.ShellBolt - Start checking heartbeat...
8706 [Thread-13-count-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt count-bolt:(3)
8708 [Thread-15-count-bolt] INFO  backtype.storm.task.ShellBolt - Start checking heartbeat...
8708 [Thread-15-count-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt count-bolt:(4)
8708 [Thread-16-word-spout] INFO  backtype.storm.daemon.executor - Opened spout word-spout:(5)
8715 [Thread-16-word-spout] INFO  backtype.storm.daemon.executor - Activating spout word-spout:(5)
8715 [Thread-16-word-spout] INFO  backtype.storm.spout.ShellSpout - Start checking heartbeat...
```
- Write some order records into the file monitored by Logstash (10.10.10.11):
```
echo '{"order_id":1, "price":20, "user_name":"Bob", "goods_name":"good_name2"}' > /tmp/orders.log
echo '{"order_id":2, "price":120, "user_name":"Bob", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1120, "user_name":"Bob", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11120, "user_name":"Bob", "goods_name":"good_name3"}' >> /tmp/orders.log
echo '{"order_id":1, "price":10, "user_name":"Tom", "goods_name":"good_name2"}' >> /tmp/orders.log
echo '{"order_id":2, "price":110, "user_name":"Tom", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1110, "user_name":"Tom", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11110, "user_name":"Tom", "goods_name":"good_name3"}' >> /tmp/orders.log
```
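Equivalently, the test orders can be generated from Python; a sketch that appends one JSON document per line to the file Logstash tails (the goods_name values here are arbitrary):

```python
import json

orders = [(1, 20, "Bob"), (2, 120, "Bob"), (3, 1120, "Bob"), (4, 11120, "Bob"),
          (1, 10, "Tom"), (2, 110, "Tom"), (3, 1110, "Tom"), (4, 11110, "Tom")]

with open("/tmp/orders.log", "a") as f:
    for order_id, price, user in orders:
        # One JSON document per line, matching the Logstash input format.
        f.write(json.dumps({"order_id": order_id, "price": price,
                            "user_name": user,
                            "goods_name": "good_name%d" % order_id}) + "\n")
```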
- Check the order statistics in MongoDB (10.10.10.12):
```
[root@normal_12 ~]# /u01/mongodb_27018/client_mongodb.sh
MongoDB shell version: 3.2.5
connecting to: 10.10.10.12:27018/test
(test) 01:01:10>
(test) 01:01:11> use shop
switched to db shop
(shop) 01:01:16>
(shop) 01:22:32> db.order_stat.find()
{ "_id" : ObjectId("5734bba0172d290f86e2d2e4"), "total_price" : 12380, "min_order_price" : 20, "min_order" : 1, "order_num" : 4, "max_order_price" : 11120, "user_name" : "Bob", "max_order" : 4 }
{ "_id" : ObjectId("5734bbf1172d290f844d2fdc"), "total_price" : 12230, "min_order_price" : 10, "min_order" : 1, "order_num" : 3, "max_order_price" : 11110, "user_name" : "Tom", "max_order" : 4 }
```
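The same check can be scripted with pymongo, reusing the bolt's connection parameters; a minimal sketch:

```python
from pymongo import MongoClient

# Same replica-set connection the bolt uses.
client = MongoClient("10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
                     replicaset="rs_12")
for doc in client.shop.order_stat.find():
    print(doc)
```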
- Finally, just submit our project to Storm:
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse submit
```
At this point we have completed the Storm environment setup and development entirely with Python.
22.5. Summary
In practice, few systems are purely OLTP or purely OLAP; most are a combination of the two. When OLAP workloads creep into an OLTP system, relying on database queries alone can make the OLTP system slow, because the queries become large and complex. Situations like this need to be solved at the architecture level. Today, both Storm and Spark are used for real-time computation, so if you run into scenarios like the ones above, consider giving your system this "new outfit".