基于python的paxos算法实现

 更新时间:2019年07月03日 16:37:36   作者:charles_lun   我要评论

这篇文章主要介绍了基于python的paxos算法实现,理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供调用,我觉着还是有必要自己亲自实践一下,需要的朋友可以参考下

理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供调用,我觉着还是有必要自己亲自实践一下。

这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。

class message:
  # command
  msg_acceptor_agree = 0 # 追随者约定
  msg_acceptor_accept = 1 # 追随者接受
  msg_acceptor_reject = 2 # 追随者拒绝-网络不通
  msg_acceptor_unaccept = 3 # 追随者网络通-不同意
  msg_accept = 4 # 接受
  msg_propose = 5 # 提议
  msg_ext_propose = 6 # 额外提议
  msg_heartbeat = 7 # 心跳,每隔一段时间同步消息
  def __init__(self, command=none):
    self.command = command
  # 把收到的消息原原路返回,作为应答消息
  def copyasreply(self, message):
    # 提议id #当前的id #发给谁 #谁发的
    self.proposalid, self.instanceid, self.to, self.source = message.proposalid, message.instanceid, message.source, message.to
    self.value = message.value # 发的信息

然后是利用socket,线程和队列实现的消息处理器:

# 基于socket传递消息,封装网络传递消息
import threading
import pickle
import socket
import queue
class messagepump(threading.thread):
  # 收取消息线程
  class mphelper(threading.thread):
    #
    def __init__(self, owner):
      self.owner = owner
      threading.thread.__init__(self)
    def run(self):
      while not self.owner.abort: # 只要所有者线程没有结束,一直接受消息
        try:
          (bytes, addr) = self.owner.socket.recvfrom(2048) # 收取消息
          msg = pickle.loads(bytes) # 读取二进制数据转化为消息
          msg.source = addr[1]
          self.owner.queue.put(msg) # 队列存入消息
        except exception as e:
          pass  def __init__(self, owner, port, timeout=2):
    threading.thread.__init__(self)
    self.owner = owner
    self.abort = false
    self.timeout = 2
    self.port = port
    self.socket = socket.socket(socket.af_inet, socket.sock_dgram) # udp通信
    self.socket.setsockopt(socket.sol_socket, socket.so_rcvbuf, 200000) # 通信参数
    self.socket.bind(("localhost", port)) # 通信地址,ip,端口
    self.socket.settimeout(timeout) # 超时设置
    self.queue = queue.queue() # 队列
    self.helper = messagepump.mphelper(self) # 接收消息  # 运行主线程
  def run(self):
    self.helper.start() # 开启收消息的线程
    while not self.abort:
      message = self.waitformessage() # 阻塞等待
      self.owner.recvmessage(message) # 收取消息  # 等待消息
  def waitformessage(self):
    try:
      msg = self.queue.get(true, 3) # 抓取数据,最多等待3s
      return msg
    except:
      return none  # 发送消息
  def sendmessage(self, message):
    bytes = pickle.dumps(message) # 转化为二进制
    address = ("localhost", message.to) # 地址ip,端口(ip,port)
    self.socket.sendto(bytes, address)
    return true
  #是否停止收取消息
  def doabort(self):
    self.abort = true

再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的

from messagepump import messagepump
import random
class adversarialmessagepump(messagepump): # 类的继承
  # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
  def __init__(self, owner, port, timeout=2):
    messagepump.__init__(self, owner, port, timeout) # 初始化父类
    self.messages = set() # 集合避免重复  def waitformessage(self):
    try:
      msg = self.queue.get(true, 0.1) # 从队列抓取数据
      self.messages.add(msg) # 添加消息
    except exception as e: # 处理异常
      pass
      # print(e)
    if len(self.messages) > 0 and random.random() < 0.95: # arbitrary!
      msg = random.choice(list(self.messages)) # 随机抓取消息发送
      self.messages.remove(msg) # 删除消息
    else:
      msg = none
    return msg

再来一个是记录类

# instancerecord本地记录类,主要记录追随者、领导者最高编号的协议
from paxosleaderprotocol import paxosleaderprotocol
class instancerecord:
  def __init__(self):
    self.protocols = {}
    self.highestid = (-1, -1) # (port,count)
    self.value = none  def addprotocol(self, protocol):
    self.protocols[protocol.proposalid] = protocol
    #
    if protocol.proposalid[1] > self.highestid[1] or (
        protocol.proposalid[1] == self.highestid[1] and protocol.proposalid[0] > self.highestid[0]):
      self.highestid = protocol.proposalid # 取得编号最大的协议  def getprotocol(self, protocolid):
    return self.protocols[protocolid]  def cleanprotocols(self):
    keys = self.protocols.keys()
    for k in keys:
      protocol = self.protocols[k]
      if protocol.state == paxosleaderprotocol.state_accepted:
        print("删除协议")
        del self.protocols[k]

