Memcache 中实现消息队列

 更新时间:2016年11月25日 16:30

class Memcache_Queue
private $memcache;
private $name;
private $prefix;
function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")
if ($memcache == null) {
throw new Exception("memcache object is null, new the object first.");
$this->memcache = $memcache;
$this->name = $name;
$this->prefix = $prefix;
$this->maxSize = $maxSize;
$this->front = 0;
$this->real = 0;
$this->size = 0;
function __get($name)
return $this->get($name);
function __set($name, $value)
$this->add($name, $value);
return $this;
function isEmpty()
return $this->size == 0;
function isFull()
return $this->size == $this->maxSize;
function enQueue($data)
if ($this->isFull()) {
throw new Exception("Queue is Full");
$this->set($this->real, $data);
$this->set("real", ($this->real + 1) % $this->maxSize);
return $this;
function deQueue()
if ($this->isEmpty()) {
throw new Exception("Queue is Empty");
$this->set("front", ($this->front + 1) % $this->maxSize);
return $this;
function getTop()
return $this->get($this->front);
function getAll()
return $this->getPage();
function getPage($offset = 0, $limit = 0)
if ($this->isEmpty() || $this->size < $offset) {
return null;
$keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize);
$num = 1;
for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)
$keys[] = $this->getKeyByPos($pos);
if ($limit > 0 && $limit == $num) {
return array_values($this->memcache->get($keys));
function makeEmpty()
$keys = $this->getAllKeys();
foreach ($keys as $value) {
private function getAllKeys()
if ($this->isEmpty())
return array();
$keys[] = $this->getKeyByPos($this->front);
for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)
$keys[] = $this->getKeyByPos($pos);
return $keys;
private function add($pos, $data)
$this->memcache->add($this->getKeyByPos($pos), $data);
return $this;
private function increment($pos)
return $this->memcache->increment($this->getKeyByPos($pos));
private function decrement($pos)
private function set($pos, $data)
$this->memcache->set($this->getKeyByPos($pos), $data);
return $this;
private function get($pos)
return $this->memcache->get($this->getKeyByPos($pos));
private function delete($pos)
return $this->memcache->delete($this->getKeyByPos($pos));
private function getKeyByPos($pos)
return $this->prefix . $this->name . $pos;

php  array_push 向数组增加值函数
 public static function insert(&$array, $key, $newValue, $before = true) {
  $result = false;
  $size = sizeof($array);
  for ($i=0; $i<$size; $i++) {
   $value = array_shift($array);
   if ($i==$key) {
    if ($before) {
     array_push($array, $newValue);
     array_push($array, $value);
    } else {
     array_push($array, $value);
     array_push($array, $newValue);
    $result = true;
   } else {
    array_push($array, $value);
  if (!$result) {
   array_push($array, $newValue);

* 进行写锁定的测试
* 打开线程1
$lock = new File_Lock(dirname(dirname(__FILE__)) . "/FileLock.lock");
/** 单个线程锁定的速度 1s 钟 3万次。 **/
/** 两个线程写,两万的数据 大概要 7s 钟*/
/** 一个线程写,一万的数据 大概要 3.9s 钟,居然两个文件同时写,要快一点*/
/** 不进行锁定,一个进程 写大概要 2.8s 钟,加锁是有代价的。 */
/** 不进行锁定,两个进程 分布不是很均匀,而且大多数都冲突 */
while ($lock->get() < 2) {
echo "begin to runing n";
$t1 = microtime(true);
for ($i = 0; $i < 10000; $i++)
$t2 = microtime(true) - $t1;
echo $t2;

class File_Lock
private $name;
private $handle;
private $mode;
function __construct($filename, $mode = 'a+b')
global $php_errormsg;
$this->name = $filename;
$path = dirname($this->name);
if ($path == '.' || !is_dir($path)) {
global $config_file_lock_path;
$this->name = str_replace(array("/", "\"), array("_", "_"), $this->name);
if ($config_file_lock_path == null) {
$this->name = dirname(__FILE__) . "/lock/" . $this->name;
} else {
$this->name = $config_file_lock_path . "/" . $this->name;
$this->mode = $mode;
$this->handle = @fopen($this->name, $mode);
if ($this->handle == false) {
throw new Exception($php_errormsg);
public function close()
if ($this->handle !== null ) {
$this->handle = null;
public function __destruct()
public function lock($lockType, $nonBlockingLock = false)
if ($nonBlockingLock) {
return flock($this->handle, $lockType | LOCK_NB);
} else {
return flock($this->handle, $lockType);
public function readLock()
return $this->lock(LOCK_SH);
public function writeLock($wait = 0.1)
$startTime = microtime(true);
$canWrite = false;
do {
$canWrite = flock($this->handle, LOCK_EX);
if(!$canWrite) {
usleep(rand(10, 1000));
} while ((!$canWrite) && ((microtime(true) - $startTime) < $wait));
* if you want't to log the number under multi-thread system,
* please open the lock, use a+ mod. then fopen the file will not
* destroy the data.
* this function increment a delt value , and save to the file.
* @param int $delt
* @return int
public function increment($delt = 1)
$n = $this->get();
$n += $delt;
return $n;
public function get()
fseek($this->handle, 0);
return (int)fgets($this->handle);
public function set($value)
ftruncate($this->handle, 0);
return fwrite($this->handle, (string)$value);
public function unlock()
if ($this->handle !== null ) {
return flock($this->handle, LOCK_UN);
} else {
return true;


php  google 风格分页代码
public function showCtrlPanel_g($halfPer = 5) {
  $re = '<div class="pageMore">
  if($this->currentPage-$halfPer >1){
   $re .= '<li><a href="'.$this->fileName.'pageno=1"><span>1</span></a></li>';
   if($this->currentPage-$halfPer*2 >1){
    $re .= '<li><a href="'.$this->fileName.'pageno='.($this->currentPage-$halfPer*2).'"><span>...</span></a></li>';
    $re .= '<li><a href="'.$this->fileName.'pageno=1"><span>...</span></a></li>';
  for ( $i = $this->currentPage - $halfPer,$i > 1 || $i = 1 , $j = $this->currentPage + $halfPer, $j < $this->pageCount || $j = $this->pageCount;$i <= $j ;$i++ )
   $re .= $i ==  $this->currentPage
    ? '<li class="linkOn"><a href="'.$this->fileName.'pageno='.$i.'"><span>'.$i.'</span></a></li>'." "
    : '<li><a href="'.$this->fileName.'pageno='.$i.'"><span>'.$i.'</span></a></li>'." ";
  if($this->currentPage+$halfPer < $this->pageCount){
   if($this->currentPage+$halfPer*2 < $this->pageCount){
    $re .= '<li><a href="'.$this->fileName.'pageno='.($this->currentPage+$halfPer*2).'"><span>...</span></a></li>';
    $re .= '<li><a href="'.$this->fileName.'pageno='.$this->pageCount.'"><span>...</span></a></li>';
   $re .= '<li><a href="'.$this->fileName.'pageno='.$this->pageCount.'"><span>'.$this->pageCount.'</span></a></li>';
  $re .= ' 
  return $re;

php 日期转换成日时截

private function toTimeStamp ($dateTimeString = NULL) {
  if (!$dateTimeString) {
   $dateTimeString = time();
  $numeric = '';
  $add_space = false;
  for($i=0;$i<strlen($dateTimeString);$i++) {
   if(strpos('0123456789',$dateTimeString[$i])===false) {
    if($add_space) {
     $numeric .= ' ';
     $add_space = false;
   } else {
    $numeric .= $dateTimeString[$i];
    $add_space = true;
  $numeric_array = explode(' ',$numeric,6);
  if(sizeof($numeric_array)<3 || ($numeric_array[0]==0 && $numeric_array[1]==0 && $numeric_array[2]==0)) {
   throw new Exception($dateTimeString . ' is an invalid parameter', 5);
  } else {
   $result = mktime(intval($numeric_array[3]), intval($numeric_array[4]), intval($numeric_array[5]),
        intval($numeric_array[1]), intval($numeric_array[2]), intval($numeric_array[0])) ;
  return $result;



