(file) Return to fusdd_socket.c CVS log (file) Jump to this file's LXR Page (dir) Up to [CENS] / emstar / fusd / fusdd

File: [CENS] / emstar / fusd / fusdd / fusdd_socket.c (download) / (as text)
Revision: 1.14, Tue Jan 18 03:37:29 2005 UTC (4 years, 10 months ago) by girod
Branch: MAIN
CVS Tags: rdd_alpha_version_1, pregeonet, mote, acoustic-05-18-06, PRE_TOSNIC_FIX, PRE_64BIT, MOTENIC_PRE_BUGFIX_20050415, LAURA_CALIBRATION_EXPERIMENTS, HEAD, ESS_RELEASE_3_5, ESS_RELEASE_3_4, ESS_RELEASE_3_3, ESS_RELEASE_3_2, ESS_RELEASE_3_1, ESS_RELEASE_3_0, ESS_RELEASE_2_0, ESS_CONNECTIVITY, ESS_CENTROUTE_TESTING, ESS2-CMS-V1_5_pretest, ESS2-CMS-V1_4cMergeSympathy_2, ESS2-CMS-V1_4c, ESS2-CMS-V1_4b, ESS2-CMS-V1_4a, ESS2-CMS-V1_3, ESS2-CMS-V1_2, ESS2-CMS-V1_1, ESS2-CMS-V1_0, EMSTAR_RELEASE_2_5, EMSTAR_RELEASE_2_1_BRANCH, EMSTAR_RELEASE_2_1, CYCLOPS_RELEASE_CANDIDATE_2_0, CYCLOPS_PRERELEASE_STABLE, CENTROUTE_EMSTAR_SOCKETS, BG_1_0, BANGLADESH_ARSENIC_1_2, BANGLADESH_ARSENIC_1_1, AMARSS_JR_DEPLOYMENT_6_05_07
Changes since 1.13: +6 -1 lines
* fixed a bug in fusdnet client, caused when not in remap mode
* maybe fixed a memory error in fusdnet_server.  could not reproduce
  memory problem after the fix.

/*
 *
 * Copyright (c) 2003 The Regents of the University of California.  All 
 * rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Neither the name of the University nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <fusdd_net_i.h>


/*
 *  Utility functions
 */

void fusd_msg_free(fusd_msg_t *msg)
{
  if (msg) {
    if (msg->data) free(msg->data);
    free(msg);
  }
}


/*
 *  the Read side...
 */

static void fn_socket_read_process(fn_socket_t *fs);
static int fn_socket_write_process(fn_socket_t *fs);


int fn_socket_read_blocked(fn_socket_t *fs)
{
  int enabled;
  g_event_get_enable(fs->read_event, &enabled);
  return !enabled;
}


void fn_socket_read_set_blocked(fn_socket_t *fs, int block)
{
  /* $$$ we may not want to actually block reads.. just block
   *     delivery to the app */
  g_event_set_enable(fs->read_event, !block);
  if (!block)
    fn_socket_read_process(fs);
}


static 
void fn_socket_read_process(fn_socket_t *fs)
{
  while (fs->input_buffer && !fn_socket_read_blocked(fs)) {
    fusd_msg_t *hdr = (fusd_msg_t *)(fs->input_buffer->buf);
    int total_len = hdr->datalen + sizeof(fusd_msg_t);
    char *msg_data = NULL;
    fusd_msg_t *msg = NULL;

    /* not a whole message? */
    if ((fs->input_buffer->len < sizeof(fusd_msg_t)) ||
	(fs->input_buffer->len < total_len)) 
      break;
    
    /* arrived! */
	
    /* allocate */
    msg = malloc(sizeof(fusd_msg_t));
    if (hdr->datalen)
      msg_data = malloc(hdr->datalen);
    
    /* copy */
    memmove(msg, hdr, sizeof(fusd_msg_t));
    if (hdr->datalen > 0)
      memmove(msg_data, fs->input_buffer->buf + sizeof(fusd_msg_t),
	      hdr->datalen);
    msg->data = msg_data;
    
    /* shift extra */
    if (fs->input_buffer->len > total_len) {
      memmove(fs->input_buffer->buf, fs->input_buffer->buf + total_len, 
	      fs->input_buffer->len - total_len);
      buf_shorten(fs->input_buffer, total_len);
    }
    else {
      buf_free(fs->input_buffer);
      fs->input_buffer = NULL;
    }
    
    /* call arrived */
    fs->arrived(fs, msg);
  }
}


