1 /* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
2 /*
3 *
4 * Copyright (c) 2003 The Regents of the University of California. All
5 * rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * - Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 *
14 * - Neither the name of the University nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
20 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
21 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
22 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
26 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 *
30 */
31
32 /*
33 *
34 * Author: Nithya Ramanthan
35 *
36 */
37
#include "sympathy_app.h"
#include <tos-contrib/multihop/tos/lib/MultihopTypes.h>
#include <tos-contrib/include/protocols.h>

#include <limits.h>
#include <stdlib.h>
41
42 #define NUM_RB_ELEMS RING_BUF_NUM_ELEMS
43
44 uint8_t ssize = sizeof(Spkt_t);
45 uint8_t rsize = sizeof(RBelem_t);
46 uint8_t seqno = 0;
47
48 sevent_ctx_t ctx = {
49 sevent_req_period_secs: EVENT_REQUEST_PERIOD_MSEC/1000,
50 };
51
52 /*** STATIC FUNCTIONS ***/
53
54 static int find_event_info(if_id_t src)
55 {
56 int i;
57 for (i = 0; i < ctx.num_srcs; i++)
58 {
59 if (ctx.events[i].addr == src) return i;
60 }
61 return -1;
62 }
63
64 static
65 void print_rb_elem(buf_t* tbuf, RBelem_t* rb)
66 {
67 bufprintf(tbuf, "%d %s %d ", rb->time,
68 (rb->Sevent.type == NEIGHBOR_ADDED ? "add-neigh":
69 rb->Sevent.type == NEIGHBOR_DROPPED ? "remv-neigh":
70 rb->Sevent.type == CHANGE_IN_INGRESS ? "change-ingress":
71 rb->Sevent.type == CHANGE_IN_EGRESS ? "change-egress":
72 rb->Sevent.type == NEXT_HOP_CHANGED ? "nh-changed":
73 rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED ? "nh-qual-changed":
74 rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR ? "nh-not-neighbor":
75 ""),
76 rb->Sevent.address);
77
78 if ((rb->Sevent.type == NEIGHBOR_ADDED)
79 || (rb->Sevent.type == NEIGHBOR_DROPPED))
80 {
81 new_neighbor_t* n = (new_neighbor_t *) &(rb->Sevent.data[0]);
82 bufprintf(tbuf, "%d\t %d\t %d\n", n->ingress, n->egress,
83 n->num_current_neighbors);
84 }
85 else if ((rb->Sevent.type == CHANGE_IN_INGRESS)
86 || (rb->Sevent.type == CHANGE_IN_EGRESS))
87 {
88 lq_node_t* n = (lq_node_t *) &(rb->Sevent.data[0]);
89 bufprintf(tbuf, "%d %d\n", n->quality, n->change);
90 }
91
92 else if ((rb->Sevent.type == NEXT_HOP_CHANGED)
93 || (rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED)
94 || (rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR))
95 {
96 nh_node_t* n = (nh_node_t *) &(rb->Sevent.data[0]);
97 bufprintf(tbuf, "%d %d\n", n->quality, n->sink);
98 }
99 else bufprintf(tbuf, "\n");
100 }
101
/*
 * Return the number of whole minutes between *time and the current
 * wall-clock time.  The second difference is narrowed to int before the
 * division, so the result truncates toward zero.
 */
