(file) Return to sympathy_events.c CVS log (file) Jump to this file's LXR Page (dir) Up to [CENS] / emstar / devel / sympathy_devel

File: [CENS] / emstar / devel / sympathy_devel / sympathy_events.c (download) / (as text)
Revision: 1.22, Sat Sep 3 00:15:56 2005 UTC (4 years, 2 months ago) by nithya
Branch: MAIN
CVS Tags: pregeonet, acoustic-05-18-06, PRE_TOSNIC_FIX, PRE_64BIT, HEAD, ESS_RELEASE_3_5, ESS_RELEASE_3_4, ESS_RELEASE_3_2, ESS_RELEASE_3_1, ESS_RELEASE_3_0, ESS_CONNECTIVITY, ESS_CENTROUTE_TESTING, EMSTAR_RELEASE_2_5, CYCLOPS_RELEASE_CANDIDATE_2_0, CYCLOPS_PRERELEASE_STABLE, CENTROUTE_EMSTAR_SOCKETS, BG_1_0, BANGLADESH_ARSENIC_1_2, BANGLADESH_ARSENIC_1_1, AMARSS_JR_DEPLOYMENT_6_05_07
Changes since 1.21: +36 -0 lines
Added Boilerplate

/* ex: set tabstop=2 expandtab shiftwidth=2 softtabstop=2: */
/*
 *
 * Copyright (c) 2003 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

 /*
  *
  * Author: Nithya Ramanathan
  *
  */

#include "sympathy_app.h"
#include <tos-contrib/multihop/tos/lib/MultihopTypes.h>
#include <tos-contrib/include/protocols.h>

/* Number of slots in each per-node event ring buffer. */
#define NUM_RB_ELEMS RING_BUF_NUM_ELEMS

/* Cached sizes of the sympathy packet header and of one ring-buffer
 * element; handle_event_data() uses them to step through a packet payload.
 * NOTE(review): stored as uint8_t, so both structs are assumed to be
 * smaller than 256 bytes -- confirm against their declarations. */
uint8_t ssize = sizeof(Spkt_t);
uint8_t rsize = sizeof(RBelem_t);
/* Sequence number placed in each event request; post-incremented by
 * send_event_request() every time a request is built. */
uint8_t seqno = 0;

/* Application-wide state. Only the request period is initialised here
 * (EVENT_REQUEST_PERIOD_MSEC converted to whole seconds). */
sevent_ctx_t ctx = {
    sevent_req_period_secs: EVENT_REQUEST_PERIOD_MSEC/1000,
};

/*** STATIC FUNCTIONS ***/

/*
 * Search the first ctx.num_srcs entries of ctx.events for the one whose
 * address equals src.
 *
 * Returns the index of the matching entry, or -1 when there is none.
 */
static int find_event_info(if_id_t src)
{
  int slot = 0;

  while (slot < ctx.num_srcs)
  {
    if (ctx.events[slot].addr == src)
    {
      return slot;
    }
    slot++;
  }
  return -1;
}

/*
 * Append a one-line description of a ring-buffer element to tbuf.
 *
 * The line starts with the element's time, a short label for the event
 * type (empty for unrecognised types) and the event address, followed by
 * type-specific fields decoded from the event's data area:
 *   - neighbor added/dropped:   ingress, egress, current neighbor count
 *   - ingress/egress change:    quality, change
 *   - next-hop events:          quality, sink
 * Any other type just terminates the line.
 */
