Building Applications with POE

Earlier, we talked about the fundamental principles of application design with POE. Now it’s time to put my money where my mouth is and build some actual working code.

To make life a bit easier, let’s lay out a very simple problem. Let’s say we would like to accept and parse data that resembles CGI query strings. This data will be key/value pairs in which the key and value are separated by = and the pairs themselves are delimited by &. An example string we’ll use throughout this article is as follows:

  foo=bar&baz=1&bat=2

By the time we’re done, we will have a working Filter and Component to handle this incoming data.

Step 1: A Filter

The first step is building a simple filter to parse this incoming data. As we discussed earlier, filters are much easier to deal with because they are unaware of their environment and the POE context in which they are run. Our filter is made even easier since we are just parsing incoming data and not generating an outgoing datastream.

First off, we need the basics of any good module.

  package POE::Filter::SimpleQueryString;

  use warnings;
  use strict;

  use Carp qw(carp croak);

Next we need a constructor.

  sub new {
    my $class = shift;
    my $self = bless {}, $class;
    return $self;
  }

This is about the simplest constructor possible. This very simple filter requires no parameters to operate. It is perfectly reasonable, however, to demand parameters of filter users. For instance, the filter could rot13 the incoming data before parsing, with a constructor parameter to turn that feature on.
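
As a rough sketch of what such a parameterized constructor might look like, the variant below accepts an optional flag. The package name, the Rot13 parameter, and the _maybe_rot13 helper are all illustrative assumptions; none of them is part of the filter built in this article.

```perl
package My::Filter::WithOptions;   # hypothetical name, for illustration only

use warnings;
use strict;

use Carp qw(croak);

sub new {
  my $class = shift;
  croak "$class->new() expects key => value pairs" if @_ % 2;
  my %args = @_;

  # Rot13 is an assumed, optional parameter; it defaults to off.
  my $rot13 = (delete $args{Rot13}) ? 1 : 0;

  # Reject anything we do not understand rather than silently ignoring it.
  croak "Unknown parameter(s): " . join(', ', sort keys %args) if %args;

  my $self = bless { rot13 => $rot13 }, $class;
  return $self;
}

# Apply rot13 to a string only when the feature was switched on.
sub _maybe_rot13 {
  my ($self, $text) = @_;
  $text =~ tr/A-Za-z/N-ZA-Mn-za-m/ if $self->{rot13};
  return $text;
}

1;
```

A get() method in such a filter could pass each raw record through _maybe_rot13 before splitting it into pairs.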

get()

Now we need the ability to parse data. We will be using the simple get()/put() style of the Filter API. This style requires get() and put() methods, each able to transform multiple records per invocation.

get()’s job is to transform raw data into cooked records. The example string above (foo=bar&baz=1&bat=2) will become a hash reference:

  $VAR1 = {
    'foo' => 'bar',
    'baz' => '1',
    'bat' => '2',
  };

A POE Filter is just a normal Perl object with a defined interface.

  sub get {
    my $self = shift;
    my $buffer = shift;

The buffer can and probably will contain multiple records. The size of the buffer is determined by the POE Driver being used and the operating system in question. $buffer will always be an array reference. While it is generally sensible to test $buffer to make sure it conforms to the standard interface, for the purpose of this exercise, we will just trust POE.

In our super-easy format, an individual record is terminated by a network newline (\x0d\x0a, a carriage return followed by a line feed). Key/value pairs are delimited by &, and the key and value themselves are separated by an =. Note that we aren’t dealing with issues like character escaping or data taint. Production quality code will need to deal with these issues.

    my @chunks;

Each parsed line makes up a chunk of data. We want to represent each record as a distinct entity to the user.

    foreach my $record (@$buffer) {
      $record =~ s/\x0d\x0a$//;
      my @pairs = split(/&/, $record);

      my %chunk;
      foreach my $pair (@pairs) {
        my ($key, $value) = split(/=/, $pair, 2);

So what happens if there is more than one instance of a given key in a record? Simple. We make an array reference. The user will need to inspect the value of each key to determine if they have more than one value:

        if(defined $chunk{$key}) {
          if(ref $chunk{$key} eq 'ARRAY') {
            push @{ $chunk{$key} }, $value;
          } else {
            $chunk{$key} = [ $chunk{$key}, $value ];
          }
        } else {
          $chunk{$key} = $value;
        }
      }
      push @chunks, \%chunk;
    }
    return \@chunks;
  }
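
One caveat: the get() above treats each element of $buffer as exactly one complete record. A stream-oriented driver makes no such promise, since a single read may carry several records or only part of one. The sketch below is a hedged variant, not the filter from this article: it keeps leftover bytes between calls and only parses complete, CRLF-terminated records. The package name and the _parse_record helper are hypothetical.

```perl
package My::Filter::BufferedQueryString;   # hypothetical name

use warnings;
use strict;

sub new {
  my $class = shift;
  # 'pending' holds bytes that have not yet formed a complete record.
  return bless { pending => '' }, $class;
}

# Turn one "k=v&k=v" line into a hash reference, collecting repeated
# keys into array references in the same spirit as the get() above.
sub _parse_record {
  my ($self, $record) = @_;
  my %chunk;
  foreach my $pair (split /&/, $record) {
    my ($key, $value) = split /=/, $pair, 2;
    if (!exists $chunk{$key}) {
      $chunk{$key} = $value;
    } elsif (ref $chunk{$key} eq 'ARRAY') {
      push @{ $chunk{$key} }, $value;
    } else {
      $chunk{$key} = [ $chunk{$key}, $value ];
    }
  }
  return \%chunk;
}

sub get {
  my ($self, $buffer) = @_;
  my @chunks;

  # Append everything we were handed, then peel off complete records.
  $self->{pending} .= join '', @$buffer;
  while ($self->{pending} =~ s/^(.*?)\x0d\x0a//s) {
    my $line = $1;
    push @chunks, $self->_parse_record($line);
  }
  return \@chunks;
}

1;
```

With this variant, a first call such as get(["foo=bar&ba"]) returns no records, and a following call with ["z=1\x0d\x0a"] returns the completed record with both foo and baz set.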

put()

We now have a simple query-string-like data parser. This is fine for read-only servers, but it makes sense to allow our users to send data back and forth in the same format. To allow for that, we need a put() method. put()’s job is to take the cooked form of our records and translate it to the raw form. In this case we will be taking a hash reference that looks like:

  $VAR1 = {
    'foo' => 'bar',
    'baz' => '1',
    'bat' => '2',
  };

And transforming it into the following (put() sorts the keys, so the pairs come out in alphabetical order):

  bat=2&baz=1&foo=bar

That way, whatever Wheel our user has chosen can put the data onto the wire. Like get(), put() is a normal method call on a normal Perl object.

  sub put {
    my $self = shift;
    my $records = shift;

Like get(), the data to act on is passed in as a parameter to the method call. It is always an array reference of records to translate.

Basically, our put() method performs an exact reversal of the get() algorithm. Take each key/value pair and concatenate them with an =. Each pair is then joined with a &.

The only real error condition we are checking for is a value that is a reference to something other than an array. Since we haven’t defined behavior for this condition, we warn the user about the data and skip it:

    my @raw;
    foreach my $record (@$records) {
      my @chunks;
      foreach my $key (sort keys %$record) {
        if(ref $record->{$key}) {
          if(ref $record->{$key} eq 'ARRAY') {
            foreach my $value ( @{ $record->{$key} } ) {
              push @chunks, $key."=".$value;
            }
          } else {
            carp __PACKAGE__." cannot handle data of type "
              .ref $record->{$key};
          }
        } else {
          push @chunks, $key."=".$record->{$key};
        }
      }
      push @raw, join('&',@chunks)."\x0d\x0a";
    }
    return \@raw;
  }

  1;  # a module must end with a true value

The raw data is returned to the caller as an array reference of data chunks. The caller has the responsibility of putting the data on the wire in the appropriate fashion.
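
To see the put() logic in isolation, here is a small standalone sketch that applies the same steps to a single record, outside of any POE machinery. The encode_record function name is hypothetical; it mirrors the loop body above rather than replacing it.

```perl
use warnings;
use strict;

use Carp qw(carp);

# Serialize one hash reference into "k=v&k=v" followed by CRLF.
# Array reference values expand into one pair per element; any other
# kind of reference is reported with carp and skipped.
sub encode_record {
  my $record = shift;
  my @pairs;
  foreach my $key (sort keys %$record) {
    my $value = $record->{$key};
    if (ref $value eq 'ARRAY') {
      push @pairs, map { "$key=$_" } @$value;
    } elsif (ref $value) {
      carp "cannot handle data of type " . ref $value;
    } else {
      push @pairs, "$key=$value";
    }
  }
  return join('&', @pairs) . "\x0d\x0a";
}

my $raw = encode_record({ foo => 'bar', baz => 1, bat => [ 2, 3 ] });
print $raw;   # keys are sorted, so this prints "bat=2&bat=3&baz=1&foo=bar"
```

Because the keys are sorted, a record that arrived as foo=bar&baz=1&bat=2 goes back out with its pairs in a different order. The data is the same, but the byte stream is not.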

Step 2: A Wheel

Chances are that our users want to send data using one of the standard UNIX mechanisms: sockets, pipes, and so on. Lucky for us, POE already has Wheels to deal with just about any method of data transfer you can imagine. Let’s work with one that should work just about anywhere: TCP sockets. POE::Wheel::SocketFactory provides the functionality we need. First, we need a session to plug the wheel into. (Remember that wheels mutate sessions to provide new functionality.)

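  use POE qw(Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW);
  use POE::Filter::SimpleQueryString;

  POE::Session->create(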
    inline_states => {
      _start      => \&start,
      factory_success => \&factory_success,

      client_input  => \&client_input,
      client_error  => \&client_error,

      fatal_error   => sub { die "A fatal error occurred" },
      _stop       => sub {},
    },
  );

  POE::Kernel->run();

This session will be our controller for the wheels we need to perform socket operations. Each wheel-based event provides a unique identifier so it is possible to handle more than one client per session.

When the session starts up, we spin up the SocketFactory wheel. Given a bind address and port, SocketFactory listens for connections and hands us a SuccessEvent for each client that connects. The Reuse flag marks the address as reusable, so the server can be restarted without waiting for the old socket to time out.

  sub start {
    $_[HEAP]->{factory} = POE::Wheel::SocketFactory->new(
      BindAddress   => '127.0.0.1',
      BindPort    => '31337',
      SuccessEvent  => 'factory_success',
      FailureEvent  => 'fatal_error',
      SocketProtocol  => 'tcp',
      Reuse       => 'on',
    );
  }

When a client makes a connection, the SocketFactory lets us know. It is our job to figure out what to do with the filehandle SocketFactory built for us. In this case, we want read/write functionality using the filter we built above. POE::Wheel::ReadWrite provides this functionality, including the ability to plug in our filter.

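  sub factory_success {
    # ARG0 is the accepted client socket.
    my $handle = $_[ARG0];

    my $wheel = POE::Wheel::ReadWrite->new(
      Handle    => $handle,
      Driver    => POE::Driver::SysRW->new(),
      Filter    => POE::Filter::SimpleQueryString->new(),
      InputEvent  => 'client_input',
    );

    # Key each client by its ReadWrite wheel ID, which is the ID that
    # client_input receives in ARG1.
    $_[HEAP]->{clients}->{ $wheel->ID } = $wheel;
  }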

Now the data path is set up. We have the ability for programs to connect to a port and provide data in our simple format. What to do with the data though? Let’s simply print it out and echo it back to the client.

  sub client_input {
    my ($input, $wheel_id) = @_[ARG0, ARG1];

    use Data::Dumper;
    print Dumper $input;

    $_[HEAP]->{clients}->{ $wheel_id }->put( $input );
  }

Data::Dumper handles printing out the structure for us. The put() call puts the structure back out onto the wire. If our algorithms are correct, we should get the same data back that we put in.

  sungo@cthulu% telnet localhost 31337
  Trying 127.0.0.1...
  Connected to localhost.
  Escape character is '^]'.
  foo=bar

The server prints out:

  sungo@cthulu% perl -Ilib examples/server.pl
  $VAR1 = {
    'foo' => 'bar'
  };

And then echoes back to us:

  foo=bar

We’re in business.
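
One loose end: the session maps client_error to a handler that the walkthrough never defines, and the ReadWrite wheel is not told to emit that event. A minimal sketch follows. It assumes the wheel is also constructed with ErrorEvent => 'client_error' and that each wheel is stored in the heap under its own wheel ID. ReadWrite error events carry the name of the failed operation, the numeric and string forms of the error, and the wheel ID.

```perl
use warnings;
use strict;

use POE;   # exports HEAP and ARG0 .. ARG3

# Assumes the ReadWrite wheel was created with
#   ErrorEvent => 'client_error'
# and stored in $_[HEAP]->{clients} under its own wheel ID.
sub client_error {
  my ($operation, $errnum, $errstr, $wheel_id) = @_[ARG0 .. ARG3];

  # A read "error" with errno 0 is simply the client closing the connection.
  unless ($operation eq 'read' and $errnum == 0) {
    warn "Client $wheel_id: $operation failed: $errstr ($errnum)\n";
  }

  # Dropping the last reference destroys the wheel and closes the socket.
  delete $_[HEAP]->{clients}->{ $wheel_id };
}
```

Without something along these lines, wheels for disconnected clients stay in the heap for the life of the session.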

Step 3: A Component

Man, that was a lot of code to get a simple TCP server up and running. Surely this can be simplified. Again, POE itself comes to the rescue. POE ships with a component specifically designed to simplify TCP server creation. We can replace all that code above with a simple call to the component’s constructor.

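  use POE qw(Component::Server::TCP);
  use POE::Filter::SimpleQueryString;

  POE::Component::Server::TCP->new(
    Address => '127.0.0.1',
    Port  => '31337',

    ClientFilter => "POE::Filter::SimpleQueryString",
    ClientInput => sub {
      my $input = $_[ARG0];
      use Data::Dumper;
      print Dumper $input;

      # Server::TCP keeps the client's ReadWrite wheel in the heap.
      $_[HEAP]->{client}->put($input);
    },
  );

  POE::Kernel->run();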

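And we’re done. The downside is that we lose the flexibility of doing things by hand. Early releases of Server::TCP also accepted only a class name for ClientFilter, so no arguments could be passed to the filter’s constructor; later releases accept an array reference holding the class name followed by constructor arguments. For a lot of problems, however, this component does the trick quite nicely.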

We can make this even easier for our users by making our own component. For the purpose of this example, we’re going to wrap the smaller code above instead of the larger wheel-based example. There is no reason why you couldn’t use the wheel-based code in your component, however.

  package POE::Component::SimpleQueryString;

  use warnings;
  use strict;

  use POE;
  use POE::Component::Server::TCP;

  use POE::Filter::SimpleQueryString;

  use Carp qw(croak carp);

  sub new {
    my $class = shift;
    my %args = @_;

    my $addr = delete $args{ListenAddr} || croak "ListenAddr required";
    my $port = delete $args{ListenPort} || croak "ListenPort required";
    my $input_event = delete $args{InputEvent} ||
      croak "InputEvent required";

    my $server = POE::Component::Server::TCP->new(
            Address => $addr,
            Port => $port,

            ClientInput => $input_event,
            ClientFilter => "POE::Filter::SimpleQueryString",
           );

    return $server;
  }

  1;

Now our users can just load up the component like so:

  POE::Component::SimpleQueryString->new(
    ListenAddr => '127.0.0.1',
    ListenPort => '31337',
    InputEvent => sub {
      my $input = $_[ARG0];
      use Data::Dumper;
      print Dumper $input;

      $_[HEAP]->{client}->put($input);
    },
  );

Conclusion

We have seen how to build POE filters and components and combine them with wheels and custom code to create flexible and maintainable programs. The code examples provided above may be downloaded under the BSD License.