static
int fn_socket_handle_read(void *data, int fd, int cond, g_event_t *event)
{
  fn_socket_t *fs = (fn_socket_t *)data;
  char buf[4096];
  int status;

  if (event == NULL) {
    elog(LOG_WARNING, "*** Event is NULL!");
    return EVENT_DONE;
  }

  status = read(fd, buf, sizeof(buf));

  /* handle errors */
  if (status < 0) {
    if (!((errno == EINTR) || (errno == EAGAIN))) {
      elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
      fs->closed(fs, errno);
    }
    goto done;
  }

  /* handle close */
  if (status == 0) {
    fs->closed(fs, 0);
    goto done;
  }

  /* handle new data */
  if (fs->input_buffer == NULL)
    fs->input_buffer = buf_new();
  bufcpy(fs->input_buffer, buf, status);

  /* process more data ? */
  fn_socket_read_process(fs);

 done:
  return EVENT_RENEW;
}

/*
 *  Write part
 */


static 
int fn_socket_write_process(fn_socket_t *fs)
{
  int status=0;

  if (fs->output_buffer == NULL) 
    goto not_blocked;

  status = write(fs->socket_fd, fs->output_buffer->buf, fs->output_buffer->len);
  if (status < 0) {
    if (!((errno == EINTR) || (errno == EAGAIN))) {
      elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
      fs->closed(fs, errno);
    }
    goto done;
  }
  
  /* partial write */
  if (status < fs->output_buffer->len) {
    memmove(fs->output_buffer->buf, fs->output_buffer->buf+status,
	    fs->output_buffer->len - status);
    buf_shorten(fs->output_buffer, status);
    g_event_set_enable(fs->write_event, 1);
    status = 0;
    goto done;
  }

 not_blocked:
  if (fs->output_buffer) 
    buf_free(fs->output_buffer);
  fs->output_buffer = NULL;
  g_event_set_enable(fs->write_event, 0);
  
 done:
  return status;
}


static
int fn_socket_write_ready(void *data, int fd, int cond, g_event_t *event)
{
  fn_socket_t *fs = (fn_socket_t *)data;
  fn_socket_write_process(fs);
  return EVENT_RENEW;
}


int fn_socket_write(fn_socket_t *fs, fusd_msg_t *msg)
{
  if (fs->output_buffer == NULL)
    fs->output_buffer = buf_new();

  bufcpy(fs->output_buffer, msg, sizeof(fusd_msg_t));
  bufcpy(fs->output_buffer, msg->data, msg->datalen);
  
  return fn_socket_write_process(fs);
}


/*
 *  destructor
 */

void fn_socket_destroy(fn_socket_t *sock)
{
  if (sock) {
    close(sock->socket_fd);
    g_event_destroy(sock->write_event);
    g_event_destroy(sock->read_event);
    if (sock->input_buffer) buf_free(sock->input_buffer);
    if (sock->output_buffer) buf_free(sock->output_buffer);
    sock->input_buffer = NULL;
    sock->output_buffer = NULL;
  }
}


/*
 *  Initializes the events, etc.
 *    call this function with close, arrived, private_data,
 *    socket_fd and socket_name set
 *
 *  Caller is responsible to free socket_name and private_data.
 */

int fn_socket_config(fn_socket_t *sock)
{
  if (g_event_add(sock->socket_fd, FUSD_NOTIFY_INPUT, fn_socket_handle_read, 		
		  sock, NULL, &(sock->read_event)) < 0) {
    elog(LOG_CRIT, "Can't create event for new socket: %m");
    return -1;
  }

  if (g_event_add(sock->socket_fd, FUSD_NOTIFY_OUTPUT, fn_socket_write_ready, 		
		  sock, NULL, &(sock->write_event)) < 0) {
    elog(LOG_CRIT, "Can't create event for new socket: %m");
    return -1;
  }

  return 0;
}

CENS CVS Mailing List
Powered by
ViewCVS 0.9.2