static
void print_rb_elem(buf_t* tbuf, RBelem_t* rb)
{
  const char* label = "";

  /* Pick the label; the order of the tests mirrors the event list above. */
  if (rb->Sevent.type == NEIGHBOR_ADDED) label = "add-neigh";
  else if (rb->Sevent.type == NEIGHBOR_DROPPED) label = "remv-neigh";
  else if (rb->Sevent.type == CHANGE_IN_INGRESS) label = "change-ingress";
  else if (rb->Sevent.type == CHANGE_IN_EGRESS) label = "change-egress";
  else if (rb->Sevent.type == NEXT_HOP_CHANGED) label = "nh-changed";
  else if (rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED) label = "nh-qual-changed";
  else if (rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR) label = "nh-not-neighbor";

  bufprintf(tbuf, "%d   %s   %d    ", rb->time, label, rb->Sevent.address);

  /* The payload layout depends on the event family. */
  if (rb->Sevent.type == NEIGHBOR_ADDED
      || rb->Sevent.type == NEIGHBOR_DROPPED)
  {
    new_neighbor_t* neigh = (new_neighbor_t *) &(rb->Sevent.data[0]);

    bufprintf(tbuf, "%d\t %d\t %d\n", neigh->ingress, neigh->egress,
        neigh->num_current_neighbors);
    return;
  }

  if (rb->Sevent.type == CHANGE_IN_INGRESS
      || rb->Sevent.type == CHANGE_IN_EGRESS)
  {
    lq_node_t* lq = (lq_node_t *) &(rb->Sevent.data[0]);

    bufprintf(tbuf, "%d    %d\n", lq->quality, lq->change);
    return;
  }

  if (rb->Sevent.type == NEXT_HOP_CHANGED
      || rb->Sevent.type == NEXT_HOP_QUALITY_CHANGED
      || rb->Sevent.type == NEXT_HOP_NOT_NEIGHBOR)
  {
    nh_node_t* nh = (nh_node_t *) &(rb->Sevent.data[0]);

    bufprintf(tbuf, "%d    %d\n", nh->quality, nh->sink);
    return;
  }

  bufprintf(tbuf, "\n");
}

/*
 * Return the number of whole minutes between *time and the current
 * wall-clock time reported by gettimeofday().
 *
 * The difference in seconds is narrowed to int before the division, so the
 * result truncates toward zero.
 */
static
int get_minutes_since_event(struct timeval* time)
{
  struct timeval now;
  int elapsed_secs;

  gettimeofday(&now, NULL);
  elapsed_secs = (int) (now.tv_sec - time->tv_sec);
  return elapsed_secs / 60;
}

/*
 * Append a report for one tracked node to buf: its address and element
 * count, packet/request counters, minutes since its last packet, and --
 * when the node has stored elements -- two column-header lines followed by
 * one line per element in stat->rb[0 .. rb_num_elems-1].
 */
static
void print_events(buf_t* buf, sevent_rb_t* stat)
{
  int idx;
  int mins_since_rx = get_minutes_since_event(&stat->event_time);

  bufprintf(buf, "\n**************\nNode %d, number-elems: %d\n",
      stat->addr, stat->rb_num_elems);
  bufprintf(buf, "Packets-rx from node: %d\n", stat->pkts_rx_from_node);
  bufprintf(buf, "Number requests tx: %d\n", ctx.num_sevent_reqs);
  bufprintf(buf, "Minutes since last pkt-rx: %d mins\n", mins_since_rx);
  bufprintf(buf, "\nTimestamp of last pkt rx: %d mins\n\n",
      stat->last_event_time_mins);

  /* Nothing stored for this node: skip the table entirely. */
  if (!(stat->rb_num_elems > 0))
  {
    return;
  }

  /* Two header lines: one per payload layout that print_rb_elem() emits. */
  bufprintf(buf, "Time\t Type\t\t Addr\t Quality\t Change/Sink\n");
  bufprintf(buf, "Time\t Type\t\t Addr\t Ingress\t Egress\t Num-neighbors\n");

  idx = 0;
  while (idx < stat->rb_num_elems)
  {
    print_rb_elem(buf, &stat->rb[idx]);
    idx++;
  }
}

/*
 * Status-device "printable" handler (installed by main()): appends the
 * report of every tracked node to buf and returns STATUS_MSG_COMPLETE.
 * The info argument is not used.
 */
int sympathy_print_events(status_context_t *info, buf_t *buf)
{
  int src = 0;

  while (src < ctx.num_srcs)
  {
    print_events(buf, &ctx.events[src]);
    src++;
  }
  return STATUS_MSG_COMPLETE;
}
/* Return the non-negative difference between a and b. */
static int abs_subtract(int a, int b)
{
  return (a > b) ? (a - b) : (b - a);
}

/*
 * Return a pointer to the most recently written ring-buffer element of
 * stat, or NULL when nothing has been written yet.
 *
 * rb_head is the next slot to be written, so the newest element sits just
 * before it. A head of 0 is ambiguous: either the buffer is still empty, or
 * the head has wrapped around (rb_full set), in which case the newest
 * element is the final slot.
 */
