From: Martin Mares Date: Fri, 6 Jun 2003 23:19:19 +0000 (+0000) Subject: Flow parsing, try 0. X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=1afeb5295fb7b0824f628f5b9872d0dba1d71d46;p=netgrind.git Flow parsing, try 0. --- diff --git a/lib/Makefile b/lib/Makefile index c5190fd..1bf3eb8 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -1,7 +1,7 @@ DIRS+=lib LIBSH=obj/lib/libsh.a -LIBSH_MODS=alloc assert +LIBSH_MODS=alloc assert prime LIBSH_MOD_PATHS=$(addprefix obj/lib/,$(LIBSH_MODS)) $(LIBSH): $(addsuffix .o,$(LIBSH_MOD_PATHS)) diff --git a/lib/heap.h b/lib/heap.h new file mode 100644 index 0000000..e0b27de --- /dev/null +++ b/lib/heap.h @@ -0,0 +1,105 @@ +/* + * Sherlock Library -- Universal Heap Macros + * + * (c) 2001 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +#define HEAP_BUBBLE_DOWN_J(heap,num,less,swap) \ + for (;;) \ + { \ + l = 2*j; \ + if (l > num) \ + break; \ + if (less(heap[j],heap[l]) && (l == num || less(heap[j],heap[l+1]))) \ + break; \ + if (l != num && less(heap[l+1],heap[l])) \ + l++; \ + swap(heap,j,l,x); \ + j = l; \ + } + +#define HEAP_BUBBLE_UP_J(heap,num,less,swap) \ + while (j > 1) \ + { \ + u = j/2; \ + if (less(heap[u], heap[j])) \ + break; \ + swap(heap,u,j,x); \ + j = u; \ + } + +#define HEAP_INIT(type,heap,num,less,swap) \ + do { \ + uns i = num; \ + uns j, l; \ + type x; \ + while (i >= 1) \ + { \ + j = i; \ + HEAP_BUBBLE_DOWN_J(heap,num,less,swap) \ + i--; \ + } \ + } while(0) + +#define HEAP_DELMIN(type,heap,num,less,swap) \ + do { \ + uns j, l; \ + type x; \ + swap(heap,1,num,x); \ + num--; \ + j = 1; \ + HEAP_BUBBLE_DOWN_J(heap,num,less,swap); \ + } while(0) + +#define HEAP_INSERT(type,heap,num,less,swap) \ + do { \ + uns j, u; \ + type x; \ + j = num; \ + HEAP_BUBBLE_UP_J(heap,num,less,swap); \ + } while(0) + +#define HEAP_INCREASE(type,heap,num,less,swap,pos) \ + do { \ + uns j, l; \ + type x; \ + j = pos; \ + HEAP_BUBBLE_DOWN_J(heap,num,less,swap); \ + } while(0) + +#define HEAP_DECREASE(type,heap,num,less,swap,pos) \ + do { \ + uns j, l; \ + type x; \ + j = pos; \ + HEAP_BUBBLE_UP_J(heap,num,less,swap); \ + } while(0) + +#define HEAP_CHANGE(type,heap,num,less,swap,pos) \ + do { \ + uns j, l, u; \ + type x; \ + j = pos; \ + if (j == 1) \ + ; \ + else if (less(heap[j], heap[j/2])) \ + HEAP_BUBBLE_UP_J(heap,num,less,swap) \ + else \ + HEAP_BUBBLE_DOWN_J(heap,num,less,swap) \ + } while(0) + +#define HEAP_DELETE(type,heap,num,less,swap,pos) \ + do { \ + uns j, l, u; \ + type x; \ + j = pos; \ + swap(heap,j,num,x); \ + num--; \ + if (less(heap[j], heap[num+1])) \ + HEAP_BUBBLE_UP_J(heap,num,less,swap) \ + else \ + HEAP_BUBBLE_DOWN_J(heap,num,less,swap); \ + } while(0) diff --git a/lib/lib.h b/lib/lib.h index 13c07d6..84d13ae 100644 --- a/lib/lib.h +++ b/lib/lib.h @@ -50,7 +50,7 @@ void assert_failed(void) NONRET; #endif #ifdef LOCAL_DEBUG -#define DBG(x,y...) log(L_DEBUG, x,##y) +#define DBG(x,y...) printf(x,##y) #else #define DBG(x,y...) do { } while(0) #endif @@ -83,4 +83,9 @@ void *xrealloc(void *, unsigned); void *xmalloc_zero(unsigned); byte *stralloc(byte *); +/* prime.c */ + +int isprime(uns); +uns nextprime(uns); + #endif diff --git a/lib/prime.c b/lib/prime.c new file mode 100644 index 0000000..9be4c41 --- /dev/null +++ b/lib/prime.c @@ -0,0 +1,78 @@ +/* + * Sherlock Library -- Prime Number Tests + * + * (c) 1997 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +#include "lib/lib.h" + +static int /* Sequential search */ +__isprime(uns x) /* We know x != 2 && x != 3 */ +{ + uns test = 5; + + if (x == 5) + return 1; + for(;;) + { + if (!(x % test)) + return 0; + if (x / test <= test) + return 1; + test += 2; /* 6k+1 */ + if (!(x % test)) + return 0; + if (x / test <= test) + return 1; + test += 4; /* 6k-1 */ + } +} + +int +isprime(uns x) +{ + if (x < 5) + return (x == 2 || x == 3); + switch (x % 6) + { + case 1: + case 5: + return __isprime(x); + default: + return 0; + } +} + +uns +nextprime(uns x) /* Returns some prime greater than x */ +{ + x += 5 - (x % 6); /* x is 6k-1 */ + for(;;) + { + x += 2; /* 6k+1 */ + if (__isprime(x)) + return x; + x += 4; /* 6k-1 */ + if (__isprime(x)) + return x; + } +} + +#ifdef TEST + +#include +#include + +int +main(int argc, char **argv) +{ + uns k = atol(argv[1]); + printf("%d is%s prime\n", k, isprime(k) ? "" : "n't"); + printf("Next prime is %d\n", nextprime(k)); + return 0; +} + +#endif diff --git a/netgrind/netgrind.c b/netgrind/netgrind.c index a25f02a..9ef3f78 100644 --- a/netgrind/netgrind.c +++ b/netgrind/netgrind.c @@ -7,7 +7,10 @@ * of the GNU General Public License. */ +#define LOCAL_DEBUG + #include "lib/lib.h" +#include "lib/heap.h" #include "netgrind/netgrind.h" #include "netgrind/pkt.h" @@ -67,9 +70,182 @@ static inline uns tcpip_verify_checksum(uns csum) return (csum == 0xffff); } +/*** FLOW ANALYSIS ***/ + +enum close_cause { + CAUSE_CLOSE, + CAUSE_RESET, + CAUSE_TIMEOUT, + CAUSE_DOOMSDAY +}; + +struct flow; + +struct appl_hooks { + void (*open)(struct flow *f); + void (*input)(struct flow *f, int dir, struct pkt *p); + void (*close)(struct flow *f, int cause); +}; + +static void sink_open(struct flow *f) +{ +} + +static void sink_close(struct flow *f, int cause) +{ +} + +static void sink_input(struct flow *f, int dir, struct pkt *p) +{ +} + +struct appl_hooks appl_sink = { + .open = sink_open, + .input = sink_input, + .close = sink_close +}; + /*** TCP LAYER ***/ -static struct pkt_stats stat_tcp_in, stat_tcp_invalid; +static struct pkt_stats stat_tcp_in, stat_tcp_invalid, stat_tcp_badsum, stat_tcp_unmatched, + stat_tcp_on_closed; + +struct pipe { + list queue; + u32 last_acked_seq; + enum { + FLOW_IDLE, + FLOW_SYN_SENT, /* sent SYN, waiting for SYN ACK */ + FLOW_SYN_SENT_ACK, /* sent SYN ACK, waiting for first ACK */ + FLOW_ESTABLISHED, /* established state including waiting for ACK of SYN ACK */ + FLOW_FIN_SENT, /* sent FIN, waiting for its ACK */ + FLOW_CLOSED /* closed, ignoring further packets */ + } state; +}; + +struct flow { + struct flow *hash_next; + u32 saddr, daddr, sport, dport; + u32 timeout; + uns heap_pos; + struct appl_hooks *appl; + void *appl_data; + struct pipe pipe[2]; +}; + +static uns num_flows, max_flows; +static struct flow **flow_hash; +static struct flow **flow_heap; + +static uns flow_calc_hash(u32 saddr, u32 daddr, u32 sport, u32 dport) +{ + saddr = (saddr >> 16) | (saddr << 16); + daddr = (daddr >> 8) | (daddr << 24); + sport <<= 7; + dport <<= 21; + return (saddr + daddr + sport + dport) % max_flows; +} + +#define FLOW_HEAP_LESS(a,b) (a->timeout < b->timeout) +#define FLOW_HEAP_SWAP(h,a,b,t) do { t=h[a]; h[a]=h[b]; h[b]=t; h[a]->heap_pos=a; h[b]->heap_pos=b; } while(0) + +static void flow_rehash(void) +{ + uns omax = max_flows; + struct flow **ohash = flow_hash; + + if (flow_heap) + xfree(flow_heap); + if (max_flows) + max_flows = nextprime(2*max_flows); + else + max_flows = 3; + DBG("Rehashing to %d buckets\n", max_flows); + flow_hash = xmalloc_zero(sizeof(struct flow *) * max_flows); + flow_heap = xmalloc_zero(sizeof(struct flow *) * (max_flows+1)); + num_flows = 0; + for (uns i=0; ihash_next; + uns h = flow_calc_hash(f->saddr, f->daddr, f->sport, f->dport); + f->hash_next = flow_hash[h]; + flow_hash[h] = f; + flow_heap[++num_flows] = f; + f->heap_pos = num_flows; + f = n; + } + } + if (ohash) + xfree(ohash); + HEAP_INIT(struct flow *, flow_heap, num_flows, FLOW_HEAP_LESS, FLOW_HEAP_SWAP); +} + +static struct flow *flow_lookup(u32 saddr, u32 daddr, u32 sport, u32 dport) +{ + uns h = flow_calc_hash(saddr, daddr, sport, dport); + for (struct flow *f = flow_hash[h]; f; f=f->hash_next) + if (f->saddr == saddr && f->daddr == daddr && + f->sport == sport && f->dport == dport) + return f; + return NULL; +} + +static struct flow *flow_create(u32 saddr, u32 daddr, u32 sport, u32 dport) +{ + if (num_flows >= max_flows) + flow_rehash(); + uns h = flow_calc_hash(saddr, daddr, sport, dport); + struct flow *f = xmalloc_zero(sizeof(struct flow)); + f->saddr = saddr; + f->daddr = daddr; + f->sport = sport; + f->dport = dport; + f->timeout = ~0U; + f->hash_next = flow_hash[h]; + flow_hash[h] = f; + flow_heap[++num_flows] = f; + f->heap_pos = num_flows; + return f; +} + +static void flow_set_timeout(struct flow *f, u32 when) +{ + f->timeout = when; + HEAP_CHANGE(struct flow *, flow_heap, num_flows, FLOW_HEAP_LESS, FLOW_HEAP_SWAP, f->heap_pos); +} + +static uns flow_now(struct pkt *p) +{ + return p->timestamp >> 20; +} + +static void tcp_time_step(uns now) +{ + while (num_flows && flow_heap[1]->timeout <= now) + { + struct flow *f = flow_heap[1]; + HEAP_DELMIN(struct flow *, flow_heap, num_flows, FLOW_HEAP_LESS, FLOW_HEAP_SWAP); + DBG("TIMEOUT for flow %p(%d/%d)\n", f, f->pipe[0].state, f->pipe[1].state); + if (f->pipe[0].state != FLOW_CLOSED || f->pipe[1].state != FLOW_CLOSED) + f->appl->close(f, (now == ~0U) ? CAUSE_DOOMSDAY : CAUSE_TIMEOUT); + uns h = flow_calc_hash(f->saddr, f->daddr, f->sport, f->dport); + struct flow **gg = &flow_hash[h]; + for(;;) + { + ASSERT(*gg); + if (*gg == f) + { + *gg = f->hash_next; + break; + } + gg = &(*gg)->hash_next; + } + xfree(f); + } +} static void tcp_got_packet(struct iphdr *iph, struct pkt *p) { @@ -81,6 +257,9 @@ static void tcp_got_packet(struct iphdr *iph, struct pkt *p) byte proto; u16 len; } fakehdr; + uns now = flow_now(p); + + tcp_time_step(now); pkt_account(&stat_tcp_in, p); if (!(tcph = pkt_peek(p, sizeof(*tcph)))) @@ -96,20 +275,183 @@ static void tcp_got_packet(struct iphdr *iph, struct pkt *p) uns sum = tcpip_calc_checksum(&fakehdr, sizeof(fakehdr), 0); sum = tcpip_calc_checksum(p->data, pkt_len(p), sum); if (!tcpip_verify_checksum(sum)) - goto invalid; + { + pkt_account(&stat_tcp_badsum, p); + goto drop; + } + /* XXX: Check TCP options? */ + pkt_pop(p, hdrlen); + + u32 seq = ntohl(tcph->seq); + u32 ack = ntohl(tcph->ack_seq); + DBG("TCP %08x %08x %04x %04x seq=%u+%u ack=%u%s%s%s%s%s%s\n", + ntohl(iph->saddr), ntohl(iph->daddr), ntohs(tcph->source), ntohs(tcph->dest), seq, pkt_len(p), ack, + (tcph->fin ? " FIN" : ""), + (tcph->syn ? " SYN" : ""), + (tcph->rst ? " RST" : ""), + (tcph->psh ? " PSH" : ""), + (tcph->ack ? " ACK" : ""), + (tcph->urg ? " URG" : "")); + + struct flow *f; + struct pipe *a, *b; + if (f = flow_lookup(iph->saddr, iph->daddr, tcph->source, tcph->dest)) + { + a = &f->pipe[0]; + b = &f->pipe[1]; + } + else if (f = flow_lookup(iph->daddr, iph->saddr, tcph->dest, tcph->source)) + { + a = &f->pipe[1]; + b = &f->pipe[0]; + } + else + { + /* Flow not found, if it's a SYN packet, go create it */ + if (tcph->syn && !tcph->ack && !tcph->rst && !tcph->fin) + { + f = flow_create(iph->saddr, iph->daddr, tcph->source, tcph->dest); + f->appl = &appl_sink; + f->appl->open(f); + a = &f->pipe[0]; + b = &f->pipe[1]; + list_init(&a->queue); + a->last_acked_seq = seq; + a->state = FLOW_SYN_SENT; + list_init(&b->queue); + b->state = FLOW_IDLE; + DBG("\t%p NEW\n", f); + goto drop; + } + DBG("\tUnmatched\n"); + pkt_account(&stat_tcp_unmatched, p); + goto drop; + } + + DBG("\t%p(%d/%d) ", f, a->state, b->state); + if (a->state == FLOW_CLOSED && b->state == FLOW_CLOSED) + { + DBG("closed\n"); + pkt_account(&stat_tcp_on_closed, p); + goto drop; + } - /* Here we should do something with the packet */ + if (tcph->rst) + { + DBG("RESET\n"); + f->appl->close(f, CAUSE_RESET); + a->state = b->state = FLOW_CLOSED; + flow_set_timeout(f, now + 300); /* FIXME */ + goto drop; + } + + flow_set_timeout(f, now + 600); /* FIXME */ + + if (tcph->syn) + { + if (tcph->fin || pkt_len(p)) + goto inval; + if (tcph->ack) + { /* SYN ACK */ + if (b->state == FLOW_SYN_SENT && b->last_acked_seq+1 == ack) + { + DBG("SYN ACK\n"); + b->last_acked_seq = ack; + a->state = FLOW_SYN_SENT_ACK; + a->last_acked_seq = seq; + goto drop; + } + else if (b->state == FLOW_ESTABLISHED) + goto dup; + else + goto unex; + } + else + goto dup; /* otherwise SYN on already existing connection gets ignored */ + } + + if (tcph->ack) + { + if (b->state == FLOW_SYN_SENT_ACK && b->last_acked_seq+1) + { + a->state = b->state = FLOW_ESTABLISHED; + b->last_acked_seq = ack+1; + DBG("ACKED SYN, "); + } + if (b->state == FLOW_FIN_SENT && b->last_acked_seq+1 == ack) + { + b->state = FLOW_CLOSED; + if (a->state == FLOW_CLOSED) + { + DBG("CLOSED BOTH WAYS\n"); + f->appl->close(f, CAUSE_CLOSE); + flow_set_timeout(f, now + 300); /* FIXME */ + goto drop; + } + else + DBG("CLOSED ONE-WAY, "); + } + else if (b->state == FLOW_ESTABLISHED || b->state == FLOW_FIN_SENT) + DBG("ACK, "); + else if (b->state == FLOW_CLOSED) + ; + else + goto unex; + } + + if (tcph->fin) + { + if (a->state == FLOW_ESTABLISHED) + { + a->state = FLOW_FIN_SENT; + a->last_acked_seq = seq; + DBG("FIN SENT, waiting for FIN ACK, "); + } + else if (a->state == FLOW_FIN_SENT) + ; + else + goto unex; + } + + if (!pkt_len(p)) + { + DBG("EMPTY\n"); + goto drop; + } + + if (b->state == FLOW_ESTABLISHED || b->state == FLOW_FIN_SENT || b->state == FLOW_CLOSED) + { + DBG("DATA\n"); + // list_add_tail(&b->queue, &p->n); /* FIXME */ + if (pkt_len(p)) + f->appl->input(f, (a == &f->pipe[1]), p); + return; + } + else + goto unex; + + drop: pkt_free(p); return; + dup: + DBG("DUP\n"); + goto drop; + + unex: + DBG("UNEXPECTED\n"); + goto drop; + + inval: + DBG("???\n"); invalid: pkt_account(&stat_tcp_invalid, p); - pkt_free(p); + goto drop; } /*** IP LAYER ***/ -static struct pkt_stats stat_ip_in, stat_ip_invalid, stat_ip_uninteresting, stat_ip_fragmented; +static struct pkt_stats stat_ip_in, stat_ip_invalid, stat_ip_uninteresting, stat_ip_fragmented, stat_ip_badsum; static void ip_got_packet(struct pkt *p) { @@ -126,7 +468,10 @@ static void ip_got_packet(struct pkt *p) if (pkt_len(p) < hdrlen) goto invalid; if (!tcpip_verify_checksum(tcpip_calc_checksum(p->data, hdrlen, 0))) - goto invalid; + { + pkt_account(&stat_ip_badsum, p); + goto drop; + } uns len = ntohs(iph->tot_len); if (len < hdrlen || len > pkt_len(p)) goto invalid; @@ -209,6 +554,7 @@ static void got_pcap_packet(u_char *userdata UNUSED, const struct pcap_pkthdr *h } struct pkt *p = pkt_new(0, hdr->len); memcpy(pkt_append(p, hdr->len), pkt, hdr->len); + p->timestamp = (u64)hdr->ts.tv_sec * 1000000 + hdr->ts.tv_usec; link_handler(p); } @@ -221,6 +567,8 @@ int main(int argc, char **argv) if (argc != 2) die("Usage: netgrind "); + flow_rehash(); + if (!(pcap = pcap_open_offline(argv[1], errbuf))) die("Unable to open %s: %s", argv[1], errbuf); dlt = pcap_datalink(pcap); @@ -228,6 +576,7 @@ int main(int argc, char **argv) die("Don't know how to handle data link type %d", dlt); if (pcap_loop(pcap, -1, got_pcap_packet, NULL) < 0) die("Capture failed: %s", pcap_geterr(pcap)); + tcp_time_step(~0U); #if 0 struct pcap_stat stats; if (pcap_stats(pcap, &stats)) @@ -241,14 +590,16 @@ int main(int argc, char **argv) stat_link_dwarf.packets, stat_link_dwarf.bytes, stat_link_unknown.packets, stat_link_unknown.bytes, stat_link_arp.packets, stat_link_arp.bytes); - printf("IP: %Ld(%Ld) in, %Ld(%Ld) invalid, %Ld(%Ld) boring, %Ld(%Ld) fragmented\n", + printf("IP: %Ld(%Ld) in, %Ld(%Ld) invalid, %Ld(%Ld) boring, %Ld(%Ld) fragmented, %Ld(%Ld) bad checksum\n", stat_ip_in.packets, stat_ip_in.bytes, stat_ip_invalid.packets, stat_ip_invalid.bytes, stat_ip_uninteresting.packets, stat_ip_uninteresting.bytes, - stat_ip_fragmented.packets, stat_ip_fragmented.bytes); - printf("TCP: %Ld(%Ld) in, %Ld(%Ld) invalid\n", + stat_ip_fragmented.packets, stat_ip_fragmented.bytes, + stat_ip_badsum.packets, stat_ip_badsum.bytes); + printf("TCP: %Ld(%Ld) in, %Ld(%Ld) invalid, %Ld(%Ld) bad checksum\n", stat_tcp_in.packets, stat_tcp_in.bytes, - stat_tcp_invalid.packets, stat_tcp_invalid.bytes); + stat_tcp_invalid.packets, stat_tcp_invalid.bytes, + stat_tcp_badsum.packets, stat_tcp_badsum.bytes); pcap_close(pcap); return 0; } diff --git a/netgrind/pkt.h b/netgrind/pkt.h index c1e4a99..c53946e 100644 --- a/netgrind/pkt.h +++ b/netgrind/pkt.h @@ -10,8 +10,9 @@ #include "lib/lists.h" struct pkt { - struct node n; + node n; u64 timestamp; + u32 seq; byte *data, *stop, *ebuf; byte buf[0]; };