These unnecessary computations can be prevented by disconnecting the daily data from the hourly data, and performing the manual aggregation only when the day changes. Then these complicated computations would happen only once a day, not many times per second.
Here is how the last example gets amended to produce the once-a-day daily summaries of all the traffic (as before, in multiple snippets, this time showing only the added or changed code):
# an hourly summary, now with the day extracted
our $rtHourly = Triceps::RowType->new(
  time => "int64", # hour's timestamp, microseconds
  day => "string", # in YYYYMMDD
  local_ip => "string", # string to make easier to read
  remote_ip => "string", # string to make easier to read
  bytes => "int64", # bytes sent in an hour
) or die "$!";

# a daily summary: just all traffic for that day
our $rtDaily = Triceps::RowType->new(
  day => "string", # in YYYYMMDD
  bytes => "int64", # bytes sent in a day
) or die "$!";
The hourly rows get an extra field, for convenient aggregation by day. This notion of the day is calculated as:
# compute the date of a timestamp, a string YYYYMMDD
sub dateStamp # (time)
{
  my @ts = gmtime($_[0]/1000000); # microseconds to seconds
  return sprintf("%04d%02d%02d", $ts[5]+1900, $ts[4]+1, $ts[3]);
}
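For a quick illustration (this check is not in the original code), the first timestamp from the example run below falls within March 4, 2012 GMT:

print &dateStamp(1330886011000000), "\n"; # prints 20120304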
The calculation is done in GMT, so that the code produces the same result all around the world. If you're doing this kind of project for real, you may want to use the local time zone instead.
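If you'd rather have the day boundaries follow the local time zone, the change is minimal. Here is a hypothetical variant, not in the original example, that simply swaps gmtime() for Perl's localtime():

# the same computation in the local time zone (an assumed variant)
sub dateStampLocal # (time)
{
  my @ts = localtime($_[0]/1000000); # microseconds to seconds
  return sprintf("%04d%02d%02d", $ts[5]+1900, $ts[4]+1, $ts[3]);
}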
The packets-to-hour aggregation function now populates this extra field:
sub computeHourly # (table, context, aggop, opcode, rh, state, args...)
{
  ...
  my $res = $context->resultType()->makeRowHash(
    time => $hourstamp,
    day => &dateStamp($hourstamp),
  ...
}
And the model keeps a global notion of the current day in addition to the current hour:
# the current day stamp that keeps being updated
our $currentDay;
The hourly table type grows an extra secondary index for the manual aggregation into the daily data:
# the aggregated hourly stats, kept longer
our $ttHourly = Triceps::TableType->new($rtHourly)
  ->addSubIndex("byAggr",
    Triceps::SimpleOrderedIndex->new(
      time => "ASC", local_ip => "ASC", remote_ip => "ASC")
  )
  ->addSubIndex("byDay",
    Triceps::IndexType->newHashed(key => [ "day" ])
    ->addSubIndex("group",
      Triceps::IndexType->newFifo()
    )
  )
or die "$!";

# remember the daily secondary index type
our $idxHourlyByDay = $ttHourly->findSubIndex("byDay")
  or die "$!";
our $idxHourlyByDayGroup = $idxHourlyByDay->findSubIndex("group")
  or die "$!";
And a table for the daily data is created but not connected to any other tables:
# the aggregated daily stats, kept even longer
our $ttDaily = Triceps::TableType->new($rtDaily)
  ->addSubIndex("byDay",
    Triceps::IndexType->newHashed(key => [ "day" ])
  )
or die "$!";

$ttDaily->initialize() or die "$!";
our $tDaily = $uTraffic->makeTable($ttDaily,
  &Triceps::EM_CALL, "tDaily") or die "$!";

# label to print the changes to the daily stats
makePrintLabel("lbPrintDaily", $tDaily->getOutputLabel());
Instead it gets updated by the function that performs the manual aggregation of the hourly data:
sub computeDay # ($dateStamp)
{
  our $uTraffic;
  my $bytes = 0;

  my $rhFirst = $tHourly->findIdxBy($idxHourlyByDay,
    day => $_[0]
  ) or die "$!";
  my $rhEnd = $rhFirst->nextGroupIdx($idxHourlyByDayGroup)
    or die "$!";
  for (my $rhi = $rhFirst;
      !$rhi->same($rhEnd); $rhi = $rhi->nextIdx($idxHourlyByDay)) {
    $bytes += $rhi->getRow()->get("bytes");
  }
  $uTraffic->makeHashCall($tDaily->getInputLabel(), "OP_INSERT",
    day => $_[0],
    bytes => $bytes,
  ) or die "$!";
}
This logic doesn't check whether any data for that day existed. If none did, it would just produce a row with traffic of 0 bytes anyway. This is different from the normal aggregation but here may actually be desirable: it shows for sure that yes, the aggregation for that day really did happen.
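If the 0-byte rows were not wanted, a variation of computeDay() could return early when the find comes up empty. The following is only a sketch, not part of the original example; it assumes that a failed findIdxBy() returns a NULL row handle, detectable with the RowHandle method isNull():

# an assumed variant: skip the days that had no hourly data at all
sub computeDaySkipEmpty # ($dateStamp)
{
  our $uTraffic;
  my $bytes = 0;

  my $rhFirst = $tHourly->findIdxBy($idxHourlyByDay, day => $_[0]);
  return if $rhFirst->isNull(); # no hourly data for that day
  my $rhEnd = $rhFirst->nextGroupIdx($idxHourlyByDayGroup)
    or die "$!";
  for (my $rhi = $rhFirst;
      !$rhi->same($rhEnd); $rhi = $rhi->nextIdx($idxHourlyByDay)) {
    $bytes += $rhi->getRow()->get("bytes");
  }
  $uTraffic->makeHashCall($tDaily->getInputLabel(), "OP_INSERT",
    day => $_[0],
    bytes => $bytes,
  ) or die "$!";
}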
The main loop then gets extended with the day-keeping logic and with the extra command to dump the daily data:
while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "new") {
    my $rowop = $tPackets->getInputLabel()->makeRowopArray(@data)
      or die "$!";
    # update the current notion of time (simplistic)
    $currentHour = &hourStamp($rowop->getRow()->get("time"));
    my $lastDay = $currentDay;
    $currentDay = &dateStamp($currentHour);
    if (defined($rowop->getRow()->get("local_ip"))) {
      $uTraffic->call($rowop) or die "$!";
    }
    &flushOldPackets(); # flush the packets
    if (defined $lastDay && $lastDay ne $currentDay) {
      &computeDay($lastDay); # manual aggregation
    }
    $uTraffic->drainFrame(); # just in case, for completeness
  } elsif ($type eq "dumpPackets") {
    &dumpTable($tPackets);
  } elsif ($type eq "dumpHourly") {
    &dumpTable($tHourly);
  } elsif ($type eq "dumpDaily") {
    &dumpTable($tDaily);
  }
}
It now maintains the current day and, after the packet computation is done, checks whether the day has changed. If it has, it calls the manual aggregation for the last day.
And here is an example of its work:
new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
tPackets.out OP_INSERT time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
tHourly.out OP_INSERT time="1330884000000000" day="20120304" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
tPackets.out OP_INSERT time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
tHourly.out OP_DELETE time="1330884000000000" day="20120304" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
tHourly.out OP_INSERT time="1330884000000000" day="20120304" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
tPackets.out OP_INSERT time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
tHourly.out OP_INSERT time="1330887600000000" day="20120304" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
So far all three packets are for the same day, and nothing new has happened.
new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
tPackets.out OP_INSERT time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
tHourly.out OP_INSERT time="1330970400000000" day="20120305" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
tPackets.out OP_DELETE time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
tPackets.out OP_DELETE time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
tPackets.out OP_DELETE time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
tDaily.out OP_INSERT day="20120304" bytes="450"
When a packet for the next day arrives, it has three effects: (1) inserts the packet data as usual, (2) finds that the previous packet data is obsolete and flushes it (without upsetting the hourly summaries) and (3) finds that the day has changed and performs the manual aggregation of last day's hourly data into daily.
new,OP_INSERT,1331058811000000
tPackets.out OP_DELETE time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
tDaily.out OP_INSERT day="20120305" bytes="200"
A time update for yet another day flushes out the previous day's detailed packets and again builds the daily summary for that day.
new,OP_INSERT,1331145211000000
tDaily.out OP_INSERT day="20120306" bytes="0"
Yet another day's time roll now has no old data to delete (since none arrived in the previous day) but still produces the daily summary of 0 bytes.
dumpDaily
day="20120305" bytes="200"
day="20120304" bytes="450"
day="20120306" bytes="0"
This shows the eventual contents of the daily summaries. The order of the rows is fairly random, because of the hashed index. Note that the hourly summaries weren't flushed either; they are all still there too. If you want them eventually deleted after some time, you would need to provide more of the manual logic for that.
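For example, it could be done by analogy with flushOldPackets(). The following is only a hypothetical sketch, not part of the original code: it assumes a global $maxHourlyAge (the age limit in microseconds) and relies on the table's first leaf index (byAggr) being ordered by time, so that begin() always returns the oldest row:

# an assumed helper: delete the hourly rows older than $maxHourlyAge
sub flushOldHourly
{
  our ($tHourly, $currentHour, $maxHourlyAge);
  for (;;) {
    my $rh = $tHourly->begin(); # the oldest row, in the byAggr order
    last if $rh->isNull()
      || $rh->getRow()->get("time") >= $currentHour - $maxHourlyAge;
    $tHourly->remove($rh) or die "$!";
  }
}

It would then be called from the main loop, next to flushOldPackets().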