static RBelem_t* get_last_element(sevent_rb_t* stat)
{
  if (stat->rb_head != 0)
  {
    return &stat->rb[stat->rb_head - 1];
  }
  return stat->rb_full ? &stat->rb[NUM_RB_ELEMS - 1] : NULL;
}

/**** Public functions ****/
/*
 * Register a new source node and return the index of the ctx.events slot
 * assigned to it, or -1 when the node cannot be registered (address 0, or
 * an out-of-range recycle counter).
 *
 * While the table still has room the next unused slot (ctx.num_srcs) is
 * handed out. Once SMAX_SRCS nodes are tracked, the slot designated by the
 * wrapping counter ctx.node_ctr is recycled instead of writing past the end
 * of the table; the recycled slot is cleared first so the previous node's
 * ring buffer and counters are not attributed to the new node.
 *
 * NOTE(review): assumes ctx.events holds SMAX_SRCS entries, as implied by
 * the cap applied to ctx.num_srcs -- confirm against sevent_ctx_t.
 */
static
int add_event_info(if_id_t node)
{
  int sctr = ctx.num_srcs;

  if (node == 0) return -1;

  if (sctr >= SMAX_SRCS)
  {
    /* Table is full: reuse the slot the wrapping counter points at. */
    sctr = ctx.node_ctr;
    if (sctr < 0 || sctr >= SMAX_SRCS) return -1;
    elog(LOG_DEBUG(1), "Event table full, evicting node %d from slot %d\n",
        ctx.events[sctr].addr, sctr);
    memset(&ctx.events[sctr], 0, sizeof(ctx.events[sctr]));
  }

  ctx.events[sctr].addr = node;
  elog(LOG_DEBUG(1), "Inserting node %d in slot %d\n", node, sctr);
  inc_mod(&ctx.node_ctr, 1, SMAX_SRCS);
  ctx.num_srcs = min(ctx.num_srcs + 1, SMAX_SRCS);
  return sctr;
}

/*
 * Publish the current counters for one node on the status-update link.
 *
 * Builds a broadcast link packet (ext_type SCOMP_STATS6, type SSINK_UPDATE,
 * source set to the node's address) followed by a sympathy_status_info_t
 * carrying the number of requests sent, the packets received from the node
 * and the time of its last packet, and hands it to lu_send().
 *
 * Nothing is sent when the update link was never opened: main() only logs a
 * failed lu_open() for it and keeps running, so ctx.update_lu may be unset.
 */
static void update_node(sevent_rb_t* stat)
{
  sympathy_status_info_t info = {
    pkt_tx:ctx.num_sevent_reqs,
    pkt_rx:stat->pkts_rx_from_node,
    pkt_expected_rx:ctx.num_sevent_reqs,
    time_rx_last_pkt: stat->event_time,
  };
  link_pkt_t pkt = {
    dst: {
      id: LINK_BROADCAST
    },
    src: {
      id: stat->addr
    },
    ext_type: SCOMP_STATS6,
    type: SSINK_UPDATE,
  };
  buf_t* buf;

  if (!ctx.update_lu)
  {
    elog(LOG_DEBUG(1), "update link not open, skipping update for node %d\n",
        stat->addr);
    return;
  }

  buf = buf_new();
  if (!buf)
  {
    elog(LOG_ERR, "Unable to allocate update buffer for node %d\n", stat->addr);
    return;
  }

  /* Wire layout: link header immediately followed by the status payload. */
  bufcpy(buf, &pkt, sizeof(link_pkt_t));
  bufcpy(buf, &info, sizeof(sympathy_status_info_t));

  elog(LOG_DEBUG(1), "updating for node %d\n", stat->addr);
  /* lu_send() is given the payload length, i.e. without the link header. */
  if (lu_send(ctx.update_lu, (link_pkt_t *)buf->buf, buf->len - sizeof(link_pkt_t)) < 0)
  {
     elog(LOG_ERR, "Unable to send pkt: %m!\n");
  }
  buf_free(buf);
}

