#------------------------------------------------------------------------- # TCP Stream Analysis # Use the Unsniff Scripting API to perform TCP Stream Analysis # # You may use this code freely in your commercial and non-commercial work. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # # Copyright (c) Unleash Networks 2005, All rights reserved #---------------------------------------------------------------------------- # Tim.V Original version # Vivek 16/Apr/06 Fixes to Tims code (support flexible DUPACK count) # Better handling to RTO, add SSTHRESH chart require 'rubygems' require 'win32ole' require 'fox16' include Fox require 'UnleashCharts' include UnleashCharts # global constants USECPERSEC=1000000 INITIAL_RTO=3000000 # 3 seconds - windows xp default initial rto MAX_RTO=240000000 # 240 seconds - windows xp default max rto DUPACK_CNT=3 # num dup acks before fast retransmit (why do some OS (windows) do 2 DUPACKs instead of 3 ?) class ChartWindow < FXMainWindow attr_reader :tabBook def initialize(theapp) # base class super(theapp, "TCP Stream Analysis (Unsniff Scripting API Demo)", nil, nil, DECOR_ALL, 0,0,600,400) # main parent frame @main = FXVerticalFrame.new(self, FRAME_SUNKEN|FRAME_THICK|LAYOUT_FILL_X|LAYOUT_FILL_Y) @label = FXLabel.new(@main, "TCP Stream Details: " , nil, LAYOUT_FILL_X) @label.justify = JUSTIFY_LEFT # Main window interior @splitter = FXSplitter.new(@main, (LAYOUT_SIDE_TOP|LAYOUT_FILL_X| LAYOUT_FILL_Y|SPLITTER_VERTICAL|SPLITTER_TRACKING)) @tabBook = FXTabBook.new(@splitter, nil, 0, LAYOUT_FILL_X|LAYOUT_FILL_Y) @tableFrame = FXVerticalFrame.new(@splitter, FRAME_SUNKEN|FRAME_THICK|LAYOUT_FILL_X|LAYOUT_FILL_Y) # raw data @table = FXTable.new(@tableFrame,nil,0, TABLE_COL_SIZABLE|LAYOUT_FILL_X|LAYOUT_FILL_Y, 0,0,0,0, 10,10,10,10) @table.setTableSize(0, 4) @table.setBackColor(FXRGB(255, 255, 255)) @table.setCellColor(0, 0, FXRGB(255, 255, 255)) @table.setCellColor(0, 1, FXRGB(255, 210, 240)) @table.setCellColor(1, 0, FXRGB(240, 255, 240)) @table.setCellColor(1, 1, FXRGB(240, 240, 255)) @splitter.setSplit(0, 250) end def addChart (name, model) tab = FXTabItem.new(@tabBook, name, nil) canvasFrame = FXVerticalFrame.new(@tabBook, FRAME_SUNKEN|FRAME_THICK|LAYOUT_FILL_X|LAYOUT_FILL_Y, 0,0,0,0, 0,0,0,0) canvas = UnTimeSeriesChart.new(canvasFrame) canvas.setModel( model) end def create super show(PLACEMENT_SCREEN) end def setModel (mod) @canvas.setModel( mod) end def loadTable (mod) @table.setTableSize( mod.total_samples,8) @table.setRowText(0,"Samp #") @table.setColumnText(0,"ID") @table.setColumnText(1,"Tm") @table.setColumnText(2,"Dir") @table.setColumnText(3,"Seq") @table.setColumnText(4,"Ack") @table.setColumnText(5,"Payload") @table.setColumnText(6,"RTT") @table.setColumnText(7,"Analysis") @table.setColumnWidth(0,40) @table.setColumnWidth(1,80) @table.setColumnWidth(2,30) @table.setColumnWidth(3,100) @table.setColumnWidth(4,100) @table.setColumnWidth(5,50) @table.setColumnWidth(6,60) @table.setColumnWidth(7,260) i=0 mod.each_table_val do |item| @table.setRowText(i,(i+1).to_s) # index @table.setItemText(i,0,item.pid.to_s) # packet id @table.setItemText(i,1,item.delta_time) # delta time from first packet @table.setItemText(i,2,item.dir) # direction @table.setItemText(i,3,item.seq.to_s) # sequence @table.setItemText(i,4,item.ack.to_s) # ack number @table.setItemText(i,5,item.payload.to_s) # payload bytes @table.setItemText(i,6,item.rttupdate) # was RTT updated due to this segment @table.setItemText(i,7,item.analysis) # analysis @table.getItem(i,7).justify = FXTableItem::LEFT i=i+1 end end def setStreamDetails(stmDetails) @label.text = stmDetails end end #------------------------------------------ # TCP analysis item #------------------------------------------ class TCPAnalysisItem attr_accessor :dir attr_accessor :payload, :seq, :ack, :pid,:wnd attr_accessor :flags, :analysis attr_accessor :tm_s, :tm_us, :rtt attr_accessor :cwnd, :ssthresh attr_accessor :exp_seq def delta_time "#{@tm_s}:#{@tm_us}" end def rttupdate if @rtt "#{@rtt/1000}:#{@rtt%1000}" else " " end end def deltaUSec @tm_s*::USECPERSEC + @tm_us end end def max (x,y) if x > y x else y end end def min (x,y) if x < y x else y end end #----------------------------------------- # TCP state item #----------------------------------------- class TCPState # constants to represent TCP states TCPSTATE_SLOW_START = 0 TCPSTATE_CONGESTION_AVOIDANCE = 1 TCPSTATE_FAST_RETRANSMIT = 2 TCPSTATE_FAST_RECOVERY = 3 TCPSTATE_UNKNOWN = 4 # constants for RTO computation (RFC 2988) RTO_GRAN = 1000000 RTO_K_VALUE = 4 RTO_BETA_VALUE = 0.25 RTO_ALPHA_VALUE = 0.125 # tcp state variables attr_accessor :cwnd, :ssthresh, :wnd, :tcpstate, :mss attr_accessor :last_seq, :max_seq,:expected_seq, :rtt attr_accessor :ack_window, :last_ack attr_writer :mode attr_reader :lastsegment attr_reader :seqmap attr_reader :dackmap attr_reader :laststate attr_reader :isn_seq, :isn_ack attr_reader :retran_cnt, :dupack_cnt, :outoforder_cnt # retransmission timer related (rfc2988) attr_accessor :rto attr_reader :srtt, :rttvar def initialize @cwnd, @ssthresh,@wnd, @rtt, @mss,@rto = 0,0,0,0,0,0 @last_seq, @expected_seq, @ack_window,@last_ack,@max_seq = 0,0,0,0,0 @mode=TCPSTATE_SLOW_START @laststate = "" @seqmap = Hash.new @dackmap = Hash.new @retran_cnt,@dupack_cnt,@outoforder_cnt = 0,0,0 end # the sender driving the pipe called for each segment placed into the pipe # controlled by the feedback stream def proc_segment_forward ( seq_no, ack_no, payload_size,segment) @max_seq=max(@max_seq,seq_no) @laststate = "" @lastsegment=segment # do sequence number analysis (only if payload present) if payload_size > 0 if seq_no > @expected_seq # Early arrival (packet loss perhaps) if !@seqmap[@expected_seq] rs = rel_seq(@expected_seq) @laststate = "Retransmission (Out of order early) until seq = #{rs}" @outoforder_cnt = @outoforder_cnt + 1 end @last_seq = seq_no @expected_seq = seq_no + payload_size @seqmap[seq_no]=segment elsif seq_no < @expected_seq # Late arrival (perhaps retransmission due to lost packet) retrans_seg = @seqmap[seq_no] if retrans_seg @laststate = "Retransmission of packet #{retrans_seg.ID}" @retran_cnt = @retran_cnt + 1 else rs = rel_seq(@expected_seq) @laststate = "Retransmission (Out of order late) until seq = #{rs}" @seqmap[seq_no]=segment @outoforder_cnt = @outoforder_cnt + 1 end else # Normal case @last_seq = seq_no @expected_seq = seq_no + payload_size @seqmap[seq_no]=segment end end if payload_size > 0 && retrans_seg # adjust congestion variables based on observed retransmission # are we still in fast recovery if @dupack_cnt>=DUPACK_CNT @laststate += " (fast retransmit)" else if is_rto_expired(retrans_seg, segment) @laststate += " (timeout)" else # sometimes there is a measurement error (no dupacks, no timeouts either!) @laststate += " (timeout?)" end @ssthresh = max(2*@mss, @cwnd / 2) @cwnd = @mss @rto = min(@rto * 2, MAX_RTO) end end end # the feedback stream consists of ACKs from the far side, the type and frequencey of # acks help to throttle the sender driving the pipe def proc_segment_feedback ( seq_no, ack_no, payload_size,segment) update_expected_seq(ack_no) # if last packet seen was retran or whatever, this is an invalid ack window for RTT (Karns algo) if @laststate.length > 0 fInvalidAckWindow=true end @laststate = "" # count duplicate acks (keep running count in dupack_cnt) if payload_size==0 dak = @dackmap[ack_no] if dak @laststate = "Duplicate ACK same as packet #{dak.ID}" @dupack_cnt = @dupack_cnt + 1 else @dackmap[ack_no]=segment @dupack_cnt = 0 end end # process all acknowledgements (including duplicate acks) if dak # process duplicate ACKs if @dupack_cnt == DUPACK_CNT # at the 3-rd dup ack (ie 1 + 3 (duplicates) ) enter fast retransmit @ssthresh = max(2*@mss, @cwnd / 2) @cwnd = @ssthresh + DUPACK_CNT * @mss elsif @dupack_cnt > DUPACK_CNT # for each additional dupack, open window by 1 seg @cwnd = @cwnd + @mss end else # process a fresh ACK # if we were in FAST_RECOVERY , get out of it, restore cwnd to the recorded ssthresh if @dupack_cnt >= DUPACK_CNT @dupack_cnt=0 @dackmap.clear @cwnd = @ssthresh else # open congestion window if @cwnd <= @ssthresh # slow start open by one @cwnd = @cwnd + @mss else # congestion avoidance (open by seg^2/cwnd at most once per rtt) if can_update_ca_wnd( segment ) @cwnd = @cwnd + @mss * @mss/ @cwnd @lastupdate_ca_seg = segment end end end end # for calculating RTT (we cant compute RTT if acking > mss bytes) - due to effect of delayed acks if fInvalidAckWindow @ack_window = 0 @last_ack = ack_no else @ack_window = ack_no - @last_ack @last_ack = ack_no end end # between the original and retransmitted segments, do we have a time lag greater than # or equal to the retransmission timeout (RTO) def is_rto_expired(old_seg, new_seg) t_s = new_seg.TimestampSecs t_us = new_seg.TimestampUSecs d_s = t_s - old_seg.TimestampSecs d_us = t_us - old_seg.TimestampUSecs if d_us < 0 d_us = 1000000 + d_us d_s = d_s - 1 end delta= d_s*::USECPERSEC + d_us if delta >= @rto return true end return false end def can_update_ca_wnd(newseg) if ! @lastupdate_ca_seg return true end t_s = newseg.TimestampSecs t_us = newseg.TimestampUSecs d_s = t_s - @lastupdate_ca_seg.TimestampSecs d_us = t_us - @lastupdate_ca_seg.TimestampUSecs if d_us < 0 d_us = 1000000 + d_us d_s = d_s - 1 end causs = d_s*::USECPERSEC + d_us if causs >= @rtt return true end return false end def init_seq_ack (seq, ack) @isn_seq = seq @isn_ack = ack @expected_seq = seq end def rel_seq(seq) if ( seq < isn_seq ) print "Wrapped!! isn=#{isn} seq = #{seq}\n" return 2**32 - @isn_seq + seq else return seq - @isn_seq end end # update expected sequence - if we see an ack larger than current expected sequence, update # this will happen if 'holes' close in the recv window def update_expected_seq(seq) if seq > @expected_seq @expected_seq = seq end end def init_state(mss1) @tcpstate = TCPSTATE_SLOW_START @cwnd = mss1 @mss = mss1 @ssthresh = 65535 @rto = INITIAL_RTO @rttvar = 0 @srtt=0 print "mss = #{@mss}\n" end def update_rto_timers(newrtt) if @srtt == 0 #first measurement @srtt = newrtt @rttvar = newrtt/2 @rto = @srtt + max(RTO_GRAN, RTO_K_VALUE * @rttvar ) else #subsequent meaurement @rttvar = (1 - RTO_BETA_VALUE) * @rttvar + RTO_BETA_VALUE * ( @srtt - newrtt).abs @srtt = (1 - RTO_ALPHA_VALUE) * @srtt + RTO_ALPHA_VALUE * newrtt @rto = @srtt + max(RTO_GRAN, RTO_K_VALUE * @rttvar ) end print "Updated RTO to #{@rto}\n" end end #------------------------------------------ # Time - Base analysis class # results of analysis are stored here # far side TCP states are guessed #------------------------------------------ class TCPStreamAnalyzer # Proximity of observation point PROXIMITY_UNKNOWN=0 PROXIMITY_CLIENT= 1 PROXIMITY_SERVER= 2 # Direction DIR_UNKNOWN=0 DIR_SYN=1 DIR_SYN_ACK=2 @segmentarr # segments as they are seen @tcpStateOut # state of nearside TCP @tcpStateIn # state of farside TCP @t0_s # start time (s) @t0_us # start time (us) @sessionmss # maximum segment size negotiated for this stream @proximity # observation point (near client:sender of SYN or server:sender of SYN+ACK ) @lastsegdir # direction of last seen segment attr_reader :startTime,:endTime,:b_rttus attr_reader :summary #----------------------------------- # initialize # analyze the given TCP stream (streamid) #----------------------------------- def initialize(filename, streamid) # just read first and last in order to set the # scales unsniffDB = WIN32OLE.new("Unsniff.Database") unsniffDB.Open(filename) streamIndex = unsniffDB.StreamIndex stream = streamIndex.Item(streamid.to_i - 1) packetIndex = stream.Packets @segmentarr = Array.new @tcpStateOut = TCPState.new @tcpStateIn = TCPState.new @sessionmss = 1460 @lastsegdir = DIR_UNKNOWN # Based on 3-way handshake calc_baseline_rtt(stream) calc_isn( packetIndex.Item(2) ) calc_t0( packetIndex.Item(0) ) @sessionmss = find_mss(packetIndex.Item(1) ) @tcpStateIn.init_state(@sessionmss) @tcpStateOut.init_state(@sessionmss) # packets nPackets = packetIndex.Count idx=0 while (idx < nPackets) pkt = packetIndex.Item(idx) layer_ip = pkt.FindLayer("IP") src_ip = layer_ip.FindField("Source IP") if src_ip.value == stream.SourceAddress proc_packet(pkt,"Out") else proc_packet(pkt,"In") end idx = idx + 1 end firstPkt = packetIndex.Item(0) lastPkt = packetIndex.Item(nPackets-1) # set start timestamp @startTime = Time.at(firstPkt.TimestampSecs,firstPkt.TimestampUSecs) # set end timestamp @endTime = Time.at(lastPkt.TimestampSecs,lastPkt.TimestampUSecs) print "Start time = #{@startTime}\n" print "End time = #{@endTime}\n" makeSummary(stream) unsniffDB.Close() end # make a summary text of the results in @summ def makeSummary(stream) @summary = "Source Address: #{stream.SourceAddress} Destination Address:#{stream.DestinationAddress}\n" + "Bytes In( #{stream.InByteCount}) Out( #{stream.OutByteCount})\n" + "Max Segment Size = #{@sessionmss}\n" + "Out of orders ( #{@tcpStateIn.outoforder_cnt} / #{@tcpStateOut.outoforder_cnt} ) " + "Retransmissions ( #{@tcpStateIn.retran_cnt} / #{@tcpStateOut.retran_cnt} ) " + "Dup Acks ( #{@tcpStateIn.dupack_cnt} / #{@tcpStateOut.dupack_cnt} )" end def total_samples @segmentarr.size end # attempt to 'discover MSS' by observing the SYN-ACK-ACK def find_mss (hand2) layer_tcp = hand2.FindLayer("TCP") f_o = layer_tcp.FindField("MSS") print "the mss is #{f_o.value}\n" f_o.value.to_i end # calculate t0 used for delta timestamps def calc_t0(hand1segment) @t0_s=hand1segment.TimestampSecs @t0_us=hand1segment.TimestampUSecs end # Set the initial sequence numbers based on handshake def calc_isn(hand3segment) layer_tcp = hand3segment.FindLayer("TCP") f_seq_no = layer_tcp.FindField("Sequence No") f_ack_no = layer_tcp.FindField("Ack No") @tcpStateOut.init_seq_ack(f_seq_no.value.to_i-1, f_ack_no.value.to_i-1) @tcpStateIn.init_seq_ack(f_ack_no.value.to_i-1, f_seq_no.value.to_i-1) end def each_table_val @segmentarr.each do |item| yield item end end def each_item @segmentarr.each do |item| yield item end end #------------------------------------------ # Calculate the TCP Payload Size #------------------------------------------ def calc_payload_size( layer_tcp, layer_ip) f_o_flags = layer_tcp.FindField("Offset/Flags") f_ver_ihl = layer_ip.FindField("Header Length") f_p_len = layer_ip.FindField("Packet Length") h_len = f_ver_ihl.RawData.hex & 0x0F h_len = h_len * 4 tcp_size = f_o_flags.RawData.hex tcp_size = (tcp_size >> 12) tcp_size = tcp_size * 4 p_bytes = f_p_len.RawData.hex - h_len - tcp_size return p_bytes end #----------------------------------------------------- # Calculate relative sequence / ack number # take into account wrapped sequences #---------------------------------------------------- def rel_seq( seq, segdir) if segdir == "Out" isn = @tcpStateOut.isn_seq elsif segdir == "In" isn = @tcpStateIn.isn_seq end rel=0 if ( seq < isn ) print "Wrapped!! isn=#{isn} seq = #{seq}\n" rel = 2**32 - isn + seq else rel = seq - isn end rel_seq = rel end def rel_ack( ack, segdir) if segdir == "Out" isn = @tcpStateOut.isn_ack elsif segdir == "In" isn = @tcpStateIn.isn_ack end if ( ack < isn ) rel = 2**32 - isn + ack else rel = ack - isn end rel_ack = rel end #----------------------------------------------------- # Proc incoming packet # incoming packet - match entry # if retran or dup ack (delete sample - invalid) #----------------------------------------------------- def proc_packet( tcp_segment, segdir) layer_tcp = tcp_segment.FindLayer("TCP") f_seq_no = layer_tcp.FindField("Sequence No") f_ack_no = layer_tcp.FindField("Ack No") win_sz = layer_tcp.FindField("Window") seq_no = f_seq_no.value.to_i ack_no = f_ack_no.value.to_i layer_ip = tcp_segment.FindLayer("IP") pay_size = calc_payload_size( layer_tcp, layer_ip) if segdir=="In" @tcpStateIn.proc_segment_forward(seq_no,ack_no,pay_size,tcp_segment) @tcpStateOut.proc_segment_feedback(seq_no,ack_no,pay_size,tcp_segment) else @tcpStateOut.proc_segment_forward(seq_no,ack_no,pay_size,tcp_segment) @tcpStateIn.proc_segment_feedback(seq_no,ack_no,pay_size,tcp_segment) end #create analysis item and push it it = TCPAnalysisItem.new() it.dir = segdir it.payload = pay_size it.seq=rel_seq(seq_no,segdir) it.ack=rel_ack(ack_no,segdir) it.pid=tcp_segment.ID it.wnd=win_sz.value.to_i it.analysis=@tcpStateIn.laststate + @tcpStateOut.laststate if segdir=="In" it.cwnd = @tcpStateIn.cwnd it.ssthresh = @tcpStateIn.ssthresh else it.cwnd = @tcpStateOut.cwnd it.ssthresh = @tcpStateOut.ssthresh end set_delta_time(it,tcp_segment) # calculate and set RTT time # incoming (Server to Client) segments are candidates for observing close to client # outgoing (Client to Server) segments when observing close to server if pay_size == 0 # if acking more than 1 fullsize segment (good for RTT) if segdir == "In" && @proximity == PROXIMITY_CLIENT && @lastsegdir == DIR_SYN if ack_no > @tcpStateOut.last_seq && @tcpStateOut.ack_window > @sessionmss seg0 = @tcpStateOut.lastsegment update_rtt(it,tcp_segment,seg0) @tcpStateOut.update_rto_timers(it.rtt) end elsif segdir == "Out" && @proximity == PROXIMITY_SERVER && @lastsegdir == DIR_SYN_ACK if ack_no > @tcpStateIn.last_seq && @tcpStateOut.ack_window > @sessionmss seg0 = @tcpStateIn.lastsegment update_rtt(it,tcp_segment,seg0) @tcpStateIn.update_rto_timers(it.rtt) end end end @segmentarr << it # update last segment direction if segdir=="In" @lastsegdir = DIR_SYN_ACK elsif segdir == "Out" @lastsegdir = DIR_SYN else @lastsegdir = DIR_UNKNOWN end end #---------------------------------------------------- # Baseline RTT based on 3-way handshake #---------------------------------------------------- def calc_baseline_rtt( session) segments = session.Packets s0 = segments.Item(0) s1 = segments.Item(1) s2 = segments.Item(2) t0secs = s1.TimestampSecs - s0.TimestampSecs t0usecs = s1.TimestampUSecs - s0.TimestampUSecs t0usecs = t0usecs + 1000000*t0secs t1secs = s2.TimestampSecs - s1.TimestampSecs t1usecs = s2.TimestampUSecs - s1.TimestampUSecs t1usecs = t1usecs + 1000000*t1secs if t1usecs > t0usecs b_rtt = t1usecs @proximity = PROXIMITY_SERVER else b_rtt = t0usecs @proximity = PROXIMITY_CLIENT end @b_rttus = b_rtt @tcpStateIn.rtt = b_rtt @tcpStateOut.rtt = b_rtt print "Baseline RTT = #{b_rtt}\n" end # we might be able to update RTT # make sure you exclude out-of-order, retransmitted, or dup acked packets (simple check of flags) def update_rtt( tcpanalysisitem, seg1 , seg0) t0secs = seg1.TimestampSecs - seg0.TimestampSecs t0usecs = seg1.TimestampUSecs - seg0.TimestampUSecs if t0usecs < 0 t0usecs = ::USECPERSEC - seg0.TimestampUSecs + seg1.TimestampUSecs t0secs = t0secs - 1 end tcpanalysisitem.rtt = t0secs*::USECPERSEC + t0usecs print "RTT set to #{tcpanalysisitem.rtt}\n" end # set delta time def set_delta_time(tcpItem, segment) t_s = segment.TimestampSecs t_us = segment.TimestampUSecs d_s = segment.TimestampSecs - @t0_s d_us = segment.TimestampUSecs - @t0_us if d_us < 0 d_us = 1000000 - @t0_us + segment.TimestampUSecs d_s = d_s - 1 end tcpItem.tm_s = d_s tcpItem.tm_us = d_us end # first sequence number def min_sequence_out #@tcpStateOut.isn_seq 0 end # last sequence number seen def max_sequence_out rel_seq(@tcpStateOut.max_seq,"Out") end # first sequence number def min_sequence_in #@tcpStateIn.isn_seq 0 end # last sequence number seen def max_sequence_in rel_seq(@tcpStateIn.max_seq,"In") end # number of analysis items def item_count @segmentarr.size end # last rtt us def base_rtt_us @b_rttus end end #------------------------------------------- # Base for all chart models # ------------------------------------------ class ModelBase @base def maxval 100 end def minval 10 end def totalTimeUSecs secdiff = @base.endTime.tv_sec - @base.startTime.tv_sec usdiff = @base.endTime.tv_usec - @base.startTime.tv_usec if usdiff < 0 usdiff = usdiff + ::USECPERSEC secdiff = secdiff-1 end totalTimeUSecs = (secdiff * ::USECPERSEC) + usdiff end def startTime @base.startTime end def endTime @base.endTime end def valunits "b" end def yscalelabel "Bytes" end def xscalelabel "Time" end end #------------------------------------------ # Models for Seq # Analysis # Seq # distributed over time #------------------------------------------ class SeqNoAnalysisModel < ModelBase @direction def initialize( baseModel, dir) @base=baseModel @direction=dir end def each_val @base.each_item do |item| if (@direction == "Both") || (item.dir == @direction) tm_s = @base.startTime.tv_sec + item.tm_s tm_us = @base.startTime.tv_usec + item.tm_us if tm_us >= ::USECPERSEC tm_us = tm_us - ::USECPERSEC tm_s = tm_s +1 end if item.analysis.length == 0 yield Time.at(tm_s,tm_us), item.seq, DataPointStyle.new(DataPointStyle::POINTCHAR,"o",nil) else if item.analysis =~ /early/ yield Time.at(tm_s,tm_us), item.seq, DataPointStyle.new(DataPointStyle::LINEHEIGHT,nil,FXRGB(240,0,0)) elsif item.analysis =~ /late/ yield Time.at(tm_s,tm_us), item.seq, DataPointStyle.new(DataPointStyle::LINEHEIGHT,nil,FXRGB(0,200,250)) elsif item.analysis =~ /Dup/ yield Time.at(tm_s,tm_us), item.seq, DataPointStyle.new(DataPointStyle::LINEHEIGHT,nil,FXRGB(200,200,0)) elsif item.analysis =~ /Retran/ yield Time.at(tm_s,tm_us), item.seq, DataPointStyle.new(DataPointStyle::LINEHEIGHT,nil,FXRGB(200,0,0)) else yield Time.at(tm_s,tm_us), item.seq, DataPointStyle.new(DataPointStyle::LINEHEIGHT,"o") end end end end end def maxval if @direction == "In" @base.max_sequence_in elsif @direction == "Out" @base.max_sequence_out else max(@base.max_sequence_in,@base.max_sequence_out) end end def minval if @direction == "In" @base.min_sequence_in elsif @direction == "Out" @base.min_sequence_out else min(@base.min_sequence_in,@base.min_sequence_out) end end def valunits "" end def yscalelabel "Sequence Number" end end #------------------------------------------ # Models for Congestion Window # Cwnd over time #------------------------------------------ class CongestionModel < ModelBase @direction def initialize( baseModel, dir) @base=baseModel @direction=dir end def each_val @base.each_item do |item| if (@direction == "Both") || (item.dir == @direction) tm_s = @base.startTime.tv_sec + item.tm_s tm_us = @base.startTime.tv_usec + item.tm_us if tm_us >= ::USECPERSEC tm_us = tm_us - ::USECPERSEC tm_s = tm_s +1 end yield Time.at(tm_s,tm_us), item.cwnd, DataPointStyle.new(DataPointStyle::POINTCHAR,",",nil) #print "cwnd = #{item.cwnd}\n" end end end def maxval 65536 end def minval 0 end def valunits "b" end def yscalelabel "Congestion Window (cwnd)" end end #------------------------------------------ # Models for slow start threshold # Ssthresh over time #------------------------------------------ class SSThreshModel < ModelBase @direction def initialize( baseModel, dir) @base=baseModel @direction=dir end def each_val @base.each_item do |item| if (@direction == "Both") || (item.dir == @direction) tm_s = @base.startTime.tv_sec + item.tm_s tm_us = @base.startTime.tv_usec + item.tm_us if tm_us >= ::USECPERSEC tm_us = tm_us - ::USECPERSEC tm_s = tm_s +1 end yield Time.at(tm_s,tm_us), item.ssthresh, DataPointStyle.new(DataPointStyle::LINECHAR," ",FXRGB(200,200,0)) end end end def maxval 65536 end def minval 0 end def valunits "b" end def yscalelabel "Slow start thresh (ssthresh)" end end #---------------------------------------------------- # Models for Bytes Transferred per RTT period # Seq # distributed over time #---------------------------------------------------- class TrafficPerRTTModel < ModelBase @direction @maxbps @rttarr def initialize( baseModel, dir) @base=baseModel @direction=dir @rttarr = Array.new @maxbps=100 createBuckets end def createBuckets nItems = @base.item_count currUSecs = @base.base_rtt_us byteCount = 0 idx=0 @base.each_item do |item| if item.dir == @direction if item.deltaUSec <= currUSecs byteCount += item.payload else bitCount = byteCount * 8 if bitCount > @maxbps @maxbps = bitCount end @rttarr << bitCount byteCount = 0 currUSecs = currUSecs + @base.base_rtt_us end idx+=1 end end end def each_val tm_s = @base.startTime.tv_sec tm_us = @base.startTime.tv_usec @rttarr.each do |sample| tm_us = tm_us + @base.base_rtt_us%::USECPERSEC tm_s = tm_s + @base.base_rtt_us/::USECPERSEC yield Time.at(tm_s,tm_us), sample, DataPointStyle.new(DataPointStyle::LINEHEIGHT,"o") end end def maxval @maxbps end def minval 10 end def yscalelabel "Bps" end end #---------------------------------------------------- # Bandwidth model bps/ measured over user defined # time slices #---------------------------------------------------- class BandwidthModel < ModelBase @direction @maxbps @bwarr @sliceus def initialize( baseModel, dir, slicems ) @base=baseModel @direction=dir @bwarr = Array.new @maxbps=100 @sliceus = 1000*slicems createBuckets end def createBuckets nItems = @base.item_count currUSecs = @sliceus byteCount = 0 idx=0 @base.each_item do |item| if item.dir == @direction if item.deltaUSec <= currUSecs byteCount += item.payload else bitCount = byteCount * 8 if bitCount > @maxbps @maxbps = bitCount end @bwarr << bitCount byteCount = 0 currUSecs = currUSecs + @sliceus end idx+=1 end end end def each_val tm_s = @base.startTime.tv_sec tm_us = @base.startTime.tv_usec @bwarr.each do |sample| tm_us = tm_us + @sliceus%::USECPERSEC tm_s = tm_s + @sliceus/::USECPERSEC yield Time.at(tm_s,tm_us), sample, DataPointStyle.new(DataPointStyle::LINEHEIGHT,"o") end end def maxval @maxbps end def minval 10 end def valunits "bps" end def yscalelabel "Bps" end end #---------------------------------------------------- #Inflight Data Model # time slices vs Data in flight in bytes #---------------------------------------------------- class InflightDataModel < ModelBase @direction @maxbytes @tmarr @sliceus def initialize( baseModel, dir, slicems ) @base=baseModel @direction=dir @tmarr = Array.new @maxbytes=100 @sliceus = 1000*slicems createBuckets end def createBuckets nItems = @base.item_count currUSecs = @sliceus nsamp = 0 diff = 0 lastitem = nil byteCount = 0 idx=0 @base.each_item do |item| if item.dir == @direction if lastitem && lastitem.dir != @direction && lastitem.ack > 0 diff = diff + item.seq - lastitem.ack nsamp = nsamp + 1 end end # time to update bucket ? if item.deltaUSec > currUSecs && nsamp > 0 avgdiff = diff / nsamp if avgdiff > @maxbytes @maxbytes= avgdiff end @tmarr << avgdiff nsamp,diff = 0,0 currUSecs = currUSecs + @sliceus end lastitem = item end end def each_val tm_s = @base.startTime.tv_sec tm_us = @base.startTime.tv_usec @tmarr.each do |sample| tm_us = tm_us + @sliceus%::USECPERSEC tm_s = tm_s + @sliceus/::USECPERSEC yield Time.at(tm_s,tm_us), sample, DataPointStyle.new(DataPointStyle::LINECONNECT,"o") end end def maxval @maxbytes end def minval 1 end def valunits "B" end def yscalelabel "Bytes in flight" end end #------------------------------------------------------------------- # RTT variation # how RTT estimated and Smoothed varies over time #------------------------------------------------------------------- class RTTModel < ModelBase @maxrttus @minrttus def initialize( baseModel ) @base=baseModel @maxrttus = USECPERSEC @minrttus = 0 calcRange end def calcRange @maxrttus = 2 * @base.b_rttus @minrttus = 0 @base.each_item do |item| rtt = item.rtt if rtt && rtt > 0 @minrttus = min(@minrttus, rtt) @maxrttus = max(@maxrttus,rtt) end end end def each_val tm_s = @base.startTime.tv_sec tm_us = @base.startTime.tv_usec # baseline RTT shown as a line tm_es = @base.endTime.tv_sec tm_eus = @base.endTime.tv_usec # draw baseline RTT line with legend yield Time.at(tm_s,tm_us), (110*@base.b_rttus)/100, DataPointStyle.new(DataPointStyle::POINTCHAR,"Baseline RTT") yield Time.at(tm_s,tm_us), @base.b_rttus, DataPointStyle.new(DataPointStyle::POINTCHAR,"") yield Time.at(tm_es,tm_eus), @base.b_rttus, DataPointStyle.new(DataPointStyle::LINECONNECT,"", FXRGB(192,192,192)) # rtt samples @base.each_item do |item| tm_s = @base.startTime.tv_sec + item.tm_s tm_us = @base.startTime.tv_usec + item.tm_us if tm_us >= ::USECPERSEC tm_us = tm_us - ::USECPERSEC tm_s = tm_s +1 end if item.rtt && item.rtt > 0 yield Time.at(tm_s,tm_us), item.rtt, DataPointStyle.new(DataPointStyle::LINECHAR,"X") end end end def maxval @maxrttus end def minval @minrttus end def valunits "us" end def yscalelabel "RTT Sample" end end #------------------------------------------ # Models for Window Size Analysis # Seq # distributed over time #------------------------------------------ class WindowSizeModel < ModelBase def initialize( baseModel) @base=baseModel end def each_val @base.each_item do |item| tm_s = @base.startTime.tv_sec + item.tm_s tm_us = @base.startTime.tv_usec + item.tm_us if tm_us >= ::USECPERSEC tm_us = tm_us - ::USECPERSEC tm_s = tm_s +1 end if (item.dir == "In") yield Time.at(tm_s,tm_us), item.wnd, DataPointStyle.new(DataPointStyle::POINTCHAR,",",FXRGB(0,0,250)) elsif (item.dir == "Out") yield Time.at(tm_s,tm_us), item.wnd, DataPointStyle.new(DataPointStyle::POINTCHAR,",",FXRGB(250,250,0)) end end end def maxval 65536 end def minval 1 end def valunits "b" end def yscalelabel "Advertised Window [ blue=by server(dest),yellow=by client(src) ]" end end USAGE = "anastm <capture-filename> <stream-number> <In/Out>" if ARGV.length != 3 puts USAGE exit 1 end # Analyze stream and store them into time buckets baseAnalysis = TCPStreamAnalyzer.new(ARGV[0],ARGV[1]) # A new Fox Application and MainWindow object theApp = FXApp.new theMainWindow = ChartWindow.new(theApp) useDirection = ARGV[2] theMainWindow.addChart( "Seq analysis" , SeqNoAnalysisModel.new(baseAnalysis , useDirection.capitalize ) ) theMainWindow.addChart( "Traffic / RTT" , TrafficPerRTTModel.new(baseAnalysis , useDirection.capitalize ) ) theMainWindow.addChart( "Inflight Data" , InflightDataModel.new(baseAnalysis , useDirection.capitalize, 1000 ) ) theMainWindow.addChart( "Bandwidth" , BandwidthModel.new(baseAnalysis , useDirection, 700) ) theMainWindow.addChart( "RTT Samples" , RTTModel.new(baseAnalysis) ) theMainWindow.addChart( "Congestion Analysis" , CongestionModel.new(baseAnalysis,useDirection.capitalize) ) theMainWindow.addChart( "SSThresh" , SSThreshModel.new(baseAnalysis,useDirection.capitalize) ) theMainWindow.addChart( "Window" , WindowSizeModel.new(baseAnalysis) ) theMainWindow.loadTable( baseAnalysis) theMainWindow.setStreamDetails( baseAnalysis.summary) # Run application theApp.create theMainWindow.show theApp.run