Sunday, March 4, 2012

Time-limited propagation, part 1

When aggregating data, the results of the aggregation often stay relevant longer than the original data.

For example, in financial systems the data gets collected and aggregated for the current business day. After the day is closed, the day's detailed data are not interesting any more and can be deleted in preparation for the next day. However, the daily results stay interesting for a long time, and may even be archived for years.

This is not limited to financial systems. A long time ago, in the times of slow and expensive Internet connections, I built a traffic accounting system. It did the same: as the time went by, less and less detail was kept about the traffic usage. The modern accounting of click-through advertising also works in a similar way.

An easy way to achieve this result is to put a filter in the path of the aggregation results. It would compare the current idea of time with the time in the rows going by, and throw away the rows that are too old. This can be done as a label that receives the data from the aggregator and then either forwards it to the real destination or not. It solves the propagation problem, but as the obsolete original data gets deleted, the aggregator will still be churning and producing the updates, only to have them thrown away at the filter. A more efficient way is to stop the churn by placing the filter right into the aggregator.
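
To make the first approach concrete, here is a minimal sketch of such a filtering label. It assumes a unit $u and a destination label $lbDest, both hypothetical here, plus the hourly result row type $rtHourly and the global $currentHour that appear later in this example:

# a sketch of the filtering approach: pass through only the rows
# that are still within the current hour, silently drop the rest
my $lbFilterOld = $u->makeLabel($rtHourly, "lbFilterOld",
    undef, sub { # (label, rowop)
        my ($label, $rowop) = @_;
        if ($rowop->getRow()->get("time") >= $currentHour) {
            $u->call($lbDest->makeRowop(
                $rowop->getOpcode(), $rowop->getRow())) or die "$!";
        }
    }) or die "$!";

It would be chained from the aggregator's output label, in front of the real destination.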

The next example demonstrates such an aggregator, in a simplified version of the traffic accounting system that I once did. The example is actually about more than just stopping the data propagation: that stopping accounts for about 3 lines in it. I also want to show a simple example of traffic accounting as such, and to show that the lack of direct time support in Triceps does not stop you from doing any time-based processing. Because of this I'll show the whole example, not just snippets from it. Since the example is biggish, I'll paste it into the text in pieces, with commentary for each piece.

our $uTraffic = Triceps::Unit->new("uTraffic") or die "$!";

# one packet's header
our $rtPacket = Triceps::RowType->new(
    time => "int64", # packet's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    local_port => "int32",
    remote_port => "int32",
    bytes => "int32", # size of the packet
) or die "$!";

# an hourly summary
our $rtHourly = Triceps::RowType->new(
    time => "int64", # hour's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    bytes => "int64", # bytes sent in an hour
) or die "$!";

The router connecting to the Internet provider forwards us the packet header information from all the packets that go through the outside link. The local_ip is always the address of a machine on our network, remote_ip is outside our network, no matter in which direction the packet went. With a slow and expensive connection, we want to know two things: first, that the provider's billing at the end of the month is correct; second, who the high-traffic users are, so that we can look into whether that traffic was used for business purposes or not. This example goes up to the aggregation of the hourly summaries and then stops, since the further aggregation by days and months is straightforward to do.

If there is no traffic for a while, the router is expected to periodically communicate its changing idea of time as the same kind of records, but with the non-timestamp fields as NULLs. That, by the way, is the right way to communicate time-based information between two machines: do not rely on any local synchronization and timeouts but have the master send periodic time updates to the slave even if it has no data to send. The logic is then driven by the time reported by the master. A nice side effect is that the logic can also easily be replayed later, using these timestamps and without any concern about the real time. If there are multiple masters, the slave would have to order the data coming from them according to the timestamps, thus synchronizing them together.

The hourly data drops the port information, and sums up the traffic between two addresses in the hour. It still has the timestamp but now this timestamp is rounded to the start of the hour:

# compute an hour-rounded timestamp
sub hourStamp # (time)
{
    return $_[0] - ($_[0] % (1000*1000*3600));
}
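
For instance, with a made-up timestamp (my values, purely for illustration):

# hypothetical: 2012-03-04 19:32:03 GMT, in microseconds
my $ts = 1330889523000000;
&hourStamp($ts); # returns 1330887600000000, i.e. 19:00:00 GMT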

Next, to the aggregation:

# the current hour stamp that keeps being updated
our $currentHour;

# aggregation handler: recalculate the summary for the last hour
sub computeHourly # (table, context, aggop, opcode, rh, state, args...)
{
    my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;
    our $currentHour;

    # don't send the NULL record after the group becomes empty
    return if ($context->groupSize()==0
        || $opcode == &Triceps::OP_NOP);

    my $rhFirst = $context->begin();
    my $rFirst = $rhFirst->getRow();
    my $hourstamp = &hourStamp($rFirst->get("time"));

    # the group's hour is already in the past: stop the propagation,
    # so flushing the original rows won't churn out pointless updates
    return if ($hourstamp < $currentHour);

    # on DELETE, resend the last result remembered in the state,
    # without re-reading the group
    if ($opcode == &Triceps::OP_DELETE) {
        $context->send($opcode, $$state) or die "$!";
        return;
    }

    # recalculate the sum of the bytes over the whole group
    my $bytes = 0;
    for (my $rhi = $rhFirst; !$rhi->isNull();
            $rhi = $context->next($rhi)) {
        $bytes += $rhi->getRow()->get("bytes");
    }

    my $res = $context->resultType()->makeRowHash(
        time => $hourstamp,
        local_ip => $rFirst->get("local_ip"),
        remote_ip => $rFirst->get("remote_ip"),
        bytes => $bytes,
    ) or die "$!";
    ${$state} = $res; # remember the result for the future DELETEs
    $context->send($opcode, $res) or die "$!";
}

# create the group's aggregation state: a reference to a scalar
# that will keep the last result row sent for this group
sub initHourly #  (@args)
{
    my $refvar;
    return \$refvar;
}

The aggregation doesn't try to optimize by being additive, to keep the example simpler. The model keeps the notion of the current hour. As soon as the hour stops being current, the aggregation for it stops. The result of that aggregation will then be kept unchanged in the hourly result table, no matter what happens to the original data.

The tables are defined and connected thusly:

# the full stats for the recent time
our $ttPackets = Triceps::TableType->new($rtPacket)
    ->addSubIndex("byHour",
        Triceps::IndexType->newPerlSorted("byHour", undef, sub {
            return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
        })
        ->addSubIndex("byIP",
            Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
            ->addSubIndex("group",
                Triceps::IndexType->newFifo()
                ->setAggregator(Triceps::AggregatorType->new(
                    $rtHourly, "aggrHourly", \&initHourly, \&computeHourly)
                )
            )
        )
    )
or die "$!";

$ttPackets->initialize() or die "$!";
our $tPackets = $uTraffic->makeTable($ttPackets,
    &Triceps::EM_CALL, "tPackets") or die "$!";

# the aggregated hourly stats, kept longer
our $ttHourly = Triceps::TableType->new($rtHourly)
    ->addSubIndex("byAggr",
        Triceps::SimpleOrderedIndex->new(
            time => "ASC", local_ip => "ASC", remote_ip => "ASC")
    )
or die "$!";

$ttHourly->initialize() or die "$!";
our $tHourly = $uTraffic->makeTable($ttHourly,
    &Triceps::EM_CALL, "tHourly") or die "$!";

# connect the tables
$tPackets->getAggregatorLabel("aggrHourly")->chain($tHourly->getInputLabel())
    or die "$!";

The table of incoming packets has a 3-level index: it starts with being sorted by the hour part of the timestamp, then goes by the IP addresses to complete the aggregation key, and then has a FIFO for each aggregation group. Arguably, it might have been better to include the IP addresses straight into the top-level sorting index; I don't know, and it doesn't seem worth measuring. The top-level ordering by the hour is important: it will be used to delete the rows that have become old.
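
Just to make that alternative visible (this is my sketch, not what the example uses), the whole ordering could be merged into one top-level sorted index, with the FIFO and the aggregator attached under it as before:

# a hypothetical merged index: sort by the hour stamp
# and both addresses at once
Triceps::IndexType->newPerlSorted("byHourIP", undef, sub {
    return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"))
        || $_[0]->get("local_ip") cmp $_[1]->get("local_ip")
        || $_[0]->get("remote_ip") cmp $_[1]->get("remote_ip");
})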

The table of hourly aggregated stats uses the same kind of ordering, only now there is no need for a FIFO because there is only one row per key. And since the timestamp in these rows is already rounded to the hour, a SimpleOrderedIndex can be used without writing a manual comparison function, with the IP fields merged into it too.

The output of the aggregator on the packets table is connected to the input of the hourly table.

The next part has to do with displaying the result of the work:

# a template to make a label that prints the data passing through another label
sub makePrintLabel # ($print_label_name, $parent_label)
{
    my $name = shift;
    my $lbParent = shift;
    my $lb = $lbParent->getUnit()->makeLabel($lbParent->getType(), $name,
        undef, sub { # (label, rowop)
            print($_[1]->printP(), "\n");
        }) or die "$!";
    $lbParent->chain($lb) or die "$!";
    return $lb;
}

# label to print the changes to the detailed stats
makePrintLabel("lbPrintPackets", $tPackets->getOutputLabel());
# label to print the changes to the hourly stats
makePrintLabel("lbPrintHourly", $tHourly->getOutputLabel());

The printing of the result rows gets reused pretty often, so I've made a simple template function for it: it generates a label that prints the rowops passing through it, and connects that label where it belongs. It will probably eventually become a part of the Triceps library. That template is then used to generate the debugging printouts from both tables.

Next go a couple of helper functions:

# dump a table's contents
sub dumpTable # ($table)
{
    my $table = shift;
    for (my $rhit = $table->begin(); !$rhit->isNull(); $rhit = $rhit->next()) {
        print($rhit->getRow()->printP(), "\n");
    }
}

# how long to keep the detailed data, hours
our $keepHours = 2;

# flush the data older than $keepHours from $tPackets
sub flushOldPackets
{
    my $earliest = $currentHour - $keepHours * (1000*1000*3600);
    my $next;
    # the default iteration of $tPackets goes in the hour stamp order
    for (my $rhit = $tPackets->begin(); !$rhit->isNull(); $rhit = $next) {
        last if (&hourStamp($rhit->getRow()->get("time")) >= $earliest);
        $next = $rhit->next(); # advance before removal
        $tPackets->remove($rhit);
    }
}

dumpTable() is a straightforward iteration through a table, printing every row. It can be used on any table, since printP() takes care of the differences between the row types.

The flushing goes through the packets table and deletes the rows whose hour stamp is older than $keepHours before the current hour. For this to work right, the iteration must go in the order of the hour stamps, which the outer index "byHour" takes care of.

All the time-related logic expects that the time never goes backwards. This is a simplification to make the example shorter; production code cannot assume this.
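
For what it's worth, the simplest hardening would be to never let the current hour move back, along these lines (a sketch that the main loop below doesn't use):

# guard against the time going backwards: only advance
# $currentHour, never retreat it
my $stamp = &hourStamp($rowop->getRow()->get("time"));
$currentHour = $stamp
    if (!defined $currentHour || $stamp > $currentHour);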

And the final part is the main loop:

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
        my $rowop = $tPackets->getInputLabel()->makeRowopArray(@data)
            or die "$!";
        # update the current notion of time (simplistic)
        $currentHour = &hourStamp($rowop->getRow()->get("time"));
        if (defined($rowop->getRow()->get("local_ip"))) {
            $uTraffic->call($rowop) or die "$!";
        }
        &flushOldPackets(); # flush the packets
        $uTraffic->drainFrame(); # just in case, for completeness
    } elsif ($type eq "dumpPackets") {
        &dumpTable($tPackets);
    } elsif ($type eq "dumpHourly") {
        &dumpTable($tHourly);
    }
}

The input comes in the CSV form, as a command followed by more data. If the command is "new" then the data is the opcode and the row fields, as they would be sent by the router. The commands "dumpPackets" and "dumpHourly" are used to print the contents of the tables, to see what is going on in them.

In an honest implementation there would be a separate label that would differentiate between a reported packet and a pure time update from the router. Here, for simplicity, this logic is placed right into the main loop. On each input record the loop updates the model's idea of the current timestamp, then, if there is packet data, processes it, and finally flushes the rows that have become too old for the new timestamp.
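
To show the format, here is a hypothetical input fragment, with the addresses and timestamps made up for the illustration; note that the third record carries only the time, leaving the other fields NULL:

new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
new,OP_INSERT,1330889523000000,1.2.3.4,5.6.7.8,2000,80,300
new,OP_INSERT,1330894211000000
dumpHourly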
