Friday, June 7, 2013

reordering the data from multiple threads, part 3

And finally the code of the example. I've written it with the simple approach of reading stdin and printing to stdout. It's easier this way, and anyway, to demonstrate the rowop reordering, all the input has to be sent quickly in one chunk. The application starts as:

Triceps::Triead::startHere(
    app => "ForkJoin",
    thread => "main",
    main => \&mainT,
    workers => 2,
    delay => 0.02,
);

The main thread is:

sub mainT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("mainT", $opts, {@Triceps::Triead::opts,
        workers => [ 1, undef ], # number of worker threads
        delay => [ 0, undef ], # artificial delay for the 0th thread
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();


So far this is the pretty standard boilerplate with the argument parsing.

    my $rtRate = Triceps::RowType->new( # an exchange rate between two currencies
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        rate => "float64", # multiplier when exchanging ccy1 to ccy2
    ) or confess "$!";

    # the resulting trade recommendations
    my $rtResult = Triceps::RowType->new(
        triead => "int32", # id of the thread that produced it
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        ccy3 => "string", # currency code
        rate1 => "float64",
        rate2 => "float64",
        rate3 => "float64",
        looprate => "float64",
    ) or confess "$!";

The row types originate from the self-join example code, same as before.

    # each tray gets sequentially numbered and framed
    my $rtFrame = Triceps::RowType->new(
        seq => "int64", # sequence number
        triead => "int32", # id of the thread that produced it (optional)
    ) or confess "$!";

    # the plain-text output of the result
    my $rtPrint = Triceps::RowType->new(
        text => "string",
    ) or confess "$!";

These are the new additions. The frame row type is used to send the information about the sequence number in the _BEGIN_ label. The print row type is used to send the text for printing back to the main thread.

    # the input data
    my $faIn = $owner->makeNexus(
        name => "input",
        labels => [
            rate => $rtRate,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    # the raw result collected from the workers
    my $faRes = $owner->makeNexus(
        name => "result",
        labels => [
            result => $rtResult,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    my $faPrint = $owner->makeNexus(
        name => "print",
        labels => [
            raw => $rtPrint, # in raw order as received by collator
            cooked => $rtPrint, # after collation
        ],
        import => "reader",
    );

The processing will go in essentially a pipeline: read the input -> process in the worker threads -> collate -> print in the main thread. Only the worker stage spreads into multiple threads, whose data paths then join back together in the collator.

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "reader",
        main => \&readerT,
        to => $owner->getName() . "/input",
    );

    for (my $i = 0; $i < $opts->{workers}; $i++) {
        Triceps::Triead::start(
            app => $app->getName(),
            thread => "worker$i",
            main => \&workerT,
            from => $owner->getName() . "/input",
            to => $owner->getName() . "/result",
            delay => ($i == 0? $opts->{delay} : 0),
            workers => $opts->{workers},
            identity => $i,
        );
    }

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "collator",
        main => \&collatorT,
        from => $owner->getName() . "/result",
        to => $owner->getName() . "/print",
    );

    my @rawp; # the print in original order
    my @cookedp; # the print in collated order

    $faPrint->getLabel("raw")->makeChained("lbRawP", undef, sub {
        push @rawp, $_[1]->getRow()->get("text");
    });
    $faPrint->getLabel("cooked")->makeChained("lbCookedP", undef, sub {
        push @cookedp, $_[1]->getRow()->get("text");
    });

    $owner->readyReady();

    $owner->mainLoop();

    print("--- raw ---\n", join("\n", @rawp), "\n");
    print("--- cooked ---\n", join("\n", @cookedp), "\n");
}

The collator will send the data for printing twice: the first time in the order it was received ("raw"), the second time in the order after collation ("cooked").

sub readerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("readerT", $opts, {@Triceps::Triead::opts,
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faIn = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbRate = $faIn->getLabel("rate");
    my $lbBegin = $faIn->getLabel("_BEGIN_");
    # _END_ is always defined, even if not defined explicitly
    my $lbEnd = $faIn->getLabel("_END_");

This demonstrates that the labels _BEGIN_ and _END_ always get defined in each nexus, even if they are not defined explicitly. Well, here _BEGIN_ was defined explicitly but _END_ was not, and nevertheless it can be found and used.

    my $seq = 0; # the sequence

    $owner->readyReady();

    while(<STDIN>) {
        chomp;

        ++$seq; # starts with 1
        $unit->makeHashCall($lbBegin, "OP_INSERT", seq => $seq);
        my @data = split(/,/); # starts with a string opcode
        $unit->makeArrayCall($lbRate, @data);
        # calling _END_ is an equivalent of flushWriter()
        $unit->makeHashCall($lbEnd, "OP_INSERT");
    }

    {
        # drain the pipeline before shutting down
        my $ad = Triceps::AutoDrain::makeShared($owner);
        $owner->app()->shutdown();
    }
}

Each input row is sent through in a separate transaction, or in other words, a separate tray. The _BEGIN_ label carries the sequence number of the tray. The trays could as well be sent on with flushWriters() or flushWriter(), but I wanted to show that you can also flush a tray by calling the _END_ label.
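To make the framing easier to picture, here is a minimal sketch in plain Perl, without Triceps. It only builds the list of calls that the reader would make for each line. The helper frame_lines() and the sample input lines are made up for the illustration; they are not part of the example's code.

```perl
use strict;
use warnings;

# Hypothetical helper: turn each CSV input line into a framed "tray",
# mimicking the _BEGIN_ / rate / _END_ calls of the reader thread.
sub frame_lines {
    my @lines = @_;
    my $seq = 0;
    my @trays;
    for my $line (@lines) {
        chomp $line;
        ++$seq; # starts with 1, as in readerT
        my @data = split(/,/, $line); # the first field is the opcode
        push @trays, [
            [ "_BEGIN_", "OP_INSERT", seq => $seq ],
            [ "rate", @data ],
            [ "_END_", "OP_INSERT" ],
        ];
    }
    return @trays;
}

# made-up sample input, one rate per line
my @trays = frame_lines("OP_INSERT,AAA,BBB,1.30\n", "OP_INSERT,BBB,AAA,0.74\n");
for my $tray (@trays) {
    print join(" ", @$_), "\n" for @$tray;
}
```

Each input line turns into exactly three calls, and that one-row-per-tray property is what the worker threads rely on later.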

sub workerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("workerT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
        delay => [ 0, undef ], # processing delay
        workers => [ undef, \&Triceps::Opt::ck_mandatory ], # how many workers
        identity => [ undef, \&Triceps::Opt::ck_mandatory ], # which one is us
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    my $delay = $opts->{delay};
    my $workers = $opts->{workers};
    my $identity = $opts->{identity};

    my $faIn = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faRes = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbInRate = $faIn->getLabel("rate");
    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

The worker thread starts with the pretty usual boilerplate.

    my $seq; # sequence from the frame labels
    my $compute; # the computation is to be done by this label
    $faIn->getLabel("_BEGIN_")->makeChained("lbInBegin", undef, sub {
        $seq = $_[1]->getRow()->get("seq");
    });

The processing of each transaction starts by remembering its sequence number from the _BEGIN_ label. The worker doesn't send a _BEGIN_ to the output yet, because all the threads get all the input, to be able to update their tables, but only one thread produces the output. And a thread doesn't know whether it will be the one producing the output until it sees the primary key in the data. So it can start sending the output only after it has seen the data. This whole scheme works because there is exactly one data row per transaction. A more general approach might be to have the reader thread decide which worker will produce the result, and put this information (as either a copy of the primary key or the computed thread id) into the _BEGIN_ rowop.

    # the table gets updated for every incoming rate
    $lbInRate->makeChained("lbIn", undef, sub {
        my $ccy1 = $_[1]->getRow()->get("ccy1");
        # decide, whether this thread is to perform the join
        $compute = ((ord(substr($ccy1, 0, 1)) - ord('A')) % $workers == $identity);

        # this relies on every Xtray containing only one rowop,
        # otherwise one Xtray will be split into multiple
        if ($compute) {
            $unit->makeHashCall($lbResBegin, "OP_INSERT", seq => $seq, triead => $identity);
            select(undef, undef, undef, $delay) if ($delay);
        }

        # even when $compute is set, this might produce some output or not,
        # but the frame still goes out every time $compute is set, because
        # _BEGIN_ forces it
        $unit->call($lbRateInput->adopt($_[1]));
    });

Here the decision is made on whether this thread is to compute the join. The decision is remembered in the flag $compute and used to generate the _BEGIN_ rowop for the output. Then the table gets updated in any case ($lbRateInput is the table's input label). I've skipped over the table creation, it's the same as in the self-join example, and you can always find the full text of the example in xForkJoinMt.t.
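The partitioning formula can be tried out on its own. This is a small sketch in plain Perl that repeats the expression from the worker; the function name worker_for() and the list of currency codes are made up, and it assumes that the code starts with an uppercase ASCII letter.

```perl
use strict;
use warnings;

# Same formula as in the worker: the first letter of ccy1 picks the worker.
# Assumes that the currency code starts with an uppercase ASCII letter.
sub worker_for {
    my ($ccy1, $workers) = @_;
    return (ord(substr($ccy1, 0, 1)) - ord('A')) % $workers;
}

# made-up currency codes, split between 2 workers
for my $ccy (qw(AAA BBB CCC EUR USD)) {
    printf "%s -> worker%d\n", $ccy, worker_for($ccy, 2);
}
```

With 2 workers only BBB from this list lands on worker1, so partitioning by the first letter can come out quite uneven. It's good enough for a demonstration, but a hash of the whole key would probably spread real data better.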

    $tRate->getOutputLabel()->makeChained("lbCompute", undef, sub {
        return if (!$compute); # not this thread's problem
        ...
                $unit->call($result);
        }
    });
    ##################################################

    $owner->readyReady();

    $owner->mainLoop();
}

Here again I've skipped over the way the result is computed. The important part is that if the $compute flag is not set, the whole self-joining computation is not performed. The _END_ label is not touched; the flushing of transactions is taken care of by the mainLoop(). Note that the _BEGIN_ label is always sent if the data is designated for this thread, even if no output as such is produced. This is done because the collator needs to get an uninterrupted sequence of transactions. Otherwise it would not be able to tell whether some transaction has been dropped or is only delayed.

sub collatorT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("collatorT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faRes = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faPrint = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

    my $lbPrintRaw = $faPrint->getLabel("raw");
    my $lbPrintCooked = $faPrint->getLabel("cooked");

    my $seq = 1; # next expected sequence
    my @trays; # trays held for reordering: $trays[0] is the slot for sequence $seq
        # (only of course that slot will be always empty but the following ones may
        # contain the trays that arrived out of order)
    my $curseq; # the sequence of the current arriving tray

The collator thread starts very much as usual. It keeps the expected sequence number of the next tray in order, initialized to 1 to match the reader's numbering. The trays that arrive out of order will be buffered in the array @trays. Well, more exactly, for simplicity, all the trays get buffered and then sent on when their turn comes. But it's possible to make an optimized version that would let the data flow through immediately if it's arriving in order.
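Such an optimized version is not a part of the example, but the idea can be sketched in plain Perl without Triceps. Everything here is an assumption made for the illustration: the flat event format, the function collate_passthrough(), and the hash %held used in place of the array. The events of one tray are assumed to arrive contiguously, and a tray with an already passed sequence number is not handled.

```perl
use strict;
use warnings;

# Hypothetical pass-through collator over a flat event stream.
# Events are [ "BEGIN", $seq ], [ "ROW", $text ], [ "END" ]; the events of
# one tray are assumed to arrive contiguously.
sub collate_passthrough {
    my @events = @_;
    my $next = 1; # next expected sequence
    my %held;     # out-of-order trays, keyed by sequence
    my ($cur, $direct, @buf, @out);
    for my $ev (@events) {
        my ($kind, $arg) = @$ev;
        if ($kind eq "BEGIN") {
            $cur = $arg;
            $direct = ($cur == $next); # in order: no buffering needed
            @buf = ();
        } elsif ($kind eq "ROW") {
            if ($direct) { push @out, $arg } else { push @buf, $arg }
        } else { # END
            if ($direct) {
                $next++;
                # release the held trays that are now in order
                while (exists $held{$next}) {
                    push @out, @{ delete $held{$next} };
                    $next++;
                }
            } else {
                $held{$cur} = [ @buf ];
            }
        }
    }
    return @out;
}

my @out = collate_passthrough(
    [ "BEGIN", 2 ], [ "ROW", "b" ], [ "END" ],
    [ "BEGIN", 1 ], [ "ROW", "a" ], [ "END" ],
    [ "BEGIN", 3 ], [ "ROW", "c" ], [ "END" ],
);
print join(",", @out), "\n"; # prints a,b,c
```

The rows of trays 1 and 3 go to the output as soon as they arrive, and only tray 2 spends any time in the buffer.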

    # The processing of data after it has been "cooked", i.e. reordered.
    my $bindRes = Triceps::FnBinding->new(
        name => "bindRes",
        on => $faRes->getFnReturn(),
        unit => $unit,
        withTray => 1,
        labels => [
            "_BEGIN_" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("BEGIN"));
            },
            "result" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("result"));
            }
        ],
    );
    $faRes->getFnReturn()->push($bindRes); # will stay permanently

The data gets collected into trays through a binding that gets permanently pushed onto the facet's FnReturn. Then when the tray's turn comes, it will be simply called and will produce the print calls for the cooked data order.

    # manipulation of the reordering,
    # and along the way reporting of the raw sequence
    $lbResBegin->makeChained("lbBegin", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("BEGIN"));
        $curseq = $_[1]->getRow()->get("seq");
    });
    $lbResult->makeChained("lbResult", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("result"));
    });
    $lbResEnd->makeChained("lbEnd", undef, sub {
        my $tray = $bindRes->swapTray();
        if ($curseq == $seq) {
            $unit->call($tray);
            shift @trays;
            $seq++;
            while ($#trays >= 0 && defined($trays[0])) {
                # flush the trays that arrived misordered
                $unit->call(shift @trays);
                $seq++;
            }
        } elsif ($curseq > $seq) {
            $trays[$curseq-$seq] = $tray; # remember for the future
        } else {
            # should never happen but just in case
            $unit->call($tray);
        }
    });

    $owner->readyReady();

    $owner->mainLoop();
};

The input rowops are not only collected in the binding's tray but also go directly to the chained labels that print the raw order of arrival. The handler of _BEGIN_ also remembers its sequence number.

The handler of _END_ (whose rowops get produced implicitly at the end of each transaction) then does the heavy lifting. It looks at the sequence number remembered from _BEGIN_ and makes the decision. If the received sequence is the next expected one, the data collected in the tray gets sent on immediately, and then the contents of the @trays array get sent on until a blank spot of missing data is hit. Or if the received sequence leaves a gap, the tray is placed into the appropriate spot in @trays for later processing.
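The same slot arithmetic can be played with outside of Triceps. This is a minimal sketch in plain Perl, where a "tray" is just a reference to an array of strings and "calling" a tray means appending its contents to the output. The function make_collator() and the sample data are made up for the illustration.

```perl
use strict;
use warnings;

# Stand-in for the collator's _END_ handler: each "tray" is just an array
# reference of strings, and "calling" it means appending to @$out.
sub make_collator {
    my ($out) = @_;
    my $seq = 1; # next expected sequence
    my @trays;   # $trays[0] is the (always empty) slot for $seq
    return sub {
        my ($curseq, $tray) = @_;
        if ($curseq == $seq) {
            push @$out, @$tray;
            shift @trays;
            $seq++;
            while (@trays && defined($trays[0])) {
                # flush the trays that arrived misordered
                push @$out, @{ shift @trays };
                $seq++;
            }
        } elsif ($curseq > $seq) {
            $trays[$curseq - $seq] = $tray; # remember for the future
        } else {
            push @$out, @$tray; # should never happen but just in case
        }
    };
}

my @cooked;
my $collate = make_collator(\@cooked);
$collate->(2, [ "two" ]);
$collate->(4, [ "four" ]);
$collate->(1, [ "one" ]);   # releases 1 and 2, stops at the gap for 3
print join(",", @cooked), "\n"; # prints one,two
$collate->(3, [ "three" ]); # releases 3 and 4
print join(",", @cooked), "\n"; # prints one,two,three,four
```

Note how tray 4 stays stuck until tray 3 shows up. This is why the workers have to send a frame for every sequence number, even when there is no result to report.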

This whole logic can be encapsulated in a class but I haven't decided yet on the best way to do it. Maybe sometime in the future.
