Monday, March 5, 2012

Time-limited propagation, part 3

In the last example, if we keep aggregating the data from hours to days, and from days to months, the arrival of each new packet updates the whole chain. Sometimes that's what we want, sometimes it isn't. The daily stats might be fed into some complicated computation, with nobody looking at the results until the next day. In that situation each packet would trigger these complicated computations for no good reason, since nobody cares about the results until the day is closed.

These unnecessary computations can be prevented by disconnecting the daily data from the hourly data, and performing the aggregation manually, only when the day changes. Then the complicated computations happen only once a day, not many times per second.

Here is how the last example gets amended to produce the once-a-day summaries of all the traffic (as before, in multiple snippets, this time showing only the added or changed code):

# an hourly summary, now with the day extracted
our $rtHourly = Triceps::RowType->new(
    time => "int64", # hour's timestamp, microseconds
    day => "string", # in YYYYMMDD
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    bytes => "int64", # bytes sent in an hour
) or die "$!";

# a daily summary: just all traffic for that day
our $rtDaily = Triceps::RowType->new(
    day => "string", # in YYYYMMDD
    bytes => "int64", # bytes sent in a day
) or die "$!";

The hourly rows get an extra field, for convenient aggregation by day. This notion of the day is calculated as:

# compute the date of a timestamp, a string YYYYMMDD
sub dateStamp # (time)
{
    my @ts = gmtime($_[0]/1000000); # microseconds to seconds
    return sprintf("%04d%02d%02d", $ts[5]+1900, $ts[4]+1, $ts[3]);
}
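
As a quick sanity check, here is what it returns for a couple of the hour stamps that show up in the sample session below:

&dateStamp(1330884000000000); # returns "20120304"
&dateStamp(1330970400000000); # returns "20120305"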

The calculation is done in GMT, so that the code produces the same result all around the world. If you're doing this kind of project for real, you may want to use the local time zone instead.
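
For reference, a minimal sketch of such a local-time variant only needs localtime() in place of gmtime() (dateStampLocal is a made-up name, not used elsewhere in this example):

# the same computation, in the local time zone
sub dateStampLocal # (time)
{
    my @ts = localtime($_[0]/1000000); # microseconds to seconds
    return sprintf("%04d%02d%02d", $ts[5]+1900, $ts[4]+1, $ts[3]);
}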

The packets-to-hour aggregation function now populates this extra field:

sub computeHourly # (table, context, aggop, opcode, rh, state, args...)
{
...
    my $res = $context->resultType()->makeRowHash(
        time => $hourstamp,
        day => &dateStamp($hourstamp),
        ...
}

And the model keeps a global notion of the current day in addition to the current hour:

# the current day stamp that keeps being updated
our $currentDay;

The hourly table type grows an extra secondary index for the manual aggregation into the daily data:

# the aggregated hourly stats, kept longer
our $ttHourly = Triceps::TableType->new($rtHourly)
    ->addSubIndex("byAggr",
        Triceps::SimpleOrderedIndex->new(
            time => "ASC", local_ip => "ASC", remote_ip => "ASC")
    )
    ->addSubIndex("byDay",
        Triceps::IndexType->newHashed(key => [ "day" ])
        ->addSubIndex("group",
            Triceps::IndexType->newFifo()
        )
    )
or die "$!";

# remember the daily secondary index type
our $idxHourlyByDay = $ttHourly->findSubIndex("byDay")
    or die "$!";
our $idxHourlyByDayGroup = $idxHourlyByDay->findSubIndex("group")
    or die "$!";

And a table for the daily data is created but not connected to any other tables:

# the aggregated daily stats, kept even longer
our $ttDaily = Triceps::TableType->new($rtDaily)
    ->addSubIndex("byDay",
        Triceps::IndexType->newHashed(key => [ "day" ])
    )
or die "$!";

$ttDaily->initialize() or die "$!";
our $tDaily = $uTraffic->makeTable($ttDaily,
    &Triceps::EM_CALL, "tDaily") or die "$!";

# label to print the changes to the daily stats
makePrintLabel("lbPrintDaily", $tDaily->getOutputLabel());

Instead it gets updated by a function that manually aggregates the hourly data:

sub computeDay # ($dateStamp)
{
    our $uTraffic;
    my $bytes = 0;

    my $rhFirst = $tHourly->findIdxBy($idxHourlyByDay, day => $_[0])
        or die "$!";
    my $rhEnd = $rhFirst->nextGroupIdx($idxHourlyByDayGroup)
        or die "$!";
    for (my $rhi = $rhFirst;
            !$rhi->same($rhEnd); $rhi = $rhi->nextIdx($idxHourlyByDay)) {
        $bytes += $rhi->getRow()->get("bytes");
    }
    $uTraffic->makeHashCall($tDaily->getInputLabel(), "OP_INSERT",
        day => $_[0],
        bytes => $bytes,
    ) or die "$!";
}

This logic doesn't check whether any data for that day existed. If none did, it would just produce a row with a traffic of 0 bytes anyway. This is different from the normal aggregation, but here it may actually be desirable: it shows for sure that yes, the aggregation for that day really did happen.
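
If producing the empty rows is not desirable, a minimal sketch of the check might look like this (computeDayIfPresent is a made-up name; it relies on findIdxBy() returning a null row handle, testable with isNull(), when nothing matches):

sub computeDayIfPresent # ($dateStamp)
{
    my $rhFirst = $tHourly->findIdxBy($idxHourlyByDay, day => $_[0])
        or die "$!";
    return if $rhFirst->isNull(); # no hourly data for that day
    &computeDay($_[0]);
}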

The main loop then gets extended with the day-keeping logic and with the extra command to dump the daily data:

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
        my $rowop = $tPackets->getInputLabel()->makeRowopArray(@data)
            or die "$!";
        # update the current notion of time (simplistic)
        $currentHour = &hourStamp($rowop->getRow()->get("time"));
        my $lastDay = $currentDay;
        $currentDay = &dateStamp($currentHour);
        if (defined($rowop->getRow()->get("local_ip"))) {
            $uTraffic->call($rowop) or die "$!";
        }
        &flushOldPackets(); # flush the packets
        if (defined $lastDay && $lastDay ne $currentDay) {
            &computeDay($lastDay); # manual aggregation
        }
        $uTraffic->drainFrame(); # just in case, for completeness
    } elsif ($type eq "dumpPackets") {
        &dumpTable($tPackets);
    } elsif ($type eq "dumpHourly") {
        &dumpTable($tHourly);
    } elsif ($type eq "dumpDaily") {
        &dumpTable($tDaily);
    }
}

It now maintains the current day and, after the packet computation is done, checks whether the day has changed. If it has, it calls the manual aggregation of the last day.
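
One gap remains: if the input simply ends partway through a day, that last day never gets aggregated. If that matters, a minimal sketch of a fix is a single extra call after the main loop:

# after the main loop: summarize the final, incomplete day (if any)
&computeDay($currentDay) if defined $currentDay;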

And here is an example of its work:

new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
tPackets.out OP_INSERT time="1330886011000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100" 
tHourly.out OP_INSERT time="1330884000000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100" 
new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
tPackets.out OP_INSERT time="1330886012000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50" 
tHourly.out OP_DELETE time="1330884000000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100" 
tHourly.out OP_INSERT time="1330884000000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150" 
new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
tPackets.out OP_INSERT time="1330889811000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300" 
tHourly.out OP_INSERT time="1330887600000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300" 

So far all three packets are for the same day, and nothing new has happened.

new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
tPackets.out OP_INSERT time="1330972411000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200" 
tHourly.out OP_INSERT time="1330970400000000" day="20120305"
 local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200" 
tPackets.out OP_DELETE time="1330886011000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100" 
tPackets.out OP_DELETE time="1330886012000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50" 
tPackets.out OP_DELETE time="1330889811000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300" 
tDaily.out OP_INSERT day="20120304" bytes="450"

When a packet for the next day arrives, it has three effects: (1) it inserts the packet data as usual, (2) it finds that the previous packet data is obsolete and flushes it (without upsetting the hourly summaries), and (3) it finds that the day has changed and performs the manual aggregation of the last day's hourly data into the daily table.

new,OP_INSERT,1331058811000000
tPackets.out OP_DELETE time="1330972411000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200" 
tDaily.out OP_INSERT day="20120305" bytes="200" 

A time update for the day after that flushes out the previous day's detailed packets and again builds the daily summary of that day.

new,OP_INSERT,1331145211000000
tDaily.out OP_INSERT day="20120306" bytes="0" 

Yet another day's time roll now has no old data to delete (since none arrived during the previous day) but still produces the daily summary of 0 bytes.

dumpDaily
day="20120305" bytes="200" 
day="20120304" bytes="450" 
day="20120306" bytes="0" 

This shows the eventual contents of the daily summaries. The order of the rows is fairly random, because of the hashed index. Note that the hourly summaries weren't flushed; they are all still there too. If you want them eventually deleted after some time, you would need to provide more of the manual logic for that, as sketched below.
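
For illustration, here is a minimal sketch of such a flush, modeled on the iteration in computeDay() (flushHourlyDay is a made-up name; it assumes the table method remove() for deleting a row by its handle):

# delete all the hourly rows collected for one old day
sub flushHourlyDay # ($dateStamp)
{
    my $rhFirst = $tHourly->findIdxBy($idxHourlyByDay, day => $_[0])
        or die "$!";
    return if $rhFirst->isNull(); # nothing stored for that day
    my $rhEnd = $rhFirst->nextGroupIdx($idxHourlyByDayGroup)
        or die "$!";
    my $rhi = $rhFirst;
    while (!$rhi->same($rhEnd)) {
        # pick the next row before removing the current one
        my $rhNext = $rhi->nextIdx($idxHourlyByDay);
        $tHourly->remove($rhi) or die "$!";
        $rhi = $rhNext;
    }
}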