下面就是acceptor的实现:

# 追随者
from messagepump import messagepump
from message import message
from instancerecord import instancerecord
from paxosacceptorprotocol import paxosacceptorprotocol
class paxosacceptor:
  def __init__(self, port, leaders):
    self.port = port
    self.leaders = leaders
    self.instances = {} # 接口列表
    self.msgpump = messagepump(self, self.port) # 消息传递器
    self.failed = false  # 开始消息传送
  def start(self):
    self.msgpump.start()  # 停止
  def stop(self):
    self.msgpump.doabort()  # 失败
  def fail(self):
    self.failed = true  def recover(self):
    self.failed = false  # 发送消息
  def sendmessage(self, message):
    self.msgpump.sendmessage(message)  # 收消息,只收取为提议的消息
  def recvmessage(self, message):
    if message == none:
      return
    if self.failed: # 失败状态不收取消息
      return    if message.command == message.msg_propose: # 判断消息是否为提议
      if message.instanceid not in self.instances:
        record = instancerecord() # 记录器
        self.instances[message.instanceid] = record
      protocol = paxosacceptorprotocol(self) # 创建协议
      protocol.recvproposal(message) # 收取消息
      self.instances[message.instanceid].addprotocol(protocol)
    else:
      self.instances[message.instanceid].getprotocol(message.proposalid).dotransition(message)  # 通知客户端,
  def notifyclient(self, protocol, message):
    if protocol.state == paxosacceptorprotocol.state_proposal_accepted: # 提议被接受,通知
      self.instances[protocol.instanceid].value = message.value # 储存信息
      print(u"协议被客户端接受 %s" % message.value)  # 获取最高同意的建议
  def gethighestagreedproposal(self, instance):
    return self.instances[instance].highestid # (port,count)  # 获取接口数据
  def getinstancevalsue(self, instance):
    return self.instances[instance].value

那再看下acceptorprotocol的实现:

from message import message
class paxosacceptorprotocol(object):
  # state variables
  state_undefined = -1 # 协议没有定义的情况0
  state_proposal_received = 0 # 收到消息
  state_proposal_rejected = 1 # 拒绝链接
  state_proposal_agreed = 2 # 同意链接
  state_proposal_accepted = 3 # 同意请求
  state_proposal_unaccepted = 4 # 拒绝请求  def __init__(self, client):
    self.client = client
    self.state = paxosacceptorprotocol.state_undefined  # 收取,只处理协议类型的消息
  def recvproposal(self, message):    if message.command == message.msg_propose: # 协议
      self.proposalid = message.proposalid
      self.instanceid = message.instanceid
      (port, count) = self.client.gethighestagreedproposal(message.instanceid) # 端口,协议内容的最高编号
      # 检测编号处理消息协议
      # 判断协议是否最高 
      if count < self.proposalid[1] or (count == self.proposalid[1] and port < self.proposalid[0]):
        self.state = paxosacceptorprotocol.state_proposal_agreed # 协议同意
        print("同意协议:%s, %s " % (message.instanceid, message.value))
        value = self.client.getinstancevalsue(message.instanceid)
        msg = message(message.msg_acceptor_agree) # 同意协议
        msg.copyasreply(message)
        msg.value = value
        msg.sequence = (port, count)
        self.client.sendmessage(msg) # 发送消息
      else: # 不再接受比最高协议小的提议
        self.state = paxosacceptorprotocol.state_proposal_rejected
      return self.proposalid
    else:
      # 错误重试
      pass
  # 过度
  def dotransition(self, message): # 如果当前协议状态是接受连接,消息类型是接受
    if self.state == paxosacceptorprotocol.state_proposal_agreed and message.command == message.msg_accept:
      self.state = paxosacceptorprotocol.state_proposal_accepted # 接收协议
      msg = message(message.msg_acceptor_accept) # 创造消息
      msg.copyasreply(message) # 拷贝并回复
      for l in self.client.leaders:
        msg.to = l
        self.client.sendmessage(msg) # 给领导发送消息
      self.notifyclient(message) # 通知自己
      return true
    raise exception("并非预期的状态和命令")  # 通知 自己客户端
  def notifyclient(self, message):
    self.client.notifyclient(self, message)