/*
 * Merge the ring-buffer elements carried by an event-response packet into
 * the ring buffer kept for node_addr.
 *
 * node_addr: address of the node the packet came from; a new table slot is
 *            registered when the node is not yet known.
 * mpkt:      the sympathy packet; its data area holds consecutive RBelem_t
 *            records.
 * len:       length of the packet in bytes, header included.
 *
 * An element is copied when its time is not older than the newest stored
 * element and it passes the "garbage check" (it is not more than
 * TIME_DIFF_THRESH ahead of the newest stored time). Each accepted element
 * becomes the new reference time.
 *
 * Returns the number of elements copied over. 0 is also returned when no
 * table slot could be obtained for the node or the packet is shorter than
 * its header.
 *
 * NOTE(review): with an empty ring buffer the reference time is 0, so an
 * element whose time exceeds TIME_DIFF_THRESH is skipped even though the
 * buffer is empty -- confirm that this is intended.
 */
uint16_t handle_event_data(if_id_t node_addr, Spkt_t* mpkt, uint8_t len)
{
  uint8_t num_copied = 0;
  uint8_t start_index; /* index of the element being examined in the pkt */
  uint8_t num_elem_in_pkt = 0;
  RBelem_t* last_elem;
  RBelem_t* tmpE;
  int diff, sctr, last_t = 0;
  sevent_rb_t* stat;

  elog(LOG_DEBUG(1), "from node %d, packet %dB\n", node_addr, len);
  if ((sctr = find_event_info(node_addr)) < 0)
  {
    sctr = add_event_info(node_addr);
  }
  /* add_event_info() can refuse the node (returns -1); never index the
   * table with an out-of-range slot. Assumes ctx.events has SMAX_SRCS
   * entries -- TODO confirm. */
  if (sctr < 0 || sctr >= SMAX_SRCS)
  {
    elog(LOG_ERR, "No event slot for node %d, dropping packet\n", node_addr);
    return 0;
  }
  stat = &ctx.events[sctr];
  gettimeofday(&stat->event_time, NULL);
  stat->pkts_rx_from_node++;

  /* A packet shorter than its own header carries no elements; computing
   * (len - ssize) for it would yield a bogus element count. */
  if (len >= ssize)
  {
    num_elem_in_pkt = (len - ssize) / rsize;
  }
  else
  {
    elog(LOG_DEBUG(1), "packet too short (%dB < %dB header)\n", len, ssize);
  }

  if ((last_elem = get_last_element(stat))) last_t = last_elem->time;

  for (start_index = 0; start_index < num_elem_in_pkt; start_index++)
  {
    tmpE = (RBelem_t *)&mpkt->data[start_index*rsize];

    /* Only take the difference if this is NOT a wrap-around case */
    if (last_t >= tmpE->time) diff = 0;
    else diff = abs_subtract(tmpE->time, last_t);

    if (diff > TIME_DIFF_THRESH)
    {
      elog(LOG_DEBUG(1), "WARN, skip elem w/time %d, its %d > than last-time (%d)\n",
         tmpE->time, diff, last_t);
      continue;
    }

    /* Insert the element if its time is >= the last-time, now that it has
     * passed the "garbage check" */
    if (tmpE->time >= last_t)
    {
      memcpy(&stat->rb[stat->rb_head], &mpkt->data[rsize * start_index], rsize);
      inc_mod(&(stat->rb_head), 1, NUM_RB_ELEMS);
      /* Head back at 0 after an insert means the buffer has wrapped. */
      if (stat->rb_head == 0) stat->rb_full = 1;
      last_t = tmpE->time;
      stat->last_event_time_mins = tmpE->time / 60;
      num_copied++;
    }
  }

  elog(LOG_DEBUG(1), "copying %d elements\n", num_copied);
  if (num_copied > 0)
  {
    stat->rb_num_elems = min16(stat->rb_num_elems + num_copied, NUM_RB_ELEMS);
    update_node(stat);
  }
  return (num_copied);
}

/*
 * Broadcast a sympathy packet on the multihop link.
 *
 * The outgoing frame is a link header (broadcast destination,
 * MULTIHOP_SYMPATHY as both type and ext_type), then *pkt, then data_len
 * bytes of data when data_len > 0.
 *
 * Returns 0 on success and -1 when the packet could not be sent, including
 * when the multihop link was never opened (main() keeps running without it
 * if -U is not given, while requests can still be scheduled).
 */
