Deploying the distributed computing framework Dpark on Ubuntu 14.04 (1)

For the tools used, see: https://codinglonglong.github.io/posts/fen-bu-shi-xi-tong-xiang-guan-ruan-jian-de-xia-zai-di-zhi/

1. Install Ubuntu Desktop 14.04.3 on each of the three machines and give each one a static IP; see https://codinglonglong.github.io/posts/ubuntu1404bu-shu-tornado%2Bnginx%2Bsupervisor/

2. Install ssh on each of the three machines

>> sudo apt-get install ssh openssh-server openssh-client

On pc3:

3. Build the key-value database (Beansdb)

>> sudo apt-get update
>> sudo apt-get install git
>> sudo apt-get install g++ gcc build-essential gfortran
>> git clone https://github.com/douban/beansdb
>> sudo apt-get install autoconf automake libtool
>> cd beansdb/
>> sudo ./autogen.sh
>> ./configure && sudo make && sudo make install
>> sudo cp beansdb_log.conf /etc/
>> sudo mkdir /mnt/beansdb

4. Create the startup script

>> sudo apt-get install emacs
>> cd /usr/local/bin
>> sudo emacs startbeansdb
beansdb -u root -H /mnt/beansdb -p 7900 -d
>> sudo chmod 755 startbeansdb
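
Once the script has been run, one quick way to confirm that beansdb is listening is to try a TCP connection to the port chosen above. The following is a minimal sketch using only the Python 3 standard library; the helper name `is_port_open` and the 1-second timeout are illustrative choices and are not part of beansdb.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    # 7900 is the port passed to beansdb with -p in the startup script.
    print(is_port_open("127.0.0.1", 7900))
```

The same check can be pointed at other services in this setup, for example port 5050 of the Mesos master configured later.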

5. Allow the script to be started through sudo without a password

>> sudo visudo
long ALL=NOPASSWD:/usr/local/bin/startbeansdb

6. Install the Python interface for beansdb

>> sudo cp /home/long/beansdb/python/dbclient.py /usr/local/lib/python2.7/dist-packages/
>> wget https://launchpad.net/libmemcached/1.0/1.0.18/+download/libmemcached-1.0.18.tar.gz
>> tar zxvf libmemcached-1.0.18.tar.gz
>> cd libmemcached-1.0.18/
>> ./configure
>> sudo make
>> sudo make install
>> git clone https://github.com/douban/python-libmemcached
>> cd python-libmemcached/
>> sudo apt-get install python-pip python-dev
>> sudo python setup.py install
>> emacs ~/.profile
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
>> source ~/.profile

7. Set up the distributed file system server

>> wget http://nchc.dl.sourceforge.net/project/fuse/fuse-2.X/2.9.4/fuse-2.9.4.tar.gz
>> wget http://ppa.moosefs.com/src/moosefs-2.0.73-1.tar.gz
>> wget http://zlib.net/zlib-1.2.8.tar.gz
>> tar zxvf fuse-2.9.4.tar.gz && cd fuse-2.9.4 && ./configure --prefix=/usr && sudo make && sudo make install && cd ..
>> tar zxvf zlib-1.2.8.tar.gz && cd zlib-1.2.8 && ./configure && sudo make && sudo make install && cd ..
>> sudo useradd mfs -M -s /sbin/nologin
>> sudo apt-get install libpcap-dev pkg-config
>> tar zxvf moosefs-2.0.73-1.tar.gz && cd moosefs-2.0.73 && ./configure --prefix=/usr/local/mfs --with-default-user=mfs --with-default-group=mfs --enable-mfsmount && sudo make && sudo make install
>> cd /usr/local/mfs/etc/mfs
>> sudo cp mfsmaster.cfg.dist mfsmaster.cfg
>> sudo emacs mfsmaster.cfg
MATOCS_LISTEN_HOST = 192.168.1.203
>> sudo cp mfsexports.cfg.dist mfsexports.cfg
>> sudo cp mfstopology.cfg.dist mfstopology.cfg
>> cd /usr/local/mfs/etc/mfs
>> sudo cp mfschunkserver.cfg.dist mfschunkserver.cfg
>> sudo emacs mfschunkserver.cfg
MASTER_HOST = 192.168.1.203
>> sudo cp mfshdd.cfg.dist mfshdd.cfg
>> sudo emacs mfshdd.cfg
# Delete everything below "Examples:"
# and append the following line at the end
/usr/local/mfs/var/mfs
>> sudo chown -R mfs:mfs /usr/local/mfs/var/mfs
>> cd /usr/local/mfs/etc/mfs
>> sudo cp mfsmount.cfg.dist mfsmount.cfg
>> sudo emacs mfsmount.cfg
# Example:
#
/mnt/mfs
>> sudo mkdir /mnt/mfs
>> sudo chown -R mfs:mfs /mnt/mfs
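
In this setup the chunk server's MASTER_HOST and the master's MATOCS_LISTEN_HOST are both set to 192.168.1.203, and a typo in either file is easy to miss. As a hedged sketch, the `KEY = VALUE` lines can be parsed and compared in Python; `parse_cfg` is a hypothetical helper written for this example, and the sample strings simply mirror the two settings above rather than the full files.

```python
def parse_cfg(text):
    """Parse 'KEY = VALUE' lines, ignoring blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip()
    return settings


# Stand-ins for the edited files (only the lines changed above).
master_cfg = parse_cfg("# mfsmaster.cfg\nMATOCS_LISTEN_HOST = 192.168.1.203\n")
chunk_cfg = parse_cfg("# mfschunkserver.cfg\nMASTER_HOST = 192.168.1.203\n")

# The chunk server should connect to the address the master listens on.
same_host = master_cfg["MATOCS_LISTEN_HOST"] == chunk_cfg["MASTER_HOST"]
print(same_host)
```

To check the real files, the text could be read from /usr/local/mfs/etc/mfs/mfsmaster.cfg and mfschunkserver.cfg instead of the inline strings.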

8. Create the startup scripts

>> cd /usr/local/bin
>> sudo emacs startmfsmaster
cd /usr/local/mfs/sbin
./mfsmaster -a
>> sudo chmod 755 startmfsmaster
>> sudo emacs startmfscgi
cd /usr/local/mfs/sbin
./mfscgiserv
>> sudo chmod 755 startmfscgi
>> cd /usr/local/bin
>> sudo emacs startmfschunk
cd /usr/local/mfs/sbin/
./mfschunkserver start
>> sudo chmod 755 startmfschunk
>> cd /usr/local/bin
>> sudo emacs startmfsmount
cd /usr/local/mfs/bin/
./mfsmount -H 192.168.1.203 -o nonempty
>> sudo chmod 755 startmfsmount

9. Allow the scripts to be started through sudo without a password

>> sudo visudo
long ALL=NOPASSWD:/usr/local/bin/startmfsmaster
long ALL=NOPASSWD:/usr/local/bin/startmfscgi
long ALL=NOPASSWD:/usr/local/bin/startmfschunk
long ALL=NOPASSWD:/usr/local/bin/startmfsmount

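10. Install the distributed resource scheduling framework

>> wget http://archive.apache.org/dist/mesos/0.22.1/mesos-0.22.1.tar.gz  (0.22.1 is the newest version that can be used here; pymesos does not support anything later)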
>> tar zxf mesos-0.22.1.tar.gz
>> sudo apt-get install build-essential openjdk-6-jdk python-dev python-boto libcurl4-nss-dev libsasl2-dev maven libapr1-dev libsvn-dev autoconf libtool
>> cd mesos-0.22.1
>> ./bootstrap
>> mkdir build
>> cd build
>> ../configure --prefix=/home/long/mesos
>> sudo make
>> sudo make install

11. Create the startup scripts

>> cd /usr/local/bin
>> sudo emacs startmesosmaster
cd /home/long/mesos/sbin
./mesos-master --ip=192.168.1.203 --work_dir=/var/lib/mesos --quiet &
>> sudo chmod 755 startmesosmaster
>> sudo emacs startmesosslave
cd /home/long/mesos/sbin
./mesos-slave --master=192.168.1.203:5050 --quiet &
>> sudo chmod 755 startmesosslave

12. Allow the scripts to be started through sudo without a password

>> sudo visudo
long ALL=NOPASSWD:/usr/local/bin/startmesosmaster
long ALL=NOPASSWD:/usr/local/bin/startmesosslave

13. Install pymesos

>> git clone https://github.com/douban/pymesos.git
>> cd pymesos
>> sudo apt-get install python-pip
>> sudo python setup.py install

14. Install dpark

>> sudo apt-get install git libzmq-dev python-pip python-dev
>> git clone https://github.com/douban/dpark.git
>> cd dpark
>> sudo apt-get install python-psutil
>> sudo python setup.py install

15. Edit /etc/hosts

>> sudo emacs /etc/hosts
192.168.1.201    pc1
192.168.1.202    pc2
192.168.1.203    pc3
192.168.1.204    pc4
192.168.1.211    cn001
192.168.1.212    cn002
192.168.1.213    cn003
192.168.1.214    cn004
192.168.1.215    cn005
192.168.1.216    cn006
192.168.1.217    cn007
192.168.1.218    cn008
192.168.1.219    cn009
192.168.1.220    cn010
192.168.1.221    cn011
192.168.1.222    cn012
192.168.1.241    snc001
192.168.1.242    snc002
192.168.1.243    snc003
192.168.1.244    snc004
192.168.1.210    client
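
With this many entries it is easy to reuse an IP address or a hostname by accident. The sketch below, which assumes Python 3 and uses hypothetical helper names (`parse_hosts`, `duplicates`), parses hosts-style text and reports anything listed more than once. The sample is only the first three lines of the list above.

```python
from collections import Counter


def parse_hosts(text):
    """Return a list of (ip, [hostnames]) from /etc/hosts-style text."""
    entries = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments
        if not line:
            continue
        ip, *names = line.split()
        entries.append((ip, names))
    return entries


def duplicates(values):
    """Return the sorted values that occur more than once."""
    return sorted(v for v, n in Counter(values).items() if n > 1)


sample = """
192.168.1.201    pc1
192.168.1.202    pc2
192.168.1.203    pc3
"""
entries = parse_hosts(sample)
dup_ips = duplicates(ip for ip, _ in entries)
dup_names = duplicates(name for _, names in entries for name in names)
print(dup_ips, dup_names)
```

Reading the text from /etc/hosts instead of `sample` would check the file actually in use.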

16. Configure passwordless SSH login

>> ssh-keygen -t rsa -P ''
>> cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
>> sudo chmod 600 ~/.ssh/authorized_keys

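Removing virbr0 under KVM

After installing KVM, ifconfig shows a virtual network interface named virbr0.

In our experiments the virtual machines use bridged networking, so this interface serves no purpose and can be removed as follows.

>> virsh net-list
>> virsh net-destroy default  # "default" is the network name shown by the previous command.
>> virsh net-undefine default
>> sudo service libvirt-bin restart

Then reboot the system.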

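Download locations for distributed-system software

The distributed system discussed here consists of the distributed file system Moosefs, the distributed database Beansdb, the distributed resource scheduling framework Mesos, and the distributed computing framework Dpark.

Files ending in tar.gz can be downloaded with wget; addresses ending in git must be cloned with git.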

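Comparing four ways to do Word Count in Python

Word Count can be regarded as the Hello World of the big-data era. We count the words of one article in Python using four approaches and compare their performance. Garbled characters and superfluous punctuation were removed from the text by hand beforehand.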

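Approach 1: built-in map and reduce functions

# Built-in map and reduce functions
print("Built-in map and reduce functions")
import time
# time.clock() was removed in Python 3.8; process_time() also measures CPU time
start = time.process_time()

from functools import reduce
with open("sample.txt", "r") as fp:
    # Read the file
    datas = fp.readlines()
    # Count the words with map and reduce
    wordcount = reduce(lambda x, y: x + y,
                       map(lambda x: len(x.split()), datas))
    print(wordcount)

end = time.process_time()
print(end - start)

Output:

>> Built-in map and reduce functions
>> 767
>> 0.008342000000000002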

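Approach 2: counting the words directly

# Count the words directly
print("Counting the words directly")
import time
# time.clock() was removed in Python 3.8; process_time() also measures CPU time
start = time.process_time()

with open("sample.txt", "r") as fp:
    # Read the file
    datas = fp.readlines()
    # Join the lines into a single string
    datas = " ".join(datas)
    # Split it into words
    datas = datas.split()
    # Count the words
    wordcount = len(datas)
    print(wordcount)

end = time.process_time()
print(end - start)

Output:

>> Counting the words directly
>> 767
>> 0.0002339999999999981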

Approach 3: multi-threaded MapReduce

# Multi-threaded MapReduce
print("Multi-threaded MapReduce")
import time
# time.clock() was removed in Python 3.8; process_time() also measures CPU time
start = time.process_time()

import threading
from queue import Queue
from functools import reduce


class WordCount(threading.Thread):
    def __init__(self, wordstr, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.wordstr = wordstr

    def run(self):
        # Map step: put the word count of one line on the queue
        self.queue.put(len(self.wordstr.split()))


with open("sample.txt", "r") as fp:
    datas = fp.readlines()
    threadlist = []
    queue = Queue()
    # One thread per line
    for x in datas:
        threadlist.append(WordCount(x, queue))
    for trd in threadlist:
        trd.start()
    for trd in threadlist:
        trd.join()
    # All threads have finished, so the queue size is final
    datalist = [queue.get() for x in range(queue.qsize())]
    wordcount = reduce(lambda x, y: x + y, datalist)
    print(wordcount)

end = time.process_time()
print(end - start)

Note: Python's Queue is thread-safe, so no lock is needed here.

Output:

>> Multi-threaded MapReduce
>> 767
>> 0.02518399999999999
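
Creating one thread per line is probably a large part of the overhead in this approach. A common alternative is a fixed-size pool of worker threads. The sketch below uses `concurrent.futures.ThreadPoolExecutor` from the standard library; the sample lines and the pool size of 4 are assumptions for illustration, and no timing from the original run is implied.

```python
from concurrent.futures import ThreadPoolExecutor


def count_words(line):
    """Map step: number of whitespace-separated words in one line."""
    return len(line.split())


# Stand-in for the lines of sample.txt (hypothetical data).
lines = ["the quick brown fox", "jumps over", "the lazy dog"]

# A fixed number of worker threads handles all lines.
with ThreadPoolExecutor(max_workers=4) as pool:
    counts = list(pool.map(count_words, lines))

total = sum(counts)  # reduce step
print(total)
```

`pool.map` returns the results in input order, so no queue is needed to collect them.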

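Approach 4: multi-process MapReduce

# Multi-process MapReduce
import time
from functools import reduce
from multiprocessing import Pool


def count_words(wordstr):
    # Map step: word count of one line
    return len(wordstr.split())


# The guard keeps worker processes from re-running the script body
if __name__ == "__main__":
    print("Multi-process MapReduce")
    # time.clock() was removed in Python 3.8; process_time() also measures
    # CPU time (of the parent process only)
    start = time.process_time()

    with open("sample.txt", "r") as fp:
        datas = fp.readlines()
    with Pool() as p:
        datalist = p.map(count_words, datas)
    wordcount = reduce(lambda x, y: x + y, datalist)
    print(wordcount)

    end = time.process_time()
    print(end - start)

Note: Python's process pool, Pool, comes with its own map function.

Output:

>> Multi-process MapReduce
>> 767
>> 0.056589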

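The performance difference is clear. From fastest to slowest: counting the words directly > built-in map and reduce functions > multi-threaded MapReduce > multi-process MapReduce

So on a single machine, at least for an input this small (767 words), MapReduce does not improve performance. The cost of creating threads and processes most likely outweighs the work being divided.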
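
A single timed run of a script that takes a few milliseconds is noisy, so a ranking is easier to trust when each approach is repeated many times. As a sketch, the standard `timeit` module can do this for the two single-threaded variants. The in-memory `lines` list stands in for sample.txt, the function names are made up for this example, and the repeat count of 1000 is arbitrary.

```python
import timeit
from functools import reduce

# Hypothetical stand-in for the lines of sample.txt.
lines = ["the quick brown fox jumps over the lazy dog\n"] * 100


def count_with_map_reduce(lines):
    """Per-line counts combined with reduce."""
    return reduce(lambda x, y: x + y, map(lambda x: len(x.split()), lines))


def count_directly(lines):
    """Join everything, split once, take the length."""
    return len(" ".join(lines).split())


for func in (count_with_map_reduce, count_directly):
    # Total seconds for 1000 calls of the function on the same input.
    seconds = timeit.timeit(lambda: func(lines), number=1000)
    print(func.__name__, func(lines), round(seconds, 4))
```

Both functions should report the same word count; only the timings differ, and those depend on the machine.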

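Several cases of running multiple threads in Python

This post looks at several situations that arise when running multiple threads in Python.

Consider the code below, a standard multi-threaded program: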

import time
# time.clock() was removed in Python 3.8; process_time() likewise reports
# CPU time, so time spent in time.sleep() is not counted
start = time.process_time()

import threading


class Stu:

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name

    def run(self):
        time.sleep(2)
        with open("datas", "a") as fp:
            fp.write("puts " + str(self.name) + "\n")


def spawn(target, *args, **kw):
    # Create the thread and start it without waiting for it
    t = threading.Thread(
        target=target, name=target.__name__, args=args, kwargs=kw)
    t.start()
    return t

with open("datas", "a") as fp:
    fp.write("main thread start.\n")
studentlist = ['a', 'b', 'c', 'd', 'e']
for x in studentlist:
    spawn(Stu(x).run)
with open("datas", "a") as fp:
    fp.write("main thread end here.\n")

end = time.process_time()
print(end - start)

Here is the program's output:

>> long@happytime:~/envtest$ python envtester.py
>> 0.005009
>> long@happytime:~/envtest$ cat datas
>> main thread start.
>> main thread end here.
>> puts c
>> puts a
>> puts d
>> puts b
>> puts e

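All the threads started normally. The main thread reached its end first, and then each child thread wrote its own name to datas, in no fixed order. This is the simplest kind of multithreading, with no coordination at all.

Next, let us see what happens if we make the child threads daemon threads.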

import time
# time.clock() was removed in Python 3.8; process_time() likewise reports
# CPU time, so time spent in time.sleep() is not counted
start = time.process_time()

import threading


class Stu:

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name

    def run(self):
        time.sleep(2)
        with open("datas", "a") as fp:
            fp.write("puts " + str(self.name) + "\n")


def spawn(target, *args, **kw):
    t = threading.Thread(
        target=target, name=target.__name__, args=args, kwargs=kw)
    # Mark the thread as a daemon before starting it
    t.daemon = True
    t.start()
    return t

with open("datas", "a") as fp:
    fp.write("main thread start.\n")
studentlist = ['a', 'b', 'c', 'd', 'e']
for x in studentlist:
    spawn(Stu(x).run)
with open("datas", "a") as fp:
    fp.write("main thread end here.\n")

end = time.process_time()
print(end - start)

Output:

>> long@happytime:~/envtest$ python envtester.py
>> 0.005924
>> long@happytime:~/envtest$ cat datas
>> main thread start.
>> main thread end here.

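We find that once the child threads are marked as daemon threads, the program exits as soon as the main thread finishes, and the child threads are terminated along with it. Note that the daemon attribute must be set before start() is called.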
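
To illustrate the ordering requirement, the short sketch below sets the daemon attribute after start() and captures the resulting exception. The `worker` function and the 0.1-second sleep are placeholders chosen for this example.

```python
import threading
import time


def worker():
    time.sleep(0.1)


t = threading.Thread(target=worker)
t.start()
try:
    t.daemon = True  # too late: the thread has already been started
    error = None
except RuntimeError as exc:
    error = exc
t.join()
print(type(error).__name__)
```

In CPython this prints RuntimeError, because the daemon status can only be changed before the thread is started.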

What if we want the main thread to continue only after all the child threads have finished? See the code:

import time
# time.clock() was removed in Python 3.8; process_time() likewise reports
# CPU time, so time spent in time.sleep() is not counted
start = time.process_time()

import threading


class Stu:

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name

    def run(self):
        time.sleep(2)
        with open("datas", "a") as fp:
            fp.write("puts " + str(self.name) + "\n")


def spawn(target, *args, **kw):
    t = threading.Thread(
        target=target, name=target.__name__, args=args, kwargs=kw)
    t.start()
    # Wait for this thread to finish before returning
    t.join()
    return t

with open("datas", "a") as fp:
    fp.write("main thread start.\n")
studentlist = ['a', 'b', 'c', 'd', 'e']
for x in studentlist:
    spawn(Stu(x).run)
with open("datas", "a") as fp:
    fp.write("main thread end here.\n")

end = time.process_time()
print(end - start)

Output:

>> long@happytime:~/envtest$ python envtester.py
>> 0.007657
>> long@happytime:~/envtest$ cat datas
>> main thread start.
>> puts a
>> puts b
>> puts c
>> puts d
>> puts e
>> main thread end here.

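The result shows clearly that although every child thread takes time to run, the main thread finishes last. Because spawn calls join() right after start(), each thread is waited for before the next one is created, which is also why the names appear in order from a to e.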
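
In the listing above, join() is called inside spawn, immediately after start(), so the threads effectively run one after another. If the goal is for the threads to run concurrently while the main thread still waits for all of them, a common pattern is to start every thread first and join them afterwards. The sketch below compares the two; the function names, the 0.1-second sleep and the thread count of 5 are illustrative assumptions.

```python
import threading
import time


def work():
    time.sleep(0.1)


def run_sequential(n):
    """start() followed immediately by join(): threads run one at a time."""
    begin = time.perf_counter()
    for _ in range(n):
        t = threading.Thread(target=work)
        t.start()
        t.join()
    return time.perf_counter() - begin


def run_concurrent(n):
    """Start every thread first, then join them all."""
    begin = time.perf_counter()
    threads = [threading.Thread(target=work) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - begin


sequential = run_sequential(5)
concurrent = run_concurrent(5)
print(sequential > concurrent)
```

Here wall-clock time is measured with `time.perf_counter()`, since the difference lies in waiting time rather than CPU time.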

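In summary, if you want:

1) the main thread and the child threads to run in no particular order, use plain threads with the default settings;

2) the child threads to be terminated when the main thread ends, whether or not they have finished, use daemon threads;

3) the main thread to wait for the child threads to finish, use join().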