接着看下leader和leaderprotocol实现:

# 领导者
import threading
import queue
import time
from message import message
from messagepump import messagepump
from instancerecord import instancerecord
from paxosleaderprotocol import paxosleaderprotocol
class paxosleader:
  # 定时监听
  class heartbeatlistener(threading.thread):
    def __init__(self, leader):
      self.leader = leader
      self.queue = queue.queue() # 消息队列
      self.abort = false
      threading.thread.__init__(self)    def newhb(self, message):
      self.queue.put(message)    def doabort(self):
      self.abort = true    def run(self): # 读取消息
      elapsed = 0
      while not self.abort:
        s = time.time()
        try:
          hb = self.queue.get(true, 2)
          # 设定规则,谁的端口号比较高,谁就是领导
          if hb.source > self.leader.port:
            self.leader.setprimary(false)
        except:
          self.leader.setprimary(true)  # 定时发送
  class heartbeatsender(threading.thread):
    def __init__(self, leader):
      threading.thread.__init__(self)
      self.leader = leader
      self.abort = false
    def doabort(self):
      self.abort = true
    def run(self):
      while not self.abort:
        time.sleep(1)
        if self.leader.isprimary:
          msg = message(message.msg_heartbeat)
          msg.source = self.leader.port
          for leader in self.leader.leaders:
            msg.to = leader
            self.leader.sendmessage(msg)  def __init__(self, port, leaders=none, acceptors=none):
    self.port = port
    if leaders == none:
      self.leaders = []
    else:
      self.leaders = leaders
    if acceptors == none:
      self.acceptors = []
    else:
      self.acceptors = acceptors
    self.group = self.leaders + self.acceptors # 集合合并
    self.isprimary = false # 自身是不是领导
    self.proposalcount = 0
    self.msgpump = messagepump(self, port) # 消息传送器
    self.instances = {}
    self.hblistener = paxosleader.heartbeatlistener(self) # 监听
    self.hbsender = paxosleader.heartbeatsender(self) # 发送心跳
    self.highestinstance = -1 # 协议状态
    self.stoped = true # 是否正在运行
    self.lasttime = time.time() # 最后一次时间  def sendmessage(self, message):
    self.msgpump.sendmessage(message)  def start(self):
    self.hbsender.start()
    self.hblistener.start()
    self.msgpump.start()
    self.stoped = false  def stop(self):
    self.hbsender.doabort()
    self.hblistener.doabort()
    self.msgpump.doabort()
    self.stoped = true  def setprimary(self, primary): # 设置领导者
    if self.isprimary != primary:
      # only print if something's changed
      if primary:
        print(u"我是leader%s" % self.port)
      else:
        print(u"我不是leader%s" % self.port)
    self.isprimary = primary  # 获取所有的领导下面的追随者
  def getgroup(self):
    return self.group  def getleaders(self):
    return self.leaders  def getacceptors(self):
    return self.acceptors  # 必须获得1/2以上的人支持
  def getquorumsize(self):
    return (len(self.getacceptors()) / 2) + 1  def getinstancevalsue(self, instanceid):
    if instanceid in self.instances:
      return self.instances[instanceid].value
    return none  def gethistory(self): # 历史记录
    return [self.getinstancevalsue(i) for i in range(1, self.highestinstance + 1)]  # 抓取同意的数量
  def getnumaccpted(self):
    return len([v for v in self.gethistory() if v != none])  # 抓取空白时间处理下事务
  def findandfillgaps(self):
    for i in range(1, self.highestinstance):
      if self.getinstancevalsue(i) == none:
        print("填充空白", i)
        self.newproposal(0, i)
    self.lasttime = time.time()  # 采集无用信息
  def garbagecollect(self):
    for i in self.instances:
      self.instances[i].cleanprotocols()  # 通知领导
  def recvmessage(self, message):
    if self.stoped:
      return
    if message == none:
      if self.isprimary and time.time() - self.lasttime > 15.0:
        self.findandfillgaps()
        self.garbagecollect()
      return
    #处理心跳信息
    if message.command == message.msg_heartbeat:
      self.hblistener.newhb(message)
      return true
    #处理额外的提议
    if message.command == message.msg_ext_propose:
      print("额外的协议", self.port, self.highestinstance)
      if self.isprimary:
        self.newproposal(message.value)
      return true    if self.isprimary and message.command != message.msg_acceptor_accept:
      self.instances[message.instanceid].getprotocol(message.proposalid).dotransition(message)    if message.command == message.msg_acceptor_accept:
      if message.instanceid not in self.instances:
        self.instances[message.instanceid] = instancerecord()
      record = self.instances[message.instanceid]
      if message.proposalid not in record.protocols:#创建协议
        protocol = paxosleaderprotocol(self)
        protocol.state = paxosleaderprotocol.state_agreed
        protocol.proposalid = message.proposalid
        protocol.instanceid = message.instanceid
        protocol.value = message.value
        record.addprotocol(protocol)
      else:
        protocol = record.getprotocol(message.proposalid)      protocol.dotransition(message)    return true
  # 新建提议
  def newproposal(self, value, instance=none):
    protocol = paxosleaderprotocol(self)
    if instance == none: # 创建协议标号
      self.highestinstance += 1
      instanceid = self.highestinstance
    else:
      instanceid = instance
    self.proposalcount += 1
    id = (self.port, self.proposalcount)
    if instanceid in self.instances:
      record = self.instances[instanceid]
    else:
      record = instancerecord()
      self.instances[instanceid] = record
    protocol.propose(value, id, instanceid)
    record.addprotocol(protocol)  def notifyleader(self, protocol, message):
    if protocol.state == paxosleaderprotocol.state_accepted:
      print("协议接口%s被%s接受" % (message.instanceid, message.value))
      self.instances[message.instanceid].accepted = true
      self.instances[message.instanceid].value = message.value
      self.highestinstance = max(message.instanceid, self.highestinstance)
      return
    if protocol.state == paxosleaderprotocol.state_rejected: # 重新尝试
      self.proposalcount = max(self.proposalcount, message.highestpid[1])
      self.newproposal(message.value)
      return true
    if protocol.state == paxosleaderprotocol.state_unaccepted:
      pass