static
int sevent_send_pkt(Spkt_t* pkt, void* data, ssize_t data_len)
{
  buf_t* buf;
  int retval = 0;
  link_pkt_t hdr = {
    dst: {
      id: LINK_BROADCAST,
    },
    ext_type: MULTIHOP_SYMPATHY,
    type: MULTIHOP_SYMPATHY
  };

  if (!ctx.mh_link)
  {
    elog(LOG_DEBUG(1), "multihop link not open, cannot send packet of type: %d\n",
        pkt->type);
    return -1;
  }

  buf = buf_new();
  if (!buf)
  {
    elog(LOG_ERR, "Unable to allocate buffer for packet of type: %d!\n", pkt->type);
    return -1;
  }

  bufcpy(buf, &hdr, sizeof(hdr));
  bufcpy(buf, pkt, sizeof(Spkt_t));
  if (data_len > 0) bufcpy(buf, data, data_len);
  /* lu_send() is given the payload length, i.e. without the link header. */
  if ((lu_send(ctx.mh_link, (link_pkt_t*)buf->buf, buf->len - sizeof(link_pkt_t))) < 0)
  {
    elog(LOG_ERR, "Unable to send packet of type: %d!\n", pkt->type);
    retval = -1;
    goto done;
  }
  /* Cast: the length expression is size_t-typed but printed with %d. */
  elog(LOG_DEBUG(1), "will send packet with type: %d, len: %d\n", pkt->type,
    (int)(buf->len - sizeof(link_pkt_t)));
done:
  buf_free(buf);
  return retval;
}

/*
 * Timer callback (registered by change_sreq_period()): broadcasts one
 * SREQUEST_EVENTS packet carrying the next sequence number.
 *
 * When the request goes out, the request counter is bumped and an update is
 * published for every tracked node. The global seqno advances whether or
 * not the send succeeds. Always returns EVENT_RENEW; the arguments are not
 * used.
 */
static int send_event_request(void* data, int interval, g_event_t* event)
{
  Spkt_t pkt = {
    type: SREQUEST_EVENTS
  };
  Srequest_t req = {
    seqno: seqno++,
  };
  int idx;

  elog(LOG_DEBUG(1), "num-srcs: %d, sending seqno: %d\n",
      ctx.num_srcs, req.seqno);

  if (sevent_send_pkt(&pkt, (void *)&req, sizeof(Srequest_t)) != 0)
  {
    elog(LOG_ERR, "Unable to send sympathy request!\n");
    return EVENT_RENEW;
  }

  ctx.num_sevent_reqs++;
  for (idx = 0; idx < ctx.num_srcs; idx++)
  {
    sevent_rb_t* node = &ctx.events[idx];

    elog(LOG_DEBUG(1), "%d: calling update for node %d\n", idx, node->addr);
    update_node(node);
  }
  elog(LOG_DEBUG(1),"Sending sympathy request!\n");
  return EVENT_RENEW;
}

/*
 * Set the period of the sympathy-request timer.
 *
 * period_msecs > 0:  reschedule the existing timer, or create it if there
 *                    is none yet.
 * period_msecs <= 0: stop sending requests; an existing timer is destroyed
 *                    and no new one is created.
 *
 * A destroyed timer is never rescheduled, and the stored handle is cleared
 * so that a later positive period creates a fresh timer.
 */
void change_sreq_period(int period_msecs)
{
  elog(LOG_DEBUG(1), "New period_msecs = %d\n", period_msecs);

  if (ctx.sreq_timer)
  {
    if (period_msecs <= 0)
    {
      g_event_destroy(ctx.sreq_timer);
      ctx.sreq_timer = NULL;
    }
    else
    {
      g_timer_resched(ctx.sreq_timer, period_msecs);
    }
    return;
  }

  if (period_msecs > 0)
  {
    elog(LOG_DEBUG(1), "Adding new timer!\n");
    g_timer_add(period_msecs, send_event_request, NULL, NULL, &ctx.sreq_timer);
  }
}

/*
 * Command-device handler (installed by main()): parses the written text as
 * key/value pairs and applies them.
 *
 * The only recognised key is "sreq_period"; its value, converted with
 * atoi(), becomes the new request period in seconds and is applied
 * through change_sreq_period(). Any other key is reported as an error.
 * Always returns EVENT_RENEW; size and data are not used.
 */
