加入收藏 | 设为首页 | 会员中心 | 我要投稿 汽车网 (https://www.0577qiche.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

php中socket服务的模型下的编程方法

发布时间:2023-09-22 10:39:22 所属栏目:PHP教程 来源:
导读:前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。

如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待
前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。

如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:

1:多启动进程,提高并发数

2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数

3:异步编程,避免阻塞,提高并发数

这里我们重点介绍第三种方法,以访问第三方http为例。

代码如下:

//同步读取 
function get_data_blocking(){ 
    $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6); 
    fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.raventech.cn/r/nAccept: */*/r/n/r/n"); 
    $str = ""; 
    while (!feof($socket)) { 
        $str .= fgets($socket, 1024); 
    } 
    fclose($socket); 
    return $str; 

 
//异步读取 
function get_data_unblocking(){ 
    $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6); 
    stream_set_blocking($socket, 0); 
    fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.raventech.cn/r/nAccept: */*/r/n/r/n"); 
    $write  = NULL; 
    $except = NULL; 
    while( $socket ){ 
        $read   = array($socket); 
        $num_changed_streams = stream_select($read, $write, $except, 0); 
        if ( $num_changed_streams > 0 ) { 
            foreach($read as $r){ 
                $str = fread($r,2048); 
                fclose($socket); 
                $socket = false; 
                return $str; 
            } 
        } 
        usleep(100); 
    } 

 
//真正的异步读取--利用server的IO复用事件来提高并发 
class Get_data_event{ 
 
    public $onMessage = null; 
    private $str=''; 
 
    function __construct(&$server){ 
        $socket = stream_socket_client("tcp://test.xtgxiso.cn:80", $errno, $errstr, 6); 
        stream_set_blocking($socket, 0); 
        fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.xtgxiso.cn/r/nAccept: */*/r/n/r/n"); 
        $server->add_socket($socket, array($this, 'read')); 
    } 
 
    public function read($socket){ 
        while (1) { 
            $buffer = fread($socket, 1024); 
            if ($buffer === '' || $buffer === false) { 
                break; 
            } 
            $this->str .= $buffer; 
        } 
        if( $this->onMessage && $this->str ) { 
            call_user_func($this->onMessage, $this->str); 
        } 
        $this->str = ''; 
        return false; 
    } 
 

 
/** 
 * 单进程IO复用select 
 */ 
class Xtgxiso_server 

    public $socket = false; 
    public $master = array(); 
    public $onConnect = null; 
    public $onMessage = null; 
    public $other_socket_callback = array(); 
 
    function __construct($host="0.0.0.0",$port=1215) 
    { 
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr); 
        if (!$this->socket) die($errstr."--".$errno); 
        stream_set_blocking($this->socket,0); 
        $id = (int)$this->socket; 
        $this->master[$id] = $this->socket; 
    } 
 
    public function add_socket($socket,$callback){ 
        $id = (int)$socket; 
        $this->master[$id] = $socket; 
        $this->other_socket_callback[$id] = $callback; 
    } 
 
    public function run(){ 
        $read = $this->master; 
        $receive = array(); 
        echo  "start run.../n"; 
        while ( 1 ) { 
            $read = $this->master; 
            //echo  "waiting.../n"; 
            $mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60); 
            if ($mod_fd === FALSE) { 
                break; 
            } 
            foreach ( $read as $k => $v ) { 
                $id = (int)$v; 
                if ( $v === $this->socket ) { 
                    //echo "new conn/n"; 
                    $conn = stream_socket_accept($this->socket); 
                    if ($this->onConnect) { 
                        call_user_func($this->onConnect, $conn); 
                    } 
                    $id = (int)$conn; 
                    $this->master[$id] = $conn; 
                } else if ( @$this->other_socket_callback[$id] ){ 
                    call_user_func_array($this->other_socket_callback[$id], array($v)); 
                } else { 
                    //echo "read data/n"; 
                    if ( !isset($receive[$k]) ){ 
                        $receive[$k]=""; 
                    } 
                    $buffer = fread($v, 1024); 
                    //echo $buffer."/n"; 
                    if ( strlen($buffer) === 0 ) { 
                        if ( $this->onClose ){ 
                            call_user_func($this->onClose,$v); 
                        } 
                        fclose($v); 
                        $id = (int)$v; 
                        unset($this->master[$id]); 
                    } else if ( $buffer === FALSE ) { 
                        if ( $this->onClose ){ 
                            call_user_func($this->onClose, $this->master[$key_to_del]); 
                        } 
                        fclose($v); 
                        $id = (int)$v; 
                        unset($this->master[$id]); 
                    } else { 
                        $pos = strpos($buffer, "/r/n/r/n"); 
                        if ( $pos === false) { 
                            $receive[$k] .= $buffer; 
                            //echo "received:".$buffer.";not all package,continue recdiveing/n"; 
                        }else{ 
                            $receive[$k] .= trim(substr ($buffer,0,$pos+4)); 
                            $buffer = substr($buffer,$pos+4); 
                            if($this->onMessage) { 
                                call_user_func($this->onMessage,$v,$receive[$k]); 
                            } 
                            $receive[$k]=''; 
                        } 
                    } 
                } 
            } 
            usleep(10000); 
        } 
    } 

 
 
$server =  new Xtgxiso_server(); 
 
$server->onConnect = function($conn){ 
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "/n"; 
}; 
 
$server->onMessage = function($conn,$msg) use ( $server ) { 
    /* 
    $respone ="";//响应内容 
    $respone = "HTTP/1.1 200 OK/r/n"; 
    $respone .= "Server: openresty/r/n"; 
    $respone .= "Content-Type: text/html; charset=utf-8/r/n"; 
    $body = time().rand(111111,999999); 
    $len = strlen($body); 
    $respone .= "Content-Length:$len/r/n"; 
    $respone .= "Connection: close/r/n"; 
    $respone .= "/r/n$body/r/n/r/n"; 
    echo "onMessage --" . $msg . "/n"; 
    */ 
 
    //同步读取 
    //$respone = get_data_blocking(); 
    //fwrite($conn,$respone); 
 
    //异步读取 
    //$respone = get_data_unblocking(); 
    //fwrite($conn,$respone); 
 
    //真正异步 
    $data = new Get_data_event($server); 
    $data->onMessage = function($str) use($conn){ 
        fwrite($conn,$str); 
    }; 
 
}; 
 
$server->onClose = function($conn){ 
    echo "onClose --" . "/n"; 
}; 
 
$server->run(); 
第三方服务sleep1.php的代码比较简单:

sleep(1);//模拟耗时 
echo "OK"; 
通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.

<script src="http://127.0.0.1:1215/?id=1"></script> 
<script src="http://127.0.0.1:1215/?id=2"></script> 
<script src="http://127.0.0.1:1215/?id=3"></script> 
通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.

 

(编辑:汽车网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章