leaderprotocol实现:

from message import message
class paxosleaderprotocol(object):
  state_undefined = -1 # 协议没有定义的情况0
  state_proposed = 0 # 协议消息
  state_rejected = 1 # 拒绝链接
  state_agreed = 2 # 同意链接
  state_accepted = 3 # 同意请求
  state_unaccepted = 4 # 拒绝请求
  def __init__(self, leader):
    self.leader = leader
    self.state = paxosleaderprotocol.state_undefined
    self.proposalid = (-1, -1)
    self.agreecount, self.acceptcount = (0, 0)
    self.rejectcount, self.unacceptcount = (0, 0)
    self.instanceid = -1
    self.highestseen = (0, 0)
  # 提议
  def propose(self, value, pid, instanceid):
    self.proposalid = pid
    self.value = value
    self.instanceid = instanceid
    message = message(message.msg_propose)
    message.proposalid = pid
    message.instanceid = instanceid
    message.value = value
    for server in self.leader.getacceptors():
      message.to = server
      self.leader.sendmessage(message)
    self.state = paxosleaderprotocol.state_proposed    return self.proposalid  # 過度
  def dotransition(self, message):
    # 根據狀態運行協議
    if self.state == paxosleaderprotocol.state_proposed:
      if message.command == message.msg_acceptor_agree:
        self.agreecount += 1
        if self.agreecount >= self.leader.getquorumsize(): # 选举
          print(u"达成协议的法定人数,最后的价值回答是:%s" % message.value)
          if message.value != none:
            if message.sequence[0] > self.highestseen[0] or (
                message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
              1]):
              self.value = message.value
              self.highestseen = message.sequence            self.state = paxosleaderprotocol.state_agreed # 同意更新
            # 发送同意消息
            msg = message(message.msg_accept)
            msg.copyasreply(message)
            msg.value = self.value
            msg.leaderid = msg.to
            for server in self.leader.getacceptors():
              msg.to = server
              self.leader.sendmessage(msg)
            self.leader.notifyleader(self, message)
          return true        if message.command == message.msg_acceptor_reject:
          self.rejectcount += 1
          if self.rejectcount >= self.leader.getquorumsize():
            self.state = paxosleaderprotocol.state_rejected
            self.leader.notifyleader(self, message)
          return true    if self.state == paxosleaderprotocol.state_agreed:
      if message.command == message.msg_acceptor_accept: # 同意协议
        self.acceptcount += 1
        if self.acceptcount >= self.leader.getquorumsize():
          self.state = paxosleaderprotocol.state_accepted # 接受
          self.leader.notifyleader(self, message)
      if message.command == message.msg_acceptor_unaccept:
        self.unacceptcount += 1
        if self.unacceptcount >= self.leader.getquorumsize():
          self.state = paxosleaderprotocol.state_unaccepted
          self.leader.notifyleader(self, message)

