PHP 实现多进程 Linux IPC 消息队列

编辑于 2017-02-11

* 移动设备下, 可左滑手指以查看较宽代码

Linux IPC 消息队列是一个全内存设计,内核保证读写顺序和数据同步,并且性能非常强悍的先进现先出数据结构。

参考:韩天峰博客

说明

它的特征如下:

1)每秒可读写超过50万次(4核/4G内存的机器)

2)支持消息多类型,抢队列时可根据需要获取特定的消息类型

3)每个消息长度最大支持65535个字节

4)队列长度受内存大小限制,最大不超过机器内存的50%,可以修改内核参数来调整

消息队列可以用在很多场景下,如异步任务处理,抢占式的数据分发,顺序缓存区等。使用方法也非常简单,Linux提供了4个库函数,msgget,msgsnd,msgrcv,msgctl,分别用于创建/获取消息队列、发送数据、接收数据、设置/获取消息队列。PHP 内核包含了这个扩展,需要在 ./configure 时加入–enable-sysvmsg来开启。具体可参考PHP手册。

代码

下面写一个简单的例子,采用单主进程 + 多 Worker 进程的模式,功能是做异步任务的处理。本代码没有提供进程管理、信号处理、队列过载保护,如果要用在生产环境,请自行实现。

<?php
$msg_key = 0x3000111; // 系统消息队列的key
$worker_num = 2;   // 启动的Worker进程数量
$worker_pid = array();

$queue = msg_get_queue($msg_key, 0666) or die("create queue fail\n");

for($i = 0; $i < $worker_num; $i++) {
    $pid = pcntl_fork();
    // 主进程
    if($pid > 0) {
        $worker_pid[] = $pid;
        echo "create worker $i.pid = $pid\n";
        continue;
    }
    // 子进程
    elseif($pid == 0) {
        procWorker($i);
        exit;
    } else {
        echo "fork fail\n";
    }
}

procMain();

function procMain()
{
    global $queue;
    $bind = "udp://0.0.0.0:9999";

    // 建立一个 UDP 服务器接收请求
    $socket = stream_socket_server($bind, $errno, $errstr, STREAM_SERVER_BIND)
    or die("$errstr ($errno)");

    stream_set_blocking($socket, 1);
    echo "stream_socket_server bind=$bind\n";
    while (1) {
        $errCode = 0;
        $peer = '';
        $pkt = stream_socket_recvfrom($socket, 8192, 0, $peer);

        if($pkt == false) {
            echo "udp error\n";
        }
        // 如果队列满了,这里会阻塞
        $ret = msg_send($queue, 1, $pkt, false, true, $errCode);
        if($ret) {
            stream_socket_sendto($socket, "OK\n", 0, $peer);
        } else {
            stream_socket_sendto($socket, "ER\n", 0, $peer);
        }
    }
}

function procWorker($id)
{
    global $queue;
    $msg_type = 0;
    $msg_pkt = '';
    $errCode = 0;
    while(1) {
        $ret = msg_receive($queue, 0, $msg_type, 8192, $msg_pkt, false, $errCode);
        if($ret) {
            //TODO 这里处理接收到的数据
            echo "[Worker $id] ".$msg_pkt;
        } else {
            echo "ERROR: queue errno={$errCode}\n";
        }
    }
}

作为测试,我们使用 UDP 发送数据到程序,程序再通过消息队列通讯并显示结果。

先运行程序,然后在 Linux 终端输入:

nc -u 127.0.0.1 9999
#然后输入:
# 1 回车
# 2 回车
# 3 回车

运行结果:

create worker 0.pid = 18885
create worker 1.pid = 18886
stream_socket_server bind=udp://0.0.0.0:9999
[Worker 0] 1
[Worker 1] 2
[Worker 0] 3

使用 ipcs -q 来查看系统消息队列:

Message Queues:
q  65536 0x03000111 --rw-rw-rw-     tony    staff

0x03000111 就是刚才我们创建的消息队列。