In this part of the series we will add the ability to accept input from named pipes to the queue server, an important feature so that the queue can actually be given things to do. Again, I would like to point out that it is assumed we are running on a Unixesque operating system such as Linux, BSD, or Solaris.

What are Named Pipes?

Named Pipes are special files that the kernel can create allowing for buffered byte streams. They operate on the principle of FIFO, where the first thing put into it is the first thing it spits out. This is perfect for our queue system as we want to process the first thing put into it first. To store data into a named pipe, it is as easy as `echo lol > /path/to/pipe` and reading the data back out is as easy as `cat /path/to/pipe`. An interesting note about named pipes is that by default they block until there is both a reader and a writer. This means the echo command will hang until another terminal is opened and the cat command is executed.

Here is a quick video demonstrating how the queue system will work by the end of this post. We will be able to launch the server, and send input to it from another terminal via the named pipe.

Building the pipeline…

To deal with a named pipe in PHP, we have a few special functions to set it up. Past that, reading and writing to them are the same as any other file.

PHP Code:
<?php

$pipefile 
'/tmp/queueserver-input';

//. delete the old pipe if it exists just to clear out stale
//. references and whatnot.
if(file_exists($pipefile))
    if(!
unlink($pipefile)) 
        die(
'unable to remove stale file');

//. create a new pipe and we are going to go ahead and allow
//. anyone to write to it for the time being.
umask(0);
if(!
posix_mkfifo($pipefile,0666))
    die(
'unable to create named pipe');

//. open the file so we can read it. i am opening it with read
//. and write so that there is both a reader and a writer,
//. allowing us to skip past some blocking issues later.
$pipe fopen($pipefile,'r+');
if(!
$pipe) die('unable to open the named pipe');
stream_set_blocking($pipe,false);

?>

This is the bare minimum yet complete code required to setup a named pipe from our programming. First thing is to clean up stale files if they are still laying around. I am not sure what the proper defined behaviour is of a named pipe when the system is rebooted or whatnot, and generally I find it just best practice to make sure we start with nice fresh references all the time. Second the pipe is made with posix_mkfifo(), and for the purpose of this demo it is given world write privileges.

After that all there is to do is open the pipe and use it. It is opened and set to be non-blocking to start. This way if there is no input, our processing loop passes through once and renders the “nothing to do” message straight out.

Reading from the pipe

Now at the beginning of the main queue loop we stick in the code to read from the named pipe. Pulling numbers out of my arse, at least on FreeBSD if I recall correctly named pipes can only hold about 65K of data. Granted that is a lot of text but if you do end up filling all of that because your process takes a long time, once that is full the next time something tries to write to it the writer will hang until there is enough free space – and that would make your web API or whatever suck. The solution for that is to read everything currently pending in the pipe into an array, using application RAM to be as large as a buffer as you need. I used fgets to read from the pipe because I have decided for this demo, our data format will always be terminated by new lines, so fgets is now a built in data parser.

PHP Code:
<?php

while(1) {

    
//. read in all the lines waiting to be read before
    //. proceeding any further.
    
while($input trim(fgets($pipe))) {
        
$queue[] = $input;
    }

    
$job current($queue);
    
$jobkey key($queue);
    if(
$job) {
        echo 
'processing job '$jobPHP_EOL;
        
        
process($job);
        
        
next($queue);
        unset(
$job,$queue[$jobkey]);        
    } else {
        echo 
'no jobs to do - waiting...'PHP_EOL;
        
sleep(10);    
    }

}

?>

Because we set the file stream to be non-blocking, once our pipe is empty the read loop ends and our queue progresses, however our application still has that sleep() call in it, and in the previous posts I promised we would do away with that.

Using the stream to flow control itself

The trick to getting rid of that sleep() that is to play with the blocking status of our pipe stream. Every time we read from the stream we will set it non-blocking so when it runs out of things to read it will quit trying. Every time we run out of things to process we will set the stream to be blocking so that instead of sleeping we sit waiting for more data and the moment it comes in, it gets processed instead of sitting around waiting for sleep() to expire.

PHP Code:
<?php

while(1) {

    while(
$input trim(fgets($pipe))) {

        
//. unblock the stream so when we run out of things, this loop ends.
        
stream_set_blocking($pipe,false);

        
$queue[] = $input;
    }

    
$job current($queue);
    
$jobkey key($queue);
    if(
$job) {
        echo 
'processing job '$jobPHP_EOL;
        
        
process($job);
        
        
next($queue);
        unset(
$job,$queue[$jobkey]);        
    } else {
        echo 
'no jobs to do - waiting...'PHP_EOL;

        
//. now that we are out of things to do, the only thing there is to do
        //. is wait for more things to do. >_>
        
stream_set_blocking($pipe,true);
    }

}

?>

At this point now we have a fully working queue service. It has the ability to manage a list of things to do, and the ability to accept input to add things to that list. Here is the full program so far.

PHP Code:
<?php

function process($job) {
    
sleep(1); //. make it look like we did work.
    
return;
}

$queue = array();

//////// setup our named pipe ////////
$pipefile '/tmp/queueserver-input';
if(
file_exists($pipefile))
    if(!
unlink($pipefile)) 
        die(
'unable to remove stale file');

umask(0);
if(!
posix_mkfifo($pipefile,0666))
    die(
'unable to create named pipe');

$pipe fopen($pipefile,'r+');
if(!
$pipe) die('unable to open the named pipe');
stream_set_blocking($pipe,false);

//////// process the queue ////////
while(1) {

    while(
$input trim(fgets($pipe))) {
        
stream_set_blocking($pipe,false);
        
$queue[] = $input;
    }

    
$job current($queue);
    
$jobkey key($queue);
    if(
$job) {
        echo 
'processing job '$jobPHP_EOL;
        
        
process($job);
        
        
next($queue);
        unset(
$job,$queue[$jobkey]);        
    } else {
        echo 
'no jobs to do - waiting...'PHP_EOL;
        
stream_set_blocking($pipe,true);
    }

}

?>

In the next post we will give the program the ability to fork itself into a background daemon process. As it stands right now to keep it running you would have to do something like running it in a screen session. Many may find that just as fine a solution as any, but giving the application the ability to background itself is a much more elegant solution.