测试模块:

import socket, pickle, time
from message import message
from paxosacceptor import paxosacceptor
from paxosleader import paxosleaderif __name__ == "__main__":
  # 设定5个客户端
  numclients = 5
  clients = [paxosacceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
  # 两个领导者
  leader1 = paxosleader(54321, [54322], [c.port for c in clients])
  leader2 = paxosleader(54322, [54321], [c.port for c in clients])  # 开启领导者与追随者
  leader1.start()
  leader1.setprimary(true)
  leader2.setprimary(true)
  leader2.start()
  for c in clients:
    c.start()  # 破坏,客户端不链接
  clients[0].fail()
  clients[1].fail()  # 通信
  s = socket.socket(socket.af_inet, socket.sock_dgram) # udp协议
  start = time.time()
  for i in range(1000):
    m = message(message.msg_ext_propose) # 消息
    m.value = 0 + i # 消息参数
    m.to = 54322 # 设置传递的端口
    bytes = pickle.dumps(m) # 提取的二进制数据
    s.sendto(bytes, ("localhost", m.to)) # 发送消息  while leader2.getnumaccpted() < 999:
    print("休眠的这一秒 %d " % leader2.getnumaccpted())
    time.sleep(1)
  print(u"休眠10秒")
  time.sleep(10)
  print(u"停止leaders")
  leader1.stop()
  leader2.stop()
  print(u"停止客户端")
  for c in clients:
    c.stop()
  print(u"leader1历史纪录")
  print(leader1.gethistory())
  print(u"leader2历史纪录")
  print(leader2.gethistory())
  end = time.time()
  print(u"一共用了%f秒" % (end - start))

代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 讲解python的scrapy爬虫框架使用代理进行采集的方法

    讲解python的scrapy爬虫框架使用代理进行采集的方法

    这篇文章主要介绍了讲解python的scrapy爬虫框架使用代理进行采集的方法,并介绍了随机使用预先设好的user-agent来进行爬取的用法,需要的朋友可以参考下
    2016-02-02
  • python排序搜索基本算法之插入排序实例分析

    python排序搜索基本算法之插入排序实例分析

    这篇文章主要介绍了python排序搜索基本算法之插入排序,结合实例形式分析了基于比较的插入排序和基于交换的插入排序实现技巧,需要的朋友可以参考下
    2017-12-12
  • python保存字符串到文件的方法

    python保存字符串到文件的方法

    这篇文章主要介绍了python保存字符串到文件的方法,实例分析了python文件与字符串操作的相关技巧,需要的朋友可以参考下
    2015-07-07
  • python中的迭代和可迭代对象代码示例

    python中的迭代和可迭代对象代码示例

    这篇文章主要介绍了python中的迭代和可迭代对象代码示例,具有一定借鉴价值,需要的朋友可以参考下
    2017-12-12
  • 对python程序内存泄漏调试的记录

    对python程序内存泄漏调试的记录

    今天小编就为大家分享一篇对python程序内存泄漏调试的记录,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-06-06
  • 解决django模板无法使用perms变量问题的方法

    解决django模板无法使用perms变量问题的方法

    这篇文章主要给大家介绍了关于解决django模板无法使用perms变量问题的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-09-09
  • python queue模块详解

    python queue模块详解

    这篇文章主要介绍了python queue模块详解,需要的朋友可以参考下
    2014-11-11
  • python 去除二维数组/二维列表中的重复行方法

    python 去除二维数组/二维列表中的重复行方法

    今天小编就为大家分享一篇python 去除二维数组/二维列表中的重复行方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-01-01
  • python中正则表达式 re.findall 用法

    python中正则表达式 re.findall 用法

    在python中,通过内嵌集成re模块,程序媛们可以直接调用来实现正则匹配。本文重点给大家介绍python中正则表达式 re.findall 用法,感兴趣的朋友跟随小编一起看看吧
    2018-10-10
  • 面向初学者的python编辑器mu

    面向初学者的python编辑器mu

    今天小编就为大家分享一篇关于一个面向初学者的python编辑器mu,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-10-10

最新评论