Monday, April 29, 2013

Multithreaded socket server, part 5, socket writer

The socket writer thread is the last part of the puzzle.

sub chatSockWriteT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockWriteT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
        ctlFrom => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_;
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $tname = $opts->{thread};

    my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, ">");

    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "reader",
    );

    my $faCtl = $owner->importNexus(
        from => $opts->{ctlFrom},
        import => "reader",
    );

The usual preamble. The trackGetSocket() consumes the socket from the App, and this time reopens it for writing. The previously created nexuses are imported.

    my %topics; # subscribed topics for this thread

    $faChat->getLabel("msg")->makeChained("lbMsg", undef, sub {
        my $row = $_[1]->getRow();
        my $topic = $row->get("topic");
        if ($topic eq "*" || exists $topics{$topic}) {
            printOrShut($app, $opts->{fragment}, $sock, $topic, ",", $row->get("msg"), "\n");
        }
    });

The logic is defined as the connected labels. The topic hash keeps the keys that this thread is subscribed to. When a message is received from the chat nexus and the topic is in the hash or is "*", the message gets sent into the socket in the CSV format:

topic,text

The function printOrShut() is imported from Triceps::X::ThreadedServer. Its first 3 arguments are fixed, and the rest are passed through to print(). It prints the message to the socket file handle, flushes the socket, and in case of any errors it shuts down the fragment specified in its second argument. This way if the socket gets closed from the other side, the threads handling it automatically shut down.

    $faCtl->getLabel("ctl")->makeChained("lbCtl", undef, sub {
        my $row = $_[1]->getRow();
        my ($cmd, $arg) = $row->toArray();
        if ($cmd eq "print") {
            printOrShut($app, $opts->{fragment}, $sock, $arg, "\n");
        } elsif ($cmd eq "subscribe") {
            $topics{$arg} = 1;
            printOrShut($app, $opts->{fragment}, $sock, "!subscribed,$arg\n");
        } elsif ($cmd eq "unsubscribe") {
            delete $topics{$arg};
            printOrShut($app, $opts->{fragment}, $sock, "!unsubscribed,$arg\n");
        } else {
            printOrShut($app, $opts->{fragment}, $sock, "!invalid command,$cmd,$arg\n");
        }
    });

The handling of the control commands is pretty straightforward.

    $owner->readyReady();

    $owner->mainLoop();

    $tsock->close(); # not strictly necessary
}

And the rest is taken care of by the mainLoop(). The thread's main loop runs until the thread gets shut down, by handling the incoming messages. So if say printOrShut() decides to shut down the fragment, the next iteration of the loop will detect it and exit.

No comments:

Post a Comment