php socket服务的模型以及实现 多进程IO复用libevent
端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题.
<?php
/**
* 多进程IO复用libevent
* 同时处理多个连接
* 端口复用---建议php7
*/
class Xtgxiso_server
{
public $socket = false;
public $master = array();
public $onConnect = null;
public $onMessage = null;
public $onClose = null;
public $process_num = 2;
private $pids = array();
public $receive = array();
private $host='127.0.0.1';
private $port = 1215;
function __construct($host="0.0.0.0",$port=1215){
//产生子进程分支
$pid = pcntl_fork();
if ($pid == -1) {
die("could not fork"); //pcntl_fork返回-1标明创建子进程失败
} else if ($pid) {
exit(); //父进程中pcntl_fork返回创建的子进程进程号
} else {
// 子进程pcntl_fork返回的时0
}
// 从当前终端分离
if (posix_setsid() == -1) {
die("could not detach from terminal");
}
umask(0);
$this->host = $host;
$this->port = $port;
}
private function start_worker_process(){
$pid = pcntl_fork();
switch ($pid) {
case -1:
echo "fork error : {$i} \r\n";
exit;
case 0:
$context_option['socket']['so_reuseport'] = 1;
$context = stream_context_create($context_option);
$this->socket = stream_socket_server("tcp://".$this->host.":".$this->port, $errno, $errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context);
if (!$this->socket) die($errstr."--".$errno);
stream_set_blocking($this->socket,0);
$id = (int)$this->socket;
$this->master[$id] = $this->socket;
$base = event_base_new();
$event = event_new();
event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
event_base_set($event, $base);
event_add($event);
echo posix_getpid()." start run...\n";
event_base_loop($base);
default:
$this->pids[$pid] = $pid;
break;
}
}
public function run(){
for($i = 1; $i <= $this->process_num; $i++){
$this->start_worker_process();
}
while(1){
foreach ($this->pids as $i => $pid) {
if($pid) {
$res = pcntl_waitpid($pid, $status,WNOHANG);
if ( $res == -1 || $res > 0 ){
$this->start_worker_process();
unset($this->pids[$pid]);
}
}
}
sleep(1);
}
}
public function ev_accept($socket, $flag, $base){
$connection = @stream_socket_accept($socket);
echo posix_getpid()." -- accepted " . stream_socket_get_name($connection,true) . "\n";
if ( !$connection ){
return;
}
stream_set_blocking($connection, 0);
$id = (int)$connection;
if($this->onConnect) {
call_user_func($this->onConnect, $connection);
}
$buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
event_buffer_base_set($buffer, $base);
event_buffer_timeout_set($buffer, 30, 30);
event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
event_buffer_priority_set($buffer, 10);
event_buffer_enable($buffer, EV_READ | EV_PERSIST);
$this->master[$id] = $connection;
$this->buffer[$id] = $buffer;
$this->receive[$id] = '';
}
function ev_read($buffer, $id)
{
while( 1 ) {
$read = event_buffer_read($buffer, 3);
if($read === '' || $read === false)
{
break;
}
$pos = strpos($read, "\n");
if($pos === false)
{
$this->receive[$id] .= $read;
//echo "received:".$read.";not all package,continue recdiveing\n";
}else{
$this->receive[$id] .= trim(substr ($read,0,$pos+1));
$read = substr($read,$pos+1);
if($this->onMessage)
{
call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]);
}
switch ( $this->receive[$id] ){
case "quit":
echo "client close conn\n";
if($this->onClose) {
call_user_func($this->onClose, $this->master[$id]);
}
fclose($this->master[$id]);
break;
default:
//echo "all package:\n";
//echo $this->receive[$id]."\n";
break;
}
$this->receive[$id]='';
}
}
}
function ev_write($buffer, $id)
{
echo "$id -- " ."\n";
}
function ev_error($buffer, $error, $id)
{
echo "ev_error - ".$error."\n";
}
}
$server = new Xtgxiso_server();
$server->onConnect = function($conn){
echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
fwrite($conn,"conn success\n");
};
$server->onMessage = function($conn,$msg){
echo "onMessage --" . $msg . "\n";
fwrite($conn,"received ".$msg."\n");
};
$server->onClose = function($conn){
echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
};
$server->run();
经过多次服务模型的演变,基本我们实现了一个高性能的服务模型!
前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。
如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:
1:多启动进程,提高并发数
2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数
3:异步编程,避免阻塞,提高并发数
这里我们重点介绍第三种方法,以访问第三方http为例。
代码如下:
<?php
//同步读取
function get_data_blocking(){
$socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
fwrite($socket, "GET /sleep1.php HTTP/1.0\r\nHost: test.raventech.cn\r\nAccept: */*\r\n\r\n");
$str = "";
while (!feof($socket)) {
$str .= fgets($socket, 1024);
}
fclose($socket);
return $str;
}
//异步读取
function get_data_unblocking(){
$socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
stream_set_blocking($socket, 0);
fwrite($socket, "GET /sleep1.php HTTP/1.0\r\nHost: test.raventech.cn\r\nAccept: */*\r\n\r\n");
$write = NULL;
$except = NULL;
while( $socket ){
$read = array($socket);
$num_changed_streams = stream_select($read, $write, $except, 0);
if ( $num_changed_streams > 0 ) {
foreach($read as $r){
$str = fread($r,2048);
fclose($socket);
$socket = false;
return $str;
}
}
usleep(100);
}
}
//真正的异步读取--利用server的IO复用事件来提高并发
class Get_data_event{
public $onMessage = null;
private $str='';
function __construct(&$server){
$socket = stream_socket_client("tcp://test.xtgxiso.cn:80", $errno, $errstr, 6);
stream_set_blocking($socket, 0);
fwrite($socket, "GET /sleep1.php HTTP/1.0\r\nHost: test.xtgxiso.cn\r\nAccept: */*\r\n\r\n");
$server->add_socket($socket, array($this, 'read'));
}
public function read($socket){
while (1) {
$buffer = fread($socket, 1024);
if ($buffer === '' || $buffer === false) {
break;
}
$this->str .= $buffer;
}
if( $this->onMessage && $this->str ) {
call_user_func($this->onMessage, $this->str);
}
$this->str = '';
return false;
}
}
/**
* 单进程IO复用select
*/
class Xtgxiso_server
{
public $socket = false;
public $master = array();
public $onConnect = null;
public $onMessage = null;
public $other_socket_callback = array();
function __construct($host="0.0.0.0",$port=1215)
{
$this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
if (!$this->socket) die($errstr."--".$errno);
stream_set_blocking($this->socket,0);
$id = (int)$this->socket;
$this->master[$id] = $this->socket;
}
public function add_socket($socket,$callback){
$id = (int)$socket;
$this->master[$id] = $socket;
$this->other_socket_callback[$id] = $callback;
}
public function run(){
$read = $this->master;
$receive = array();
echo "start run...\n";
while ( 1 ) {
$read = $this->master;
//echo "waiting...\n";
$mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60);
if ($mod_fd === FALSE) {
break;
}
foreach ( $read as $k => $v ) {
$id = (int)$v;
if ( $v === $this->socket ) {
//echo "new conn\n";
$conn = stream_socket_accept($this->socket);
if ($this->onConnect) {
call_user_func($this->onConnect, $conn);
}
$id = (int)$conn;
$this->master[$id] = $conn;
} else if ( @$this->other_socket_callback[$id] ){
call_user_func_array($this->other_socket_callback[$id], array($v));
} else {
//echo "read data\n";
if ( !isset($receive[$k]) ){
$receive[$k]="";
}
$buffer = fread($v, 1024);
//echo $buffer."\n";
if ( strlen($buffer) === 0 ) {
if ( $this->onClose ){
call_user_func($this->onClose,$v);
}
fclose($v);
$id = (int)$v;
unset($this->master[$id]);
} else if ( $buffer === FALSE ) {
if ( $this->onClose ){
call_user_func($this->onClose, $this->master[$key_to_del]);
}
fclose($v);
$id = (int)$v;
unset($this->master[$id]);
} else {
$pos = strpos($buffer, "\r\n\r\n");
if ( $pos === false) {
$receive[$k] .= $buffer;
//echo "received:".$buffer.";not all package,continue recdiveing\n";
}else{
$receive[$k] .= trim(substr ($buffer,0,$pos+4));
$buffer = substr($buffer,$pos+4);
if($this->onMessage) {
call_user_func($this->onMessage,$v,$receive[$k]);
}
$receive[$k]='';
}
}
}
}
usleep(10000);
}
}
}
$server = new Xtgxiso_server();
$server->onConnect = function($conn){
echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
};
$server->onMessage = function($conn,$msg) use ( $server ) {
/*
$respone ="";//响应内容
$respone = "HTTP/1.1 200 OK\r\n";
$respone .= "Server: openresty\r\n";
$respone .= "Content-Type: text/html; charset=utf-8\r\n";
$body = time().rand(111111,999999);
$len = strlen($body);
$respone .= "Content-Length:$len\r\n";
$respone .= "Connection: close\r\n";
$respone .= "\r\n$body\r\n\r\n";
echo "onMessage --" . $msg . "\n";
*/
//同步读取
//$respone = get_data_blocking();
//fwrite($conn,$respone);
//异步读取
//$respone = get_data_unblocking();
//fwrite($conn,$respone);
//真正异步
$data = new Get_data_event($server);
$data->onMessage = function($str) use($conn){
fwrite($conn,$str);
};
};
$server->onClose = function($conn){
echo "onClose --" . "\n";
};
$server->run();
第三方服务sleep1.php的代码比较简单
<?php
sleep(1);//模拟耗时
echo "OK";
通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.
<script src="http://127.0.0.1:1215/?id=1"></script>
<script src="http://127.0.0.1:1215/?id=2"></script>
<script src="http://127.0.0.1:1215/?id=3"></script>
通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.
为了更好的了解redis协议,我们用php来实现一个支持大部份命令的客户端类.
redis的协议可参考这个文章http://redis.cn/topics/protocol.html
代码如下:
<?php
namespace xtgxiso;
class Redis {
private $redis_socket = false;
private $cmd = '';
public function __construct($host='127.0.0.1',$port=6379,$timeout = 3) {
$this->redis_socket = stream_socket_client("tcp://".$host.":".$port, $errno, $errstr, $timeout);
if ( !$this->redis_socket) {
throw new Exception("{$errno} - {$errstr}");
}
}
public function __destruct() {
fclose($this->redis_socket);
}
public function __call($name, $args) {
$crlf = "\r\n";
array_unshift($args,$name);
$command = '*' . count($args) . $crlf;
foreach ($args as $arg) {
$command .= '$' . strlen($arg) . $crlf . $arg . $crlf;
}
$fwrite = fwrite($this->redis_socket,$command);
if ($fwrite === FALSE || $fwrite <= 0) {
throw new Exception('Failed to write entire command to stream');
}
return $this->readResponse();
}
private function readResponse() {
$reply = trim(fgets($this->redis_socket, 1024));
switch (substr($reply, 0, 1)) {
case '-':
throw new Exception(trim(substr($reply, 4)));
break;
case '+':
$response = substr(trim($reply), 1);
if ($response === 'OK') {
$response = TRUE;
}
break;
case '$':
$response = NULL;
if ($reply == '$-1') {
break;
}
$read = 0;
$size = intval(substr($reply, 1));
if ($size > 0) {
do {
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$r = fread($this->redis_socket, $block_size);
if ($r === FALSE) {
throw new Exception('Failed to read response from stream');
} else {
$read += strlen($r);
$response .= $r;
}
} while ($read < $size);
}
fread($this->redis_socket, 2); /* discard crlf */
break;
/* Multi-bulk reply */
case '*':
$count = intval(substr($reply, 1));
if ($count == '-1') {
return NULL;
}
$response = array();
for ($i = 0; $i < $count; $i++) {
$response[] = $this->readResponse();
}
break;
/* Integer reply */
case ':':
$response = intval(substr(trim($reply), 1));
break;
default:
throw new RedisException("Unknown response: {$reply}");
break;
}
return $response;
}
}
/*
$redis = new Client_test();
var_dump($redis->auth("123456"));
var_dump($redis->set("xtgxiso",'abc'));
var_dump($redis->get("xtgxiso"));
*/
通过实现,我们基本了解redis的协议。
大家都知道redis是用C来实现的,现在我用php来实现一个简单的仅支持SET和GET命令的redis服务端,主要是为了更好的了解redis的服务端和php的网络编程.
代码如下:
<?php
/**
* 多进程阻塞式
*/
class Xtgxiso_server
{
private $socket = false;
private $process_num = 100;
public $redis_kv_data = array();
public $onMessage = null;
function __construct($host="0.0.0.0",$port=1215)
{
$this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
if (!$this->socket) die($errstr."--".$errno);
echo "listen $host $port \r\n";
ini_set("memory_limit", "128M");
}
private function parseRESP(&$conn){
$line = fgets($conn);
if($line === '' || $line === false)
{
return null;
}
$type = $line[0];
$line = mb_substr($line,1,-2);
switch ( $type ){
case "*":
$count = (int) $line;
$data = array();
for ($i = 1; $i <= $count; $i++) {
$data[] = $this->parseRESP($conn);
}
return $data;
case "$":
if ($line == '-1') {
return null;
}
$length = $line + 2;
$data = '';
while ($length > 0) {
$block = fread($conn, $length);
if ($length !== strlen($block)) {
throw new Exception('RECEIVING');
}
$data .= $block;
$length -= mb_strlen($block);
}
return mb_substr($data, 0, -2);
}
return $line;
}
private function start_worker_process(){
$pid = pcntl_fork();
switch ($pid) {
case -1:
echo "fork error : {$i} \r\n";
exit;
case 0:
while ( 1 ) {
echo "waiting...\n";
$conn = stream_socket_accept($this->socket, -1);
if ( !$conn ){
continue;
}
//"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
while(1){
$arr = $this->parseRESP($conn);
if ( is_array($arr) ) {
if ($this->onMessage) {
call_user_func($this->onMessage, $conn, $arr);
}
}else if ( $arr ){
if ($this->onMessage) {
call_user_func($this->onMessage, $conn, $arr);
}
}else{
fclose($conn);
break;
}
}
}
default:
$this->pids[$pid] = $pid;
break;
}
}
public function run(){
for($i = 1; $i <= $this->process_num; $i++){
$this->start_worker_process();
}
while(1){
foreach ($this->pids as $i => $pid) {
if($pid) {
$res = pcntl_waitpid($pid, $status,WNOHANG);
if ( $res == -1 || $res > 0 ){
$this->start_worker_process();
unset($this->pids[$pid]);
}
}
}
sleep(1);
}
}
}
$server = new Xtgxiso_server();
$server->onMessage = function($conn,$info) use($server){
if ( is_array($info) ){
if ( $info["0"] == "SET" ) {
$key = $info[1];
$val = $info[2];
$server->redis_kv_data[$key] = $val;
fwrite($conn, "+OK\r\n");
}else if ( $info["0"] == "GET" ){
$key = $info[1];
fwrite($conn, "$".strlen($server->redis_kv_data[$key])."\r\n".$server->redis_kv_data[$key]."\r\n");
}else{
fwrite($conn,"+OK\r\n");
}
}else{
fwrite($conn,"+OK\r\n");
}
};
$server->run();
通过如下命令来测试PHP实现的性能:
redis-benchmark -h 10.170.233.221 -p 1215 -t set -n 80000 -q
看来还是不错的,大家有兴趣可以再实现其他命令!
相关文章
- php语言实现redis的客户端与服务端有一些区别了因为前面介绍过服务端了这里我们来介绍客户端吧,希望文章对各位有帮助。 为了更好的了解redis协议,我们用php来实现...2016-11-25
- 有时我们在页面上需要选择数值范围,如购物时选取价格区间,购买主机时自主选取CPU,内存大小配置等,使用直观的滑块条直接选取想要的数值大小即可,无需手动输入数值,操作简单又方便。HTML首先载入jQuery库文件以及jRange相关...2015-03-15
- 本文实例讲述了JS实现的简洁纵向滑动菜单(滑动门)效果。分享给大家供大家参考,具体如下:这是一款纵向布局的CSS+JavaScript滑动门代码,相当简洁的手法来实现,如果对颜色不满意,你可以试着自己修改CSS代码,这个滑动门将每一...2015-10-21
jQuery+slidereveal实现的面板滑动侧边展出效果
我们借助一款jQuery插件:slidereveal.js,可以使用它控制面板左右侧滑出与隐藏等效果,项目地址:https://github.com/nnattawat/slideReveal。如何使用首先在页面中加载jquery库文件和slidereveal.js插件。复制代码 代码如...2015-03-15- 翻板抽奖的实现流程:前端页面提供6个方块,用数字1-6依次表示6个不同的方块,当抽奖者点击6个方块中的某一块时,方块翻转到背面,显示抽奖中奖信息。看似简单的一个操作过程,却包含着WEB技术的很多知识面,所以本文的读者应该熟...2015-10-21
SQLMAP结合Meterpreter实现注入渗透返回shell
sqlmap 是一个自动SQL 射入工具。它是可胜任执行一个广泛的数据库管理系统后端指印, 检索遥远的DBMS 数据库等,下面我们来看一个学习例子。 自己搭建一个PHP+MYSQ...2016-11-25- 这篇文章主要介绍了c# socket网络编程,server端接收,client端发送数据,大家参考使用吧...2020-06-25
- 这篇文章主要介绍了JS WebSocket断开原因和心跳机制,对websocket感兴趣的同学,可以参考下...2021-05-08
- 复制代码 代码如下: // 第一种写法 $da = date("w"); if( $da == "1" ){ echo "今天是星期一"; }else if( $da == "2" ){ echo "今天是星期二"; }else if( $da == "3" ){ echo "今天是星期三"; }else if( $da == "4"...2013-10-04
- 这篇文章主要介绍了C#实现Socket通信的解决方法,需要的朋友可以参考下...2020-06-25
- 本篇文章主要介绍了C# Socket异步通信,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2020-06-25
- js里面设置DOM节点透明度的函数属性:filter= "alpha(opacity=" + value+ ")"(兼容ie)和opacity=value/100(兼容FF和GG)。 先来看看设置透明度的兼容性代码: 复制代码 代码如下: function setOpacity(ele, opacity) { if (...2014-06-07
- 本篇文章主要介绍了C# Socket的TCP通讯,socket通讯方式有两种:同步和异步,详细的介绍了这两种方法,有兴趣的可以了解一下。...2020-06-25
python使用socket高效传输视频数据帧(连续发送图片)
本文主要介绍了python使用socket高效传输视频数据帧(连续发送图片),文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-10-23- 这篇文章主要介绍了C语言中send()函数和sendto()函数的使用方法,是C语言入门学习中的基础知识,需要的朋友可以参考下...2020-04-25
- 我们在调试过程使用的工具有:modheader,postman等,但这些工具都会存在的问题:缺少客户端里相应的设备信息;即使将cookie信息复制出来,也是存在过期的问题;多个设备之间切换时不方便;针对这些存在的问题,我基于websocket双向通信的特点,实现了多端桥接管理平台...2021-05-15
- 本文我们需要解决的问题是如何实现Http请求来实现通信,解决Android 2.3 版本以后无法使用Http请求问题,下面请看正文。 Android开发中使用HttpClient来开发Http程序...2016-09-20
- 这篇文章主要介绍了C# 实现WebSocket服务端教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-12-08
- 复制代码 代码如下:call PROCEDURE_split('分享,代码,片段',',');select * from splittable;复制代码 代码如下:drop PROCEDURE if exists procedure_split;CREATE PROCEDURE `procedure_split`( inputstring varc...2014-05-31
- 这篇文章主要介绍了C# Socket编程实现简单的局域网聊天器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-06-25