static
int get_minutes_since_event(struct timeval* time)
{
  struct timeval now;
  int elapsed_secs;

  gettimeofday(&now, NULL);
  elapsed_secs = (int) (now.tv_sec - time->tv_sec);
  return elapsed_secs / 60;
}
109
110 static
111 void print_events(buf_t* buf, sevent_rb_t* stat)
112 {
113 int i, minutes = get_minutes_since_event(&stat->event_time);
114
115 bufprintf(buf, "\n**************\nNode %d, number-elems: %d\n",
116 stat->addr, stat->rb_num_elems);
117 bufprintf(buf, "Packets-rx from node: %d\n",
118 stat->pkts_rx_from_node);
119 bufprintf(buf, "Number requests tx: %d\n", ctx.num_sevent_reqs);
120 bufprintf(buf, "Minutes since last pkt-rx: %d mins\n", minutes);
121 bufprintf(buf, "\nTimestamp of last pkt rx: %d mins\n\n",
122 stat->last_event_time_mins);
123
124 if (stat->rb_num_elems > 0)
125 {
126 bufprintf(buf, "Time\t Type\t\t Addr\t Quality\t Change/Sink\n");
127 bufprintf(buf, "Time\t Type\t\t Addr\t Ingress\t Egress\t Num-neighbors\n");
128 for (i = 0; i < stat->rb_num_elems; i++)
129 {
130 print_rb_elem(buf, &stat->rb[i]);
131 }
132 }
133 }
134
135 int sympathy_print_events(status_context_t *info, buf_t *buf)
136 {
137 int i;
138 for (i = 0; i < ctx.num_srcs; i++)
139 {
140 print_events(buf, &ctx.events[i]);
141 }
142 return STATUS_MSG_COMPLETE;
143 }
/* Return the non-negative distance between a and b. */
static int abs_subtract(int a, int b)
{
  return (a > b) ? (a - b) : (b - a);
}
149
150 static RBelem_t* get_last_element(sevent_rb_t* stat)
151 {
152 /* Get a pointer to the last-element. If the head-ptr is 0, then
153 * it is either because the rb is empty or because it has wrapped
154 * around */
155 if (stat->rb_head == 0)
156 {
157 if (stat->rb_full) return(&stat->rb[NUM_RB_ELEMS - 1]);
158 else return NULL;
159 }
160 /* No wrap-around has ocurred yet, but the buffer isn't full */
161 else return(&stat->rb[stat->rb_head - 1]);
162 return NULL;
163 }
164
165 /**** Public functions ****/
166 static
167 int add_event_info(if_id_t node)
168 {
169 int sctr = ctx.num_srcs;
170 if (node == 0) return -1;
171
172 ctx.events[sctr].addr = node;
173 elog(LOG_DEBUG(1), "Inserting node %d in slot %d\n", node, sctr);
174 inc_mod(&ctx.node_ctr, 1, SMAX_SRCS);
175 ctx.num_srcs = min(ctx.num_srcs + 1, SMAX_SRCS);
176 return sctr;
177 }
178
179 static void update_node(sevent_rb_t* stat)
180 {
181 sympathy_status_info_t info = {
182 pkt_tx:ctx.num_sevent_reqs,
183 pkt_rx:stat->pkts_rx_from_node,
184 pkt_expected_rx:ctx.num_sevent_reqs,
185 time_rx_last_pkt: stat->event_time,
186 };
187 link_pkt_t pkt = {
188 dst: {
189 id: LINK_BROADCAST
190 },
191 src: {
192 id: stat->addr
193 },
194 ext_type: SCOMP_STATS6,
195 type: SSINK_UPDATE,
196 };
197 buf_t* buf = buf_new();
198
199 bufcpy(buf, &pkt, sizeof(link_pkt_t));
200 bufcpy(buf, &info, sizeof(sympathy_status_info_t));
201
202 elog(LOG_DEBUG(1), "updating for node %d\n", stat->addr);
203 if (lu_send(ctx.update_lu, (link_pkt_t *)buf->buf, buf->len - sizeof(link_pkt_t)) < 0)
204 {
205 elog(LOG_ERR, "Unable to send pkt: %m!\n");
206 }
207 buf_free(buf);
208 }
209
210 /* Returns the number of elements copied over */
211 uint16_t handle_event_data(if_id_t node_addr, Spkt_t* mpkt, uint8_t len)
212 {
213 uint8_t num_copied = 0;
214 uint8_t start_index = 0; /* start copying from pkt's rb */
215 uint8_t num_elem_in_pkt = (len - ssize) / rsize;
216 RBelem_t* last_elem, *tmpE = NULL;
217 int diff, sctr, last_t = 0;
218 sevent_rb_t* stat;
219
220 elog(LOG_DEBUG(1), "from node %d, packet %dB\n", node_addr, len);
221 if ((sctr = find_event_info(node_addr)) < 0)
222 {
223 sctr = add_event_info(node_addr);
224 }
225 stat = &ctx.events[sctr];
226 if (!stat) return 0;
227 gettimeofday(&stat->event_time, NULL);
228 stat->pkts_rx_from_node++;
229
230 if ((last_elem = get_last_element(stat))) last_t = last_elem->time;
231 while (start_index < num_elem_in_pkt)
232 {
233 tmpE = (RBelem_t *)&mpkt->data[start_index*rsize];
234
235 /* Only take the difference if this is NOT a wrap-around case */
236 if (last_t >= tmpE->time) diff = 0;
237 else diff = abs_subtract(tmpE->time, last_t);
238
239 if (diff > TIME_DIFF_THRESH)
240 {
241 elog(LOG_DEBUG(1), "WARN, skip elem w/time %d, its %d > than last-time (%d)\n",
242 tmpE->time, diff, last_t);
243 }
244
245 /* Insert elements if ring-buffer is empty, or if the time
246 * of the element is > than the last-time - after passing the
247 * "garbage check" */
248 else if (tmpE->time >= last_t)
249 {
250 memcpy(&stat->rb[stat->rb_head], &mpkt->data[rsize * start_index], rsize);
251 inc_mod(&(stat->rb_head), 1, NUM_RB_ELEMS);
252 if (stat->rb_head == 0) stat->rb_full = 1;
253 last_t = tmpE->time;
254 stat->last_event_time_mins = tmpE->time / 60;
255 num_copied++;
256 }
257 start_index++;
258 }
259
260 elog(LOG_DEBUG(1), "copying %d elements\n", num_copied);
261 if (num_copied > 0)
262 {
263 stat->rb_num_elems = min16(stat->rb_num_elems + num_copied, NUM_RB_ELEMS);
264 update_node(stat);
265 }
266 return (num_copied);
267 }
268
269 static
270 int sevent_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
271 {
272 buf_t* buf = buf_new();
273 int retval = 0;
274 link_pkt_t hdr = {
275 dst: {
276 id: LINK_BROADCAST,
277 },
278 ext_type: MULTIHOP_SYMPATHY,
279 type: MULTIHOP_SYMPATHY
280 };
281
282 bufcpy(buf, &hdr, sizeof(hdr));
283 bufcpy(buf, pkt, sizeof(Spkt_t));
284 if (data_len > 0) bufcpy(buf, data, data_len);
285 if ((lu_send(ctx.mh_link, (link_pkt_t*)buf->buf, buf->len - sizeof(link_pkt_t))) < 0)
286 {
287 elog(LOG_ERR, "Unable to send packet of type: %d!\n", pkt->type);
288 retval = -1;
289 goto done;
290 }
291 elog(LOG_DEBUG(1), "will send packet with type: %d, len: %d\n", pkt->type,
292 buf->len - sizeof(link_pkt_t));
293 done:
294 buf_free(buf);
295 return retval;
296 }
297
298 static int send_event_request(void* data, int interval, g_event_t* event)
299 {
300 Spkt_t pkt = {
301 type: SREQUEST_EVENTS
302 };
303 Srequest_t req = {
304 seqno: seqno++,
305 };
306 int i;
307 sevent_rb_t* stat;
308
309 elog(LOG_DEBUG(1), "num-srcs: %d, sending seqno: %d\n",
310 ctx.num_srcs, req.seqno);
311 if (sevent_send_pkt(&pkt, (void *)&req, sizeof(Srequest_t)) == 0)
312 {
313 ctx.num_sevent_reqs++;
314 for (i = 0; i < ctx.num_srcs; i++)
315 {
316 stat = &ctx.events[i];
317 elog(LOG_DEBUG(1), "%d: calling update for node %d\n", i, stat->addr);
318 update_node(stat);
319 }
320 elog(LOG_DEBUG(1),"Sending sympathy request!\n");
321 }
322 else
323 {
324 elog(LOG_ERR, "Unable to send sympathy request!\n");
325 }
326 return EVENT_RENEW;
327 }
328
329 void change_sreq_period(int period_msecs)
330 {
331 elog(LOG_DEBUG(1), "New period_msecs = %d\n", period_msecs);
332 if (ctx.sreq_timer)
333 {
334 if (period_msecs == 0) g_event_destroy(ctx.sreq_timer);
335 g_timer_resched(ctx.sreq_timer, period_msecs);
336 return;
337 }
338 elog(LOG_DEBUG(1), "Adding new timer!\n");
339 if (period_msecs > 0) g_timer_add(period_msecs, send_event_request, NULL, NULL, &ctx.sreq_timer);
340 }
341
342 static
343 int sevent_command(char* cmd, size_t size, void* data)
344 {
345 parser_state_t *ps = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);
346
347 elog(LOG_DEBUG(1), "command: %s\n", cmd);
348 /* parse message */
349 while (misc_parse_next_kvp(ps) >= 0)
350 {
351 if(strcmp(ps->key, "sreq_period") == 0 )
352 {
353 elog(LOG_DEBUG(1), "key: %s is sreq-period so new period is %d\n",
354 ps->key, atoi(ps->value));
355 ctx.sevent_req_period_secs = atoi(ps->value);
356 change_sreq_period(ctx.sevent_req_period_secs * 1000);
357 }
358 else elog(LOG_ERR, "Unrecognized key: %s\n", ps->key);
359 }
360 misc_parse_cleanup(ps);
361 return EVENT_RENEW;
362 }
363
364 char* sevent_usage(void *data)
365 {
366 buf_t buf = {};
367 bufprintf(&buf, "To change period of sympathy-requests write (pd=0 => stopsending sympathy requests):\n" \
368 "\t\t sreq_period=<int>[seconds] \n");
369 bufprintf(&buf, "Current value sreq_period=%d[secs]\n", ctx.sevent_req_period_secs);
370 return buf.buf;
371 }
372
373 static
374 int sevent_rcv_pkt(lu_context_t *link, link_pkt_t *hdr, ssize_t data_len)
375 {
376 buf_t* buf2 = buf_new();
377 multihop_hdr_t* mhdr = (multihop_hdr_t *)hdr->data;
378 Saddr_t src = mhdr->src;
379 Spkt_t* pkt;
380 void* tpkt;
381 ssize_t pkt_len = data_len - sizeof(multihop_hdr_t);
382
383 elog(LOG_DEBUG(1),"spkt-len: %d, ext_type: %d, hdr-type: %d len = %d, src = %d\n",
384 pkt_len, hdr->ext_type, hdr->type, data_len, src);
385 misc_hexdump_to_buf(buf2, (char *)pkt, pkt_len, "NR");
386 elog(LOG_DEBUG(1), "DATA SPKT: %s\n", buf2->buf);
387
388 /* NR get correct type info! */
389 if (hdr->ext_type == 4)
390 {
391 tpkt = mhdr + 1;
392 pkt = (Spkt_t *)tpkt;
393 if (mhdr->type == MULTIHOP_SYMPATHY)
394 {
395 if (pkt->type == SEVENT_RESPONSE) handle_event_data(src, pkt, pkt_len);
396 }
397 }
398 buf_free(buf2);
399 return EVENT_RENEW;
400 }
401
/* Print the command-line help for program `name` and exit with status 1. */
void usage(char *name)
{
  misc_print_usage(
    name,
    "-U <link> -W<link>",
    " --uses <link>: specify the link to send sympathy-requests\n --watch <link>: specify the link to listen to packets on");
  exit(1);
}
409
410 static
411 void sevent_shutdown(void *data)
412 {
413 elog(LOG_NOTICE, "sympathy-sevent shutting down");
414 exit(0);
415 }
416
417 static
418 int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
419 {
420 elog(LOG_DEBUG(1), "src %d, Pkt w type/comp: %d/%d, datalen: %d\n",
421 pkt->src.id, pkt->type, pkt->ext_type, data_len);
422 /* NR todo eventually, need to decode packets that we receive of a
423 * certain comp-stats type! should be specified on the command-line!*/
424
425 g_free(pkt);
426 return EVENT_RENEW;
427 }
428
429 int main(int argc, char** argv)
430 {
431 char devname[100];
432 emrun_opts_t emrun_opts = {
433 shutdown: sevent_shutdown,
434 silent: 1
435 };
436
437 status_dev_opts_t sopts = {
438 device: {
439 },
440 };
441
442 cmd_dev_opts_t copts = {
443 device: {
444 },
445 command: sevent_command,
446 usage: sevent_usage,
447 };
448
449 lu_opts_t opts = {
450 opts: {
451 pkt_type: MULTIHOP_SYMPATHY
452 },
453 };
454
455 lu_opts_t opts_status = {
456 opts: {
457 name: SYMPATHY_STATS_DEVICE
458 },
459 receive: status_receive
460 };
461
462 misc_init(&argc, argv, CVSTAG);
463
464 /* Open command device */
465 sprintf(devname, "%s/%s", SSTATUS_APP_BASE, SEVENTS_CMD);
466 copts.device.devname = sim_path(devname);
467 if (g_command_dev(&copts, NULL) < 0) {
468 elog(LOG_ERR, "Unable to open command-device: %s\n", copts.device.devname);
469 }
470
471 if (lu_open(&opts_status, &ctx.update_lu) < 0) {
472 elog(LOG_DEBUG(1), "Unable to open link: %s:%m\n",
473 opts_status.opts.name);
474 }
475
476 /* Open status-devices */
477 sprintf(devname, "%s/%s", SSTATUS_APP_BASE,SEVENTS_STATUS);
478 sopts.device.devname = sim_path(devname);
479 sopts.printable = sympathy_print_events;
480 if (g_status_dev(&sopts, NULL) < 0) {
481 elog(LOG_ERR, "Unable to open status device: %s\n", sopts.device.devname);
482 }
483
484 /* Open link-device */
485 if (!(opts.opts.name = link_parse_uses(&argc, argv, NULL))) {
486 elog(LOG_WARNING, "WARNING: WILL NOT be sending sympathy-requests because didn't specify -U!\n");
487 }
488 else {
489 elog(LOG_DEBUG(1), "link-name U: %s\n", opts.opts.name);
490 if (lu_open(&opts, &ctx.mh_link) < 0) {
491 elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
492 usage(argv[0]);
493 }
494 }
495
496 opts.opts.pkt_type = PKT_TYPE_TOS;
497 opts.receive = sevent_rcv_pkt;
498 if (!(opts.opts.name = misc_parse_out_option(&argc, argv, "watch", 'W'))) {
499 elog(LOG_CRIT, "Please specify --watch!");
500 usage(argv[0]);
501 }
502 else {
503 elog(LOG_DEBUG(1), "link-name W: %s\n", opts.opts.name);
504 if (lu_open(&opts, NULL) < 0) {
505 elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
506 exit(0);
507 }
508 }
509
510 change_sreq_period(ctx.sevent_req_period_secs * 1000);
511 if (NUM_RB_ELEMS == 0)
512 {
513 elog(LOG_ERR, "num-rb-elems (%d) must be > 0!!\n", NUM_RB_ELEMS);
514 exit(1);
515 }
516
517 emrun_init(&emrun_opts);
518 g_main();
519 return 0;
520 }
521
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more
information.