static
int sevent_command(char* cmd, size_t size, void* data)
{
  parser_state_t *ps = misc_parse_init(cmd, MISC_PARSE_COLON_SCHEME);
  int requested_secs;

  elog(LOG_DEBUG(1), "command: %s\n", cmd);

  /* Walk every key/value pair in the message. */
  while (misc_parse_next_kvp(ps) >= 0)
  {
    if (strcmp(ps->key, "sreq_period") != 0)
    {
      elog(LOG_ERR, "Unrecognized key: %s\n", ps->key);
      continue;
    }

    requested_secs = atoi(ps->value);
    elog(LOG_DEBUG(1), "key: %s is sreq-period so new period is %d\n",
        ps->key, requested_secs);
    ctx.sevent_req_period_secs = requested_secs;
    change_sreq_period(ctx.sevent_req_period_secs * 1000);
  }

  misc_parse_cleanup(ps);
  return EVENT_RENEW;
}

/*
 * Command-device usage handler (installed by main()): formats a help text
 * describing the sreq_period key together with its current value, and
 * returns the text buffer that bufprintf() filled in.
 * The data argument is not used.
 */
char* sevent_usage(void *data)
{
  buf_t help = {};

  bufprintf(&help,
      "To change period of sympathy-requests write (pd=0 => stopsending sympathy requests):\n"
      "\t\t sreq_period=<int>[seconds] \n");
  bufprintf(&help, "Current value sreq_period=%d[secs]\n",
      ctx.sevent_req_period_secs);

  return help.buf;
}

/*
 * Receive callback for the watched link (installed by main()).
 *
 * The link payload is interpreted as a multihop header followed by a
 * sympathy packet. The sympathy payload is hex-dumped to the debug log and,
 * when the packet is a MULTIHOP_SYMPATHY event response, handed to
 * handle_event_data(). Packets too short to contain the headers that are
 * about to be read are ignored. Always returns EVENT_RENEW.
 *
 * The payload pointer is computed before the hex dump, so the dump never
 * reads through an unset pointer.
 *
 * NOTE(review): status_receive() releases its packet with g_free(), while
 * this callback does not release hdr -- confirm who owns the packet handed
 * to receive callbacks and free it here if the callee does.
 */
static
int sevent_rcv_pkt(lu_context_t *link, link_pkt_t *hdr, ssize_t data_len)
{
  buf_t* dump;
  multihop_hdr_t* mhdr;
  Saddr_t src;
  Spkt_t* pkt;
  ssize_t pkt_len;

  /* The multihop header must be fully present before it is read. */
  if (data_len < (ssize_t)sizeof(multihop_hdr_t))
  {
    elog(LOG_DEBUG(1), "ignoring short packet: len = %d\n", (int)data_len);
    return EVENT_RENEW;
  }

  mhdr = (multihop_hdr_t *)hdr->data;
  src = mhdr->src;
  pkt = (Spkt_t *)(mhdr + 1); /* sympathy packet follows the multihop hdr */
  pkt_len = data_len - sizeof(multihop_hdr_t);

  elog(LOG_DEBUG(1),"spkt-len: %d, ext_type: %d, hdr-type: %d len = %d, src = %d\n", 
      (int)pkt_len, hdr->ext_type, hdr->type, (int)data_len, src);

  dump = buf_new();
  if (dump)
  {
    misc_hexdump_to_buf(dump, (char *)pkt, pkt_len, "NR");
    elog(LOG_DEBUG(1), "DATA SPKT: %s\n", dump->buf);
    buf_free(dump);
  }

  /* NR get correct type info! */
  if (hdr->ext_type == 4
      && mhdr->type == MULTIHOP_SYMPATHY
      && pkt_len >= (ssize_t)sizeof(Spkt_t)
      && pkt->type == SEVENT_RESPONSE)
  {
    handle_event_data(src, pkt, pkt_len);
  }
  return EVENT_RENEW;
}

/*
 * Print the command-line synopsis for the program called name and
 * terminate the process with exit status 1.
 */
void usage(char *name)
{
  misc_print_usage(name, "-U <link> -W<link>",
      "  --uses <link>: specify the link to send sympathy-requests\n"
      "  --watch <link>: specify the link to listen to packets on");
  exit(1);
}

