|
|
Jump to this file's LXR Page |
|
|
File: [CENS] / emstar / fusd / fusdd / fusdd_socket.c
(download)
/
(as text)
Revision: 1.14, Tue Jan 18 03:37:29 2005 UTC (4 years, 10 months ago) by girod Branch: MAIN CVS Tags: rdd_alpha_version_1, pregeonet, mote, acoustic-05-18-06, PRE_TOSNIC_FIX, PRE_64BIT, MOTENIC_PRE_BUGFIX_20050415, LAURA_CALIBRATION_EXPERIMENTS, HEAD, ESS_RELEASE_3_5, ESS_RELEASE_3_4, ESS_RELEASE_3_3, ESS_RELEASE_3_2, ESS_RELEASE_3_1, ESS_RELEASE_3_0, ESS_RELEASE_2_0, ESS_CONNECTIVITY, ESS_CENTROUTE_TESTING, ESS2-CMS-V1_5_pretest, ESS2-CMS-V1_4cMergeSympathy_2, ESS2-CMS-V1_4c, ESS2-CMS-V1_4b, ESS2-CMS-V1_4a, ESS2-CMS-V1_3, ESS2-CMS-V1_2, ESS2-CMS-V1_1, ESS2-CMS-V1_0, EMSTAR_RELEASE_2_5, EMSTAR_RELEASE_2_1_BRANCH, EMSTAR_RELEASE_2_1, CYCLOPS_RELEASE_CANDIDATE_2_0, CYCLOPS_PRERELEASE_STABLE, CENTROUTE_EMSTAR_SOCKETS, BG_1_0, BANGLADESH_ARSENIC_1_2, BANGLADESH_ARSENIC_1_1, AMARSS_JR_DEPLOYMENT_6_05_07 Changes since 1.13: +6 -1 lines * fixed a bug in fusdnet client, caused when not in remap mode * maybe fixed a memory error in fusdnet_server. could not reproduce memory problem after the fix. |
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <fusdd_net_i.h>
/*
* Utility functions
*/
void fusd_msg_free(fusd_msg_t *msg)
{
if (msg) {
if (msg->data) free(msg->data);
free(msg);
}
}
/*
* the Read side...
*/
static void fn_socket_read_process(fn_socket_t *fs);
static int fn_socket_write_process(fn_socket_t *fs);
int fn_socket_read_blocked(fn_socket_t *fs)
{
int enabled;
g_event_get_enable(fs->read_event, &enabled);
return !enabled;
}
void fn_socket_read_set_blocked(fn_socket_t *fs, int block)
{
/* $$$ we may not want to actually block reads.. just block
* delivery to the app */
g_event_set_enable(fs->read_event, !block);
if (!block)
fn_socket_read_process(fs);
}
static
void fn_socket_read_process(fn_socket_t *fs)
{
while (fs->input_buffer && !fn_socket_read_blocked(fs)) {
fusd_msg_t *hdr = (fusd_msg_t *)(fs->input_buffer->buf);
int total_len = hdr->datalen + sizeof(fusd_msg_t);
char *msg_data = NULL;
fusd_msg_t *msg = NULL;
/* not a whole message? */
if ((fs->input_buffer->len < sizeof(fusd_msg_t)) ||
(fs->input_buffer->len < total_len))
break;
/* arrived! */
/* allocate */
msg = malloc(sizeof(fusd_msg_t));
if (hdr->datalen)
msg_data = malloc(hdr->datalen);
/* copy */
memmove(msg, hdr, sizeof(fusd_msg_t));
if (hdr->datalen > 0)
memmove(msg_data, fs->input_buffer->buf + sizeof(fusd_msg_t),
hdr->datalen);
msg->data = msg_data;
/* shift extra */
if (fs->input_buffer->len > total_len) {
memmove(fs->input_buffer->buf, fs->input_buffer->buf + total_len,
fs->input_buffer->len - total_len);
buf_shorten(fs->input_buffer, total_len);
}
else {
buf_free(fs->input_buffer);
fs->input_buffer = NULL;
}
/* call arrived */
fs->arrived(fs, msg);
}
}
static
int fn_socket_handle_read(void *data, int fd, int cond, g_event_t *event)
{
fn_socket_t *fs = (fn_socket_t *)data;
char buf[4096];
int status;
if (event == NULL) {
elog(LOG_WARNING, "*** Event is NULL!");
return EVENT_DONE;
}
status = read(fd, buf, sizeof(buf));
/* handle errors */
if (status < 0) {
if (!((errno == EINTR) || (errno == EAGAIN))) {
elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
fs->closed(fs, errno);
}
goto done;
}
/* handle close */
if (status == 0) {
fs->closed(fs, 0);
goto done;
}
/* handle new data */
if (fs->input_buffer == NULL)
fs->input_buffer = buf_new();
bufcpy(fs->input_buffer, buf, status);
/* process more data ? */
fn_socket_read_process(fs);
done:
return EVENT_RENEW;
}
/*
* Write part
*/
static
int fn_socket_write_process(fn_socket_t *fs)
{
int status=0;
if (fs->output_buffer == NULL)
goto not_blocked;
status = write(fs->socket_fd, fs->output_buffer->buf, fs->output_buffer->len);
if (status < 0) {
if (!((errno == EINTR) || (errno == EAGAIN))) {
elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
fs->closed(fs, errno);
}
goto done;
}
/* partial write */
if (status < fs->output_buffer->len) {
memmove(fs->output_buffer->buf, fs->output_buffer->buf+status,
fs->output_buffer->len - status);
buf_shorten(fs->output_buffer, status);
g_event_set_enable(fs->write_event, 1);
status = 0;
goto done;
}
not_blocked:
if (fs->output_buffer)
buf_free(fs->output_buffer);
fs->output_buffer = NULL;
g_event_set_enable(fs->write_event, 0);
done:
return status;
}
static
int fn_socket_write_ready(void *data, int fd, int cond, g_event_t *event)
{
fn_socket_t *fs = (fn_socket_t *)data;
fn_socket_write_process(fs);
return EVENT_RENEW;
}
int fn_socket_write(fn_socket_t *fs, fusd_msg_t *msg)
{
if (fs->output_buffer == NULL)
fs->output_buffer = buf_new();
bufcpy(fs->output_buffer, msg, sizeof(fusd_msg_t));
bufcpy(fs->output_buffer, msg->data, msg->datalen);
return fn_socket_write_process(fs);
}
/*
* destructor
*/
void fn_socket_destroy(fn_socket_t *sock)
{
if (sock) {
close(sock->socket_fd);
g_event_destroy(sock->write_event);
g_event_destroy(sock->read_event);
if (sock->input_buffer) buf_free(sock->input_buffer);
if (sock->output_buffer) buf_free(sock->output_buffer);
sock->input_buffer = NULL;
sock->output_buffer = NULL;
}
}
/*
* Initializes the events, etc.
* call this function with close, arrived, private_data,
* socket_fd and socket_name set
*
* Caller is responsible to free socket_name and private_data.
*/
int fn_socket_config(fn_socket_t *sock)
{
if (g_event_add(sock->socket_fd, FUSD_NOTIFY_INPUT, fn_socket_handle_read,
sock, NULL, &(sock->read_event)) < 0) {
elog(LOG_CRIT, "Can't create event for new socket: %m");
return -1;
}
if (g_event_add(sock->socket_fd, FUSD_NOTIFY_OUTPUT, fn_socket_write_ready,
sock, NULL, &(sock->write_event)) < 0) {
elog(LOG_CRIT, "Can't create event for new socket: %m");
return -1;
}
return 0;
}
| CENS CVS Mailing List |
Powered by ViewCVS 0.9.2 |