~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Linux Cross Reference
cvs/emstar/devel/sympathy_devel/sympathy_events.c


  1 /* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
  2 /*
  3  *
  4  * Copyright (c) 2003 The Regents of the University of California.  All 
  5  * rights reserved.
  6  *
  7  * Redistribution and use in source and binary forms, with or without
  8  * modification, are permitted provided that the following conditions
  9  * are met:
 10  *
 11  * - Redistributions of source code must retain the above copyright
 12  *   notice, this list of conditions and the following disclaimer.
 13  *
 14  * - Neither the name of the University nor the names of its
 15  *   contributors may be used to endorse or promote products derived
 16  *   from this software without specific prior written permission.
 17  *
 18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 20  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 21  * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 22  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 23  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 24  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 25  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 26  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 29  *
 30  */
 31 
 32  /*
 33   *
 34   * Author: Nithya Ramanthan
 35   *
 36   */
 37 
 38 #include "sympathy_app.h"
 39 #include <tos-contrib/multihop/tos/lib/MultihopTypes.h>
 40 #include <tos-contrib/include/protocols.h>
 41 
 42 #define NUM_RB_ELEMS RING_BUF_NUM_ELEMS
 43 
 44 uint8_t ssize = sizeof(Spkt_t);
 45 uint8_t rsize = sizeof(RBelem_t);
 46 uint8_t seqno = 0;
 47 
 48 sevent_ctx_t ctx = {
 49     sevent_req_period_secs: EVENT_REQUEST_PERIOD_MSEC/1000,
 50 };
 51 
 52 /*** STATIC FUNCTIONS ***/
 53 
 54 static int find_event_info(if_id_t src)
 55 {
 56   int i;
 57   for (i = 0; i < ctx.num_srcs; i++)
 58   {
 59     if (ctx.events[i].addr == src) return i;
 60   }
 61   return -1;
 62 }
 63 
 64 static
 65 void print_rb_elem(buf_t* tbuf, RBelem_t* rb)
 66 {
 67   bufprintf(tbuf, "%d   %s   %d    ", rb->time, 
 68     (rb->Sevent.type == NEIGHBOR_ADDED ? "add-neigh":
 69             rb->Sevent.type == NEIGHBOR_DROPPED ? "remv-neigh":
 70               rb->Sevent.type == CHANGE_IN_INGRESS ? "change-ingress": 
 71                 rb->Sevent.type == CHANGE_IN_EGRESS ? "change-egress": 
 72                   rb->Sevent.type == NEXT_HOP_CHANGED ? "nh-changed": 
 73                     rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED ? "nh-qual-changed":
 74                       rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR ? "nh-not-neighbor":
 75          ""),
 76       rb->Sevent.address);
 77 
 78   if ((rb->Sevent.type == NEIGHBOR_ADDED) 
 79     || (rb->Sevent.type == NEIGHBOR_DROPPED)) 
 80   { 
 81     new_neighbor_t* n = (new_neighbor_t *) &(rb->Sevent.data[0]);
 82     bufprintf(tbuf, "%d\t %d\t %d\n", n->ingress, n->egress, 
 83         n->num_current_neighbors);
 84   }
 85   else if ((rb->Sevent.type == CHANGE_IN_INGRESS)  
 86      || (rb->Sevent.type == CHANGE_IN_EGRESS)) 
 87   { 
 88     lq_node_t* n = (lq_node_t *) &(rb->Sevent.data[0]);
 89     bufprintf(tbuf, "%d    %d\n", n->quality, n->change);
 90   }
 91 
 92   else if ((rb->Sevent.type == NEXT_HOP_CHANGED)  
 93     || (rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED) 
 94     || (rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR)) 
 95   { 
 96     nh_node_t* n = (nh_node_t *) &(rb->Sevent.data[0]);
 97     bufprintf(tbuf, "%d    %d\n", n->quality, n->sink);
 98   }
 99   else bufprintf(tbuf, "\n"); 
100 }
101 
102 static
103 int get_minutes_since_event(struct timeval* time)
104 {
105   struct timeval t;
106   gettimeofday(&t, NULL);
107   return ((int) (t.tv_sec - time->tv_sec) / 60);
108 }
109 
110 static
111 void print_events(buf_t* buf, sevent_rb_t* stat)
112 {
113   int i, minutes = get_minutes_since_event(&stat->event_time);
114 
115   bufprintf(buf, "\n**************\nNode %d, number-elems: %d\n", 
116       stat->addr, stat->rb_num_elems);
117   bufprintf(buf, "Packets-rx from node: %d\n", 
118       stat->pkts_rx_from_node);
119   bufprintf(buf, "Number requests tx: %d\n", ctx.num_sevent_reqs);
120   bufprintf(buf, "Minutes since last pkt-rx: %d mins\n", minutes);
121   bufprintf(buf, "\nTimestamp of last pkt rx: %d mins\n\n", 
122       stat->last_event_time_mins);
123 
124   if (stat->rb_num_elems > 0)
125   {
126     bufprintf(buf, "Time\t Type\t\t Addr\t Quality\t Change/Sink\n");
127     bufprintf(buf, "Time\t Type\t\t Addr\t Ingress\t Egress\t Num-neighbors\n");
128     for (i = 0; i < stat->rb_num_elems; i++)
129     {
130       print_rb_elem(buf, &stat->rb[i]);
131     }
132   }
133 }
134 
135 int sympathy_print_events(status_context_t *info, buf_t *buf)
136 {
137   int i;
138         for (i = 0; i < ctx.num_srcs; i++)
139         {
140     print_events(buf, &ctx.events[i]);
141         }
142   return STATUS_MSG_COMPLETE;
143 }
144 static int abs_subtract(int a, int b)
145 {
146   if (a > b) return a - b;
147   return b - a;
148 }
149 
150 static RBelem_t* get_last_element(sevent_rb_t* stat)
151 {
152         /* Get a pointer to the last-element. If the head-ptr is 0, then
153          * it is either because the rb is empty or because it has wrapped
154          * around */
155   if (stat->rb_head == 0)
156   {
157           if (stat->rb_full) return(&stat->rb[NUM_RB_ELEMS - 1]);
158     else return NULL;
159   }
160         /* No wrap-around has ocurred yet, but the buffer isn't full */
161   else return(&stat->rb[stat->rb_head - 1]);
162         return NULL;
163 }
164 
165 /**** Public functions ****/
166 static
167 int add_event_info(if_id_t node)
168 {
169   int sctr = ctx.num_srcs;
170   if (node == 0) return -1;
171 
172   ctx.events[sctr].addr = node;
173   elog(LOG_DEBUG(1), "Inserting node %d in slot %d\n", node, sctr);
174   inc_mod(&ctx.node_ctr, 1, SMAX_SRCS);
175   ctx.num_srcs = min(ctx.num_srcs + 1, SMAX_SRCS);
176   return sctr;
177 }
178 
179 static void update_node(sevent_rb_t* stat)
180 {
181   sympathy_status_info_t info = {
182     pkt_tx:ctx.num_sevent_reqs,
183     pkt_rx:stat->pkts_rx_from_node,
184     pkt_expected_rx:ctx.num_sevent_reqs,
185     time_rx_last_pkt: stat->event_time,
186   };
187   link_pkt_t pkt = {
188     dst: {
189       id: LINK_BROADCAST
190     },
191     src: {
192       id: stat->addr
193     },
194     ext_type: SCOMP_STATS6,
195     type: SSINK_UPDATE,
196   };
197   buf_t* buf = buf_new();
198 
199   bufcpy(buf, &pkt, sizeof(link_pkt_t));
200   bufcpy(buf, &info, sizeof(sympathy_status_info_t));
201 
202   elog(LOG_DEBUG(1), "updating for node %d\n", stat->addr);
203   if (lu_send(ctx.update_lu, (link_pkt_t *)buf->buf, buf->len - sizeof(link_pkt_t)) < 0)
204   {
205      elog(LOG_ERR, "Unable to send pkt: %m!\n");
206   }
207   buf_free(buf);
208 }
209 
210 /* Returns the number of elements copied over */
211 uint16_t handle_event_data(if_id_t node_addr, Spkt_t* mpkt, uint8_t len)
212 {
213   uint8_t num_copied = 0;
214         uint8_t start_index = 0; /* start copying from pkt's rb */
215         uint8_t num_elem_in_pkt = (len - ssize) / rsize;
216         RBelem_t* last_elem, *tmpE = NULL;
217         int diff, sctr, last_t = 0;
218   sevent_rb_t* stat;
219 
220   elog(LOG_DEBUG(1), "from node %d, packet %dB\n", node_addr, len);
221   if ((sctr = find_event_info(node_addr)) < 0)
222   {
223     sctr = add_event_info(node_addr);
224   }
225   stat = &ctx.events[sctr];
226   if (!stat) return 0;
227   gettimeofday(&stat->event_time, NULL);
228   stat->pkts_rx_from_node++;
229 
230         if ((last_elem = get_last_element(stat))) last_t = last_elem->time;
231         while (start_index < num_elem_in_pkt)
232         {
233                 tmpE = (RBelem_t *)&mpkt->data[start_index*rsize];
234 
235                 /* Only take the difference if this is NOT a wrap-around case */
236                 if (last_t >= tmpE->time) diff = 0;
237                 else diff = abs_subtract(tmpE->time, last_t);
238 
239                 if (diff > TIME_DIFF_THRESH) 
240                 {
241                         elog(LOG_DEBUG(1), "WARN, skip elem w/time %d, its %d > than last-time (%d)\n",
242                tmpE->time, diff, last_t);
243                 } 
244 
245                 /* Insert elements if ring-buffer is empty, or if the time
246                  * of the element is > than the last-time - after passing the
247                  * "garbage check" */
248                 else if (tmpE->time >= last_t)
249                 {
250                         memcpy(&stat->rb[stat->rb_head], &mpkt->data[rsize * start_index], rsize);
251                         inc_mod(&(stat->rb_head), 1, NUM_RB_ELEMS);
252       if (stat->rb_head == 0) stat->rb_full = 1;
253                         last_t = tmpE->time;
254       stat->last_event_time_mins = tmpE->time / 60;
255                         num_copied++;
256                 }
257     start_index++;
258         }
259 
260   elog(LOG_DEBUG(1), "copying %d elements\n", num_copied);
261         if (num_copied > 0)  
262         {
263                 stat->rb_num_elems = min16(stat->rb_num_elems + num_copied, NUM_RB_ELEMS);
264     update_node(stat);
265         }
266   return (num_copied);
267 }
268 
269 static
270 int sevent_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
271 {
272   buf_t* buf = buf_new();
273   int retval = 0;
274   link_pkt_t hdr = {
275     dst: {
276       id: LINK_BROADCAST,
277     },
278     ext_type: MULTIHOP_SYMPATHY,
279     type: MULTIHOP_SYMPATHY
280   };
281 
282   bufcpy(buf, &hdr, sizeof(hdr));
283   bufcpy(buf, pkt, sizeof(Spkt_t));
284   if (data_len > 0) bufcpy(buf, data, data_len);
285   if ((lu_send(ctx.mh_link, (link_pkt_t*)buf->buf, buf->len - sizeof(link_pkt_t))) < 0)
286   {
287     elog(LOG_ERR, "Unable to send packet of type: %d!\n", pkt->type);
288     retval = -1;
289     goto done;
290   }
291   elog(LOG_DEBUG(1), "will send packet with type: %d, len: %d\n", pkt->type,
292     buf->len - sizeof(link_pkt_t));   
293 done:
294   buf_free(buf);
295   return retval;
296 }
297 
298 static int send_event_request(void* data, int interval, g_event_t* event)
299 {
300   Spkt_t pkt = {
301     type: SREQUEST_EVENTS 
302   };   
303   Srequest_t req = {
304     seqno: seqno++,
305   };
306   int i;
307   sevent_rb_t* stat;
308 
309   elog(LOG_DEBUG(1), "num-srcs: %d, sending seqno: %d\n", 
310       ctx.num_srcs, req.seqno);
311   if (sevent_send_pkt(&pkt, (void *)&req, sizeof(Srequest_t)) == 0) 
312   {
313     ctx.num_sevent_reqs++;
314     for (i = 0; i < ctx.num_srcs; i++)
315     {
316       stat = &ctx.events[i];
317       elog(LOG_DEBUG(1), "%d: calling update for node %d\n", i, stat->addr);
318       update_node(stat);
319     }
320     elog(LOG_DEBUG(1),"Sending sympathy request!\n");
321   }
322   else
323   {
324     elog(LOG_ERR, "Unable to send sympathy request!\n");
325   }
326   return EVENT_RENEW;
327 }
328 
329 void change_sreq_period(int period_msecs)
330 {
331   elog(LOG_DEBUG(1), "New period_msecs = %d\n", period_msecs);
332   if (ctx.sreq_timer) 
333   {
334     if (period_msecs == 0) g_event_destroy(ctx.sreq_timer);
335     g_timer_resched(ctx.sreq_timer, period_msecs);
336     return;
337   }
338   elog(LOG_DEBUG(1), "Adding new timer!\n");
339   if (period_msecs > 0) g_timer_add(period_msecs, send_event_request, NULL, NULL, &ctx.sreq_timer);
340 }
341 
342 static
343 int sevent_command(char* cmd, size_t size, void* data)
344 {
345   parser_state_t *ps = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);
346 
347   elog(LOG_DEBUG(1), "command: %s\n", cmd);
348   /* parse message */
349   while (misc_parse_next_kvp(ps) >= 0)
350   {
351     if(strcmp(ps->key, "sreq_period") == 0 )
352     {
353       elog(LOG_DEBUG(1), "key: %s is sreq-period so new period is %d\n", 
354           ps->key, atoi(ps->value));
355       ctx.sevent_req_period_secs = atoi(ps->value);
356       change_sreq_period(ctx.sevent_req_period_secs * 1000);
357     }
358     else elog(LOG_ERR, "Unrecognized key: %s\n", ps->key);
359   }
360   misc_parse_cleanup(ps);
361   return EVENT_RENEW;
362 }
363 
364 char* sevent_usage(void *data)
365 {
366   buf_t buf = {};
367   bufprintf(&buf, "To change period of sympathy-requests write (pd=0 => stopsending sympathy requests):\n" \
368       "\t\t sreq_period=<int>[seconds] \n");
369   bufprintf(&buf, "Current value sreq_period=%d[secs]\n", ctx.sevent_req_period_secs);
370   return buf.buf;
371 }
372 
373 static
374 int sevent_rcv_pkt(lu_context_t *link, link_pkt_t *hdr, ssize_t data_len)
375 {
376   buf_t* buf2 = buf_new();
377   multihop_hdr_t* mhdr = (multihop_hdr_t *)hdr->data;
378   Saddr_t src = mhdr->src;
379   Spkt_t* pkt;
380   void* tpkt;
381   ssize_t pkt_len = data_len - sizeof(multihop_hdr_t);
382 
383   elog(LOG_DEBUG(1),"spkt-len: %d, ext_type: %d, hdr-type: %d len = %d, src = %d\n", 
384       pkt_len, hdr->ext_type, hdr->type, data_len, src);
385   misc_hexdump_to_buf(buf2, (char *)pkt, pkt_len, "NR");
386   elog(LOG_DEBUG(1), "DATA SPKT: %s\n", buf2->buf);
387 
388   /* NR get correct type info! */
389   if (hdr->ext_type == 4) 
390   {
391     tpkt = mhdr + 1;
392     pkt = (Spkt_t *)tpkt;
393     if (mhdr->type == MULTIHOP_SYMPATHY)
394     {
395       if (pkt->type == SEVENT_RESPONSE) handle_event_data(src, pkt, pkt_len);
396     }
397   }
398   buf_free(buf2);
399   return EVENT_RENEW;
400 }
401 
402 void usage(char *name)
403 {
404    misc_print_usage
405       (name, "-U <link> -W<link>",
406             "  --uses <link>: specify the link to send sympathy-requests\n  --watch <link>: specify the link to listen to packets on");
407       exit(1);
408 } 
409 
410 static
411 void sevent_shutdown(void *data)
412 {
413     elog(LOG_NOTICE, "sympathy-sevent shutting down");
414       exit(0);
415 }
416 
417 static
418 int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
419 {
420   elog(LOG_DEBUG(1), "src %d, Pkt w type/comp: %d/%d, datalen: %d\n",
421        pkt->src.id, pkt->type, pkt->ext_type, data_len);
422   /* NR todo eventually, need to decode packets that we receive of a
423    * certain comp-stats type! should be specified on the command-line!*/
424 
425   g_free(pkt);
426   return EVENT_RENEW;
427 }
428 
429 int main(int argc, char** argv)
430 {
431   char devname[100];
432   emrun_opts_t emrun_opts = {
433     shutdown: sevent_shutdown,
434     silent: 1
435   };
436 
437         status_dev_opts_t sopts = {
438     device: {
439                 },
440         };
441 
442   cmd_dev_opts_t copts = {
443     device: {
444                 },
445     command: sevent_command,
446     usage: sevent_usage,
447         };
448 
449   lu_opts_t opts = {
450     opts: {
451        pkt_type: MULTIHOP_SYMPATHY
452     },
453   };
454 
455   lu_opts_t opts_status = {
456     opts: {
457       name: SYMPATHY_STATS_DEVICE
458     },
459     receive: status_receive
460   };
461 
462   misc_init(&argc, argv, CVSTAG);
463 
464   /* Open command device */
465   sprintf(devname, "%s/%s", SSTATUS_APP_BASE, SEVENTS_CMD);
466   copts.device.devname = sim_path(devname);
467   if (g_command_dev(&copts, NULL) < 0) {
468     elog(LOG_ERR, "Unable to open command-device: %s\n", copts.device.devname);
469   }
470 
471   if (lu_open(&opts_status, &ctx.update_lu) < 0) {
472     elog(LOG_DEBUG(1), "Unable to open link: %s:%m\n", 
473         opts_status.opts.name);
474   }
475 
476         /* Open status-devices */
477   sprintf(devname, "%s/%s", SSTATUS_APP_BASE,SEVENTS_STATUS);
478   sopts.device.devname = sim_path(devname);
479         sopts.printable = sympathy_print_events;
480   if (g_status_dev(&sopts, NULL) < 0) {
481                 elog(LOG_ERR, "Unable to open status device: %s\n", sopts.device.devname);
482         }
483 
484   /* Open link-device */
485   if (!(opts.opts.name = link_parse_uses(&argc, argv, NULL))) {
486     elog(LOG_WARNING, "WARNING: WILL NOT be sending sympathy-requests because didn't specify -U!\n");
487   }
488   else {
489      elog(LOG_DEBUG(1), "link-name U: %s\n", opts.opts.name);
490      if (lu_open(&opts, &ctx.mh_link) < 0) {
491        elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
492       usage(argv[0]);
493      }
494   }
495 
496   opts.opts.pkt_type = PKT_TYPE_TOS;
497   opts.receive = sevent_rcv_pkt;
498   if (!(opts.opts.name = misc_parse_out_option(&argc, argv, "watch", 'W'))) {
499     elog(LOG_CRIT, "Please specify --watch!");
500     usage(argv[0]);
501   } 
502   else {
503      elog(LOG_DEBUG(1), "link-name W: %s\n", opts.opts.name);
504      if (lu_open(&opts, NULL) < 0) {
505        elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
506        exit(0);
507      }
508   }
509 
510   change_sreq_period(ctx.sevent_req_period_secs * 1000);
511   if (NUM_RB_ELEMS == 0)
512   {
513           elog(LOG_ERR, "num-rb-elems (%d) must be > 0!!\n", NUM_RB_ELEMS);
514     exit(1);
515   }
516 
517   emrun_init(&emrun_opts);
518   g_main();
519   return 0;
520 }
521 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.