|
|
Jump to this file's LXR Page |
|
|
File: [CENS] / emstar / devel / sympathy_devel / sympathy_events.c
(download)
/
(as text)
Revision: 1.22, Sat Sep 3 00:15:56 2005 UTC (4 years, 2 months ago) by nithya Branch: MAIN CVS Tags: pregeonet, acoustic-05-18-06, PRE_TOSNIC_FIX, PRE_64BIT, HEAD, ESS_RELEASE_3_5, ESS_RELEASE_3_4, ESS_RELEASE_3_2, ESS_RELEASE_3_1, ESS_RELEASE_3_0, ESS_CONNECTIVITY, ESS_CENTROUTE_TESTING, EMSTAR_RELEASE_2_5, CYCLOPS_RELEASE_CANDIDATE_2_0, CYCLOPS_PRERELEASE_STABLE, CENTROUTE_EMSTAR_SOCKETS, BG_1_0, BANGLADESH_ARSENIC_1_2, BANGLADESH_ARSENIC_1_1, AMARSS_JR_DEPLOYMENT_6_05_07 Changes since 1.21: +36 -0 lines Added Boilerplate |
/* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
*
* Author: Nithya Ramanthan
*
*/
#include "sympathy_app.h"
#include <tos-contrib/multihop/tos/lib/MultihopTypes.h>
#include <tos-contrib/include/protocols.h>
#define NUM_RB_ELEMS RING_BUF_NUM_ELEMS
uint8_t ssize = sizeof(Spkt_t);
uint8_t rsize = sizeof(RBelem_t);
uint8_t seqno = 0;
sevent_ctx_t ctx = {
sevent_req_period_secs: EVENT_REQUEST_PERIOD_MSEC/1000,
};
/*** STATIC FUNCTIONS ***/
static int find_event_info(if_id_t src)
{
int i;
for (i = 0; i < ctx.num_srcs; i++)
{
if (ctx.events[i].addr == src) return i;
}
return -1;
}
static
void print_rb_elem(buf_t* tbuf, RBelem_t* rb)
{
bufprintf(tbuf, "%d %s %d ", rb->time,
(rb->Sevent.type == NEIGHBOR_ADDED ? "add-neigh":
rb->Sevent.type == NEIGHBOR_DROPPED ? "remv-neigh":
rb->Sevent.type == CHANGE_IN_INGRESS ? "change-ingress":
rb->Sevent.type == CHANGE_IN_EGRESS ? "change-egress":
rb->Sevent.type == NEXT_HOP_CHANGED ? "nh-changed":
rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED ? "nh-qual-changed":
rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR ? "nh-not-neighbor":
""),
rb->Sevent.address);
if ((rb->Sevent.type == NEIGHBOR_ADDED)
|| (rb->Sevent.type == NEIGHBOR_DROPPED))
{
new_neighbor_t* n = (new_neighbor_t *) &(rb->Sevent.data[0]);
bufprintf(tbuf, "%d\t %d\t %d\n", n->ingress, n->egress,
n->num_current_neighbors);
}
else if ((rb->Sevent.type == CHANGE_IN_INGRESS)
|| (rb->Sevent.type == CHANGE_IN_EGRESS))
{
lq_node_t* n = (lq_node_t *) &(rb->Sevent.data[0]);
bufprintf(tbuf, "%d %d\n", n->quality, n->change);
}
else if ((rb->Sevent.type == NEXT_HOP_CHANGED)
|| (rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED)
|| (rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR))
{
nh_node_t* n = (nh_node_t *) &(rb->Sevent.data[0]);
bufprintf(tbuf, "%d %d\n", n->quality, n->sink);
}
else bufprintf(tbuf, "\n");
}
static
int get_minutes_since_event(struct timeval* time)
{
struct timeval t;
gettimeofday(&t, NULL);
return ((int) (t.tv_sec - time->tv_sec) / 60);
}
static
void print_events(buf_t* buf, sevent_rb_t* stat)
{
int i, minutes = get_minutes_since_event(&stat->event_time);
bufprintf(buf, "\n**************\nNode %d, number-elems: %d\n",
stat->addr, stat->rb_num_elems);
bufprintf(buf, "Packets-rx from node: %d\n",
stat->pkts_rx_from_node);
bufprintf(buf, "Number requests tx: %d\n", ctx.num_sevent_reqs);
bufprintf(buf, "Minutes since last pkt-rx: %d mins\n", minutes);
bufprintf(buf, "\nTimestamp of last pkt rx: %d mins\n\n",
stat->last_event_time_mins);
if (stat->rb_num_elems > 0)
{
bufprintf(buf, "Time\t Type\t\t Addr\t Quality\t Change/Sink\n");
bufprintf(buf, "Time\t Type\t\t Addr\t Ingress\t Egress\t Num-neighbors\n");
for (i = 0; i < stat->rb_num_elems; i++)
{
print_rb_elem(buf, &stat->rb[i]);
}
}
}
int sympathy_print_events(status_context_t *info, buf_t *buf)
{
int i;
for (i = 0; i < ctx.num_srcs; i++)
{
print_events(buf, &ctx.events[i]);
}
return STATUS_MSG_COMPLETE;
}
static int abs_subtract(int a, int b)
{
if (a > b) return a - b;
return b - a;
}
static RBelem_t* get_last_element(sevent_rb_t* stat)
{
/* Get a pointer to the last-element. If the head-ptr is 0, then
* it is either because the rb is empty or because it has wrapped
* around */
if (stat->rb_head == 0)
{
if (stat->rb_full) return(&stat->rb[NUM_RB_ELEMS - 1]);
else return NULL;
}
/* No wrap-around has ocurred yet, but the buffer isn't full */
else return(&stat->rb[stat->rb_head - 1]);
return NULL;
}
/**** Public functions ****/
static
int add_event_info(if_id_t node)
{
int sctr = ctx.num_srcs;
if (node == 0) return -1;
ctx.events[sctr].addr = node;
elog(LOG_DEBUG(1), "Inserting node %d in slot %d\n", node, sctr);
inc_mod(&ctx.node_ctr, 1, SMAX_SRCS);
ctx.num_srcs = min(ctx.num_srcs + 1, SMAX_SRCS);
return sctr;
}
static void update_node(sevent_rb_t* stat)
{
sympathy_status_info_t info = {
pkt_tx:ctx.num_sevent_reqs,
pkt_rx:stat->pkts_rx_from_node,
pkt_expected_rx:ctx.num_sevent_reqs,
time_rx_last_pkt: stat->event_time,
};
link_pkt_t pkt = {
dst: {
id: LINK_BROADCAST
},
src: {
id: stat->addr
},
ext_type: SCOMP_STATS6,
type: SSINK_UPDATE,
};
buf_t* buf = buf_new();
bufcpy(buf, &pkt, sizeof(link_pkt_t));
bufcpy(buf, &info, sizeof(sympathy_status_info_t));
elog(LOG_DEBUG(1), "updating for node %d\n", stat->addr);
if (lu_send(ctx.update_lu, (link_pkt_t *)buf->buf, buf->len - sizeof(link_pkt_t)) < 0)
{
elog(LOG_ERR, "Unable to send pkt: %m!\n");
}
buf_free(buf);
}
/* Returns the number of elements copied over */
uint16_t handle_event_data(if_id_t node_addr, Spkt_t* mpkt, uint8_t len)
{
uint8_t num_copied = 0;
uint8_t start_index = 0; /* start copying from pkt's rb */
uint8_t num_elem_in_pkt = (len - ssize) / rsize;
RBelem_t* last_elem, *tmpE = NULL;
int diff, sctr, last_t = 0;
sevent_rb_t* stat;
elog(LOG_DEBUG(1), "from node %d, packet %dB\n", node_addr, len);
if ((sctr = find_event_info(node_addr)) < 0)
{
sctr = add_event_info(node_addr);
}
stat = &ctx.events[sctr];
if (!stat) return 0;
gettimeofday(&stat->event_time, NULL);
stat->pkts_rx_from_node++;
if ((last_elem = get_last_element(stat))) last_t = last_elem->time;
while (start_index < num_elem_in_pkt)
{
tmpE = (RBelem_t *)&mpkt->data[start_index*rsize];
/* Only take the difference if this is NOT a wrap-around case */
if (last_t >= tmpE->time) diff = 0;
else diff = abs_subtract(tmpE->time, last_t);
if (diff > TIME_DIFF_THRESH)
{
elog(LOG_DEBUG(1), "WARN, skip elem w/time %d, its %d > than last-time (%d)\n",
tmpE->time, diff, last_t);
}
/* Insert elements if ring-buffer is empty, or if the time
* of the element is > than the last-time - after passing the
* "garbage check" */
else if (tmpE->time >= last_t)
{
memcpy(&stat->rb[stat->rb_head], &mpkt->data[rsize * start_index], rsize);
inc_mod(&(stat->rb_head), 1, NUM_RB_ELEMS);
if (stat->rb_head == 0) stat->rb_full = 1;
last_t = tmpE->time;
stat->last_event_time_mins = tmpE->time / 60;
num_copied++;
}
start_index++;
}
elog(LOG_DEBUG(1), "copying %d elements\n", num_copied);
if (num_copied > 0)
{
stat->rb_num_elems = min16(stat->rb_num_elems + num_copied, NUM_RB_ELEMS);
update_node(stat);
}
return (num_copied);
}
static
int sevent_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
{
buf_t* buf = buf_new();
int retval = 0;
link_pkt_t hdr = {
dst: {
id: LINK_BROADCAST,
},
ext_type: MULTIHOP_SYMPATHY,
type: MULTIHOP_SYMPATHY
};
bufcpy(buf, &hdr, sizeof(hdr));
bufcpy(buf, pkt, sizeof(Spkt_t));
if (data_len > 0) bufcpy(buf, data, data_len);
if ((lu_send(ctx.mh_link, (link_pkt_t*)buf->buf, buf->len - sizeof(link_pkt_t))) < 0)
{
elog(LOG_ERR, "Unable to send packet of type: %d!\n", pkt->type);
retval = -1;
goto done;
}
elog(LOG_DEBUG(1), "will send packet with type: %d, len: %d\n", pkt->type,
buf->len - sizeof(link_pkt_t));
done:
buf_free(buf);
return retval;
}
static int send_event_request(void* data, int interval, g_event_t* event)
{
Spkt_t pkt = {
type: SREQUEST_EVENTS
};
Srequest_t req = {
seqno: seqno++,
};
int i;
sevent_rb_t* stat;
elog(LOG_DEBUG(1), "num-srcs: %d, sending seqno: %d\n",
ctx.num_srcs, req.seqno);
if (sevent_send_pkt(&pkt, (void *)&req, sizeof(Srequest_t)) == 0)
{
ctx.num_sevent_reqs++;
for (i = 0; i < ctx.num_srcs; i++)
{
stat = &ctx.events[i];
elog(LOG_DEBUG(1), "%d: calling update for node %d\n", i, stat->addr);
update_node(stat);
}
elog(LOG_DEBUG(1),"Sending sympathy request!\n");
}
else
{
elog(LOG_ERR, "Unable to send sympathy request!\n");
}
return EVENT_RENEW;
}
void change_sreq_period(int period_msecs)
{
elog(LOG_DEBUG(1), "New period_msecs = %d\n", period_msecs);
if (ctx.sreq_timer)
{
if (period_msecs == 0) g_event_destroy(ctx.sreq_timer);
g_timer_resched(ctx.sreq_timer, period_msecs);
return;
}
elog(LOG_DEBUG(1), "Adding new timer!\n");
if (period_msecs > 0) g_timer_add(period_msecs, send_event_request, NULL, NULL, &ctx.sreq_timer);
}
static
int sevent_command(char* cmd, size_t size, void* data)
{
parser_state_t *ps = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);
elog(LOG_DEBUG(1), "command: %s\n", cmd);
/* parse message */
while (misc_parse_next_kvp(ps) >= 0)
{
if(strcmp(ps->key, "sreq_period") == 0 )
{
elog(LOG_DEBUG(1), "key: %s is sreq-period so new period is %d\n",
ps->key, atoi(ps->value));
ctx.sevent_req_period_secs = atoi(ps->value);
change_sreq_period(ctx.sevent_req_period_secs * 1000);
}
else elog(LOG_ERR, "Unrecognized key: %s\n", ps->key);
}
misc_parse_cleanup(ps);
return EVENT_RENEW;
}
char* sevent_usage(void *data)
{
buf_t buf = {};
bufprintf(&buf, "To change period of sympathy-requests write (pd=0 => stopsending sympathy requests):\n" \
"\t\t sreq_period=<int>[seconds] \n");
bufprintf(&buf, "Current value sreq_period=%d[secs]\n", ctx.sevent_req_period_secs);
return buf.buf;
}
static
int sevent_rcv_pkt(lu_context_t *link, link_pkt_t *hdr, ssize_t data_len)
{
buf_t* buf2 = buf_new();
multihop_hdr_t* mhdr = (multihop_hdr_t *)hdr->data;
Saddr_t src = mhdr->src;
Spkt_t* pkt;
void* tpkt;
ssize_t pkt_len = data_len - sizeof(multihop_hdr_t);
elog(LOG_DEBUG(1),"spkt-len: %d, ext_type: %d, hdr-type: %d len = %d, src = %d\n",
pkt_len, hdr->ext_type, hdr->type, data_len, src);
misc_hexdump_to_buf(buf2, (char *)pkt, pkt_len, "NR");
elog(LOG_DEBUG(1), "DATA SPKT: %s\n", buf2->buf);
/* NR get correct type info! */
if (hdr->ext_type == 4)
{
tpkt = mhdr + 1;
pkt = (Spkt_t *)tpkt;
if (mhdr->type == MULTIHOP_SYMPATHY)
{
if (pkt->type == SEVENT_RESPONSE) handle_event_data(src, pkt, pkt_len);
}
}
buf_free(buf2);
return EVENT_RENEW;
}
void usage(char *name)
{
misc_print_usage
(name, "-U <link> -W<link>",
" --uses <link>: specify the link to send sympathy-requests\n --watch <link>: specify the link to listen to packets on");
exit(1);
}
static
void sevent_shutdown(void *data)
{
elog(LOG_NOTICE, "sympathy-sevent shutting down");
exit(0);
}
static
int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
elog(LOG_DEBUG(1), "src %d, Pkt w type/comp: %d/%d, datalen: %d\n",
pkt->src.id, pkt->type, pkt->ext_type, data_len);
/* NR todo eventually, need to decode packets that we receive of a
* certain comp-stats type! should be specified on the command-line!*/
g_free(pkt);
return EVENT_RENEW;
}
int main(int argc, char** argv)
{
char devname[100];
emrun_opts_t emrun_opts = {
shutdown: sevent_shutdown,
silent: 1
};
status_dev_opts_t sopts = {
device: {
},
};
cmd_dev_opts_t copts = {
device: {
},
command: sevent_command,
usage: sevent_usage,
};
lu_opts_t opts = {
opts: {
pkt_type: MULTIHOP_SYMPATHY
},
};
lu_opts_t opts_status = {
opts: {
name: SYMPATHY_STATS_DEVICE
},
receive: status_receive
};
misc_init(&argc, argv, CVSTAG);
/* Open command device */
sprintf(devname, "%s/%s", SSTATUS_APP_BASE, SEVENTS_CMD);
copts.device.devname = sim_path(devname);
if (g_command_dev(&copts, NULL) < 0) {
elog(LOG_ERR, "Unable to open command-device: %s\n", copts.device.devname);
}
if (lu_open(&opts_status, &ctx.update_lu) < 0) {
elog(LOG_DEBUG(1), "Unable to open link: %s:%m\n",
opts_status.opts.name);
}
/* Open status-devices */
sprintf(devname, "%s/%s", SSTATUS_APP_BASE,SEVENTS_STATUS);
sopts.device.devname = sim_path(devname);
sopts.printable = sympathy_print_events;
if (g_status_dev(&sopts, NULL) < 0) {
elog(LOG_ERR, "Unable to open status device: %s\n", sopts.device.devname);
}
/* Open link-device */
if (!(opts.opts.name = link_parse_uses(&argc, argv, NULL))) {
elog(LOG_WARNING, "WARNING: WILL NOT be sending sympathy-requests because didn't specify -U!\n");
}
else {
elog(LOG_DEBUG(1), "link-name U: %s\n", opts.opts.name);
if (lu_open(&opts, &ctx.mh_link) < 0) {
elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
usage(argv[0]);
}
}
opts.opts.pkt_type = PKT_TYPE_TOS;
opts.receive = sevent_rcv_pkt;
if (!(opts.opts.name = misc_parse_out_option(&argc, argv, "watch", 'W'))) {
elog(LOG_CRIT, "Please specify --watch!");
usage(argv[0]);
}
else {
elog(LOG_DEBUG(1), "link-name W: %s\n", opts.opts.name);
if (lu_open(&opts, NULL) < 0) {
elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
exit(0);
}
}
change_sreq_period(ctx.sevent_req_period_secs * 1000);
if (NUM_RB_ELEMS == 0)
{
elog(LOG_ERR, "num-rb-elems (%d) must be > 0!!\n", NUM_RB_ELEMS);
exit(1);
}
emrun_init(&emrun_opts);
g_main();
return 0;
}
| CENS CVS Mailing List |
Powered by ViewCVS 0.9.2 |