/*
 * Shutdown hook passed to emrun_init() through emrun_opts in main(): logs
 * a notice and terminates the process with exit status 0.
 */
static
void sevent_shutdown(void *data)
{
  (void) data; /* not used */

  elog(LOG_NOTICE, "sympathy-sevent shutting down");
  exit(0);
}

/*
 * Receive callback for the status-update link (installed by main()).
 *
 * Incoming packets are only logged and then released with g_free();
 * nothing is decoded yet. Always returns EVENT_RENEW.
 */
static
int status_receive(lu_context_t* lu, link_pkt_t* pkt, ssize_t data_len)
{
  /* Cast: data_len is ssize_t but is printed with %d. */
  elog(LOG_DEBUG(1), "src %d, Pkt w type/comp: %d/%d, datalen: %d\n",
       pkt->src.id, pkt->type, pkt->ext_type, (int)data_len);
  /* NR todo eventually, need to decode packets that we receive of a
   * certain comp-stats type! should be specified on the command-line!*/

  g_free(pkt);
  return EVENT_RENEW;
}

int main(int argc, char** argv)
{
  char devname[100];
  emrun_opts_t emrun_opts = {
    shutdown: sevent_shutdown,
    silent: 1
  };

	status_dev_opts_t sopts = {
    device: {
		},
	};

  cmd_dev_opts_t copts = {
    device: {
		},
    command: sevent_command,
    usage: sevent_usage,
	};

  lu_opts_t opts = {
    opts: {
       pkt_type: MULTIHOP_SYMPATHY
    },
  };

  lu_opts_t opts_status = {
    opts: {
      name: SYMPATHY_STATS_DEVICE
    },
    receive: status_receive
  };

  misc_init(&argc, argv, CVSTAG);

  /* Open command device */
  sprintf(devname, "%s/%s", SSTATUS_APP_BASE, SEVENTS_CMD);
  copts.device.devname = sim_path(devname);
  if (g_command_dev(&copts, NULL) < 0) {
    elog(LOG_ERR, "Unable to open command-device: %s\n", copts.device.devname);
  }

  if (lu_open(&opts_status, &ctx.update_lu) < 0) {
    elog(LOG_DEBUG(1), "Unable to open link: %s:%m\n", 
        opts_status.opts.name);
  }

	/* Open status-devices */
  sprintf(devname, "%s/%s", SSTATUS_APP_BASE,SEVENTS_STATUS);
  sopts.device.devname = sim_path(devname);
	sopts.printable = sympathy_print_events;
  if (g_status_dev(&sopts, NULL) < 0) {
		elog(LOG_ERR, "Unable to open status device: %s\n", sopts.device.devname);
	}

  /* Open link-device */
  if (!(opts.opts.name = link_parse_uses(&argc, argv, NULL))) {
    elog(LOG_WARNING, "WARNING: WILL NOT be sending sympathy-requests because didn't specify -U!\n");
  }
  else {
     elog(LOG_DEBUG(1), "link-name U: %s\n", opts.opts.name);
     if (lu_open(&opts, &ctx.mh_link) < 0) {
       elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
      usage(argv[0]);
     }
  }

  opts.opts.pkt_type = PKT_TYPE_TOS;
  opts.receive = sevent_rcv_pkt;
  if (!(opts.opts.name = misc_parse_out_option(&argc, argv, "watch", 'W'))) {
    elog(LOG_CRIT, "Please specify --watch!");
    usage(argv[0]);
  } 
  else {
     elog(LOG_DEBUG(1), "link-name W: %s\n", opts.opts.name);
     if (lu_open(&opts, NULL) < 0) {
       elog(LOG_CRIT, "Unable to open link %s: %m", link_name(&(opts.opts), NULL));
       exit(0);
     }
  }

  change_sreq_period(ctx.sevent_req_period_secs * 1000);
  if (NUM_RB_ELEMS == 0)
  {
	  elog(LOG_ERR, "num-rb-elems (%d) must be > 0!!\n", NUM_RB_ELEMS);
    exit(1);
  }

  emrun_init(&emrun_opts);
  g_main();
  return 0;
}

CENS CVS Mailing List
Powered by
ViewCVS 0.9.2