Thursday, April 11, 2013

Multithreaded pipeline, part 3

The rest of this example might be easier to understand by looking at an example of a run first. The lines starting with a "!" are the copies of the input lines that ReaderMain() sends and PrintMain() faithfully prints.

input.packet are the rows that reach the PrintMain on the "print" label (remember, "input" is the name with which it imports its input nexus). input.hourly is the data aggregated by the hour intervals (and also by the IP addresses, dropping the port information), and input.daily further aggregates it per day (and again per the IP addresses). The timestamps in the hourly and daily rows are rounded down to the start of the hour or day.

And the lines without any prefixes are the dumps of the table contents that again reach the PrintMain() through the "print" label:

! new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
input.packet OP_INSERT time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
! new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
input.packet OP_INSERT time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
input.hourly OP_DELETE time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
! new,OP_INSERT,1330889612000000,1.2.3.4,5.6.7.8,2000,80,150
input.packet OP_INSERT time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
! new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
input.packet OP_INSERT time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
input.hourly OP_DELETE time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
! new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
input.packet OP_INSERT time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
input.hourly OP_INSERT time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
input.daily OP_INSERT time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! new,OP_INSERT,1331058811000000
input.packet OP_INSERT time="1331058811000000"
! new,OP_INSERT,1331145211000000
input.packet OP_INSERT time="1331145211000000"
! dump,packets
time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
! dump,hourly
time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! dump,daily
time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"

Note that the order of the lines is completely nice and predictable, nothing goes out of order. Each nexus preserves the order of the rows put into it, and the fact that there is only one writer per nexus and that every thread is fed from only one nexus, avoids the races.

No comments:

Post a Comment