~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Linux Cross Reference
cvs/emstar/fusd/fusdd/fusdd_socket.c


  1 /*
  2  *
  3  * Copyright (c) 2003 The Regents of the University of California.  All 
  4  * rights reserved.
  5  *
  6  * Redistribution and use in source and binary forms, with or without
  7  * modification, are permitted provided that the following conditions
  8  * are met:
  9  *
 10  * - Redistributions of source code must retain the above copyright
 11  *   notice, this list of conditions and the following disclaimer.
 12  *
 13  * - Neither the name of the University nor the names of its
 14  *   contributors may be used to endorse or promote products derived
 15  *   from this software without specific prior written permission.
 16  *
 17  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 19  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 20  * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 22  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 23  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 24  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 25  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 27  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 28  *
 29  */
 30 
 31 #include <fusdd_net_i.h>
 32 
 33 
 34 /*
 35  *  Utility functions
 36  */
 37 
 38 void fusd_msg_free(fusd_msg_t *msg)
 39 {
 40   if (msg) {
 41     if (msg->data) free(msg->data);
 42     free(msg);
 43   }
 44 }
 45 
 46 
 47 /*
 48  *  the Read side...
 49  */
 50 
 51 static void fn_socket_read_process(fn_socket_t *fs);
 52 static int fn_socket_write_process(fn_socket_t *fs);
 53 
 54 
 55 int fn_socket_read_blocked(fn_socket_t *fs)
 56 {
 57   int enabled;
 58   g_event_get_enable(fs->read_event, &enabled);
 59   return !enabled;
 60 }
 61 
 62 
 63 void fn_socket_read_set_blocked(fn_socket_t *fs, int block)
 64 {
 65   /* $$$ we may not want to actually block reads.. just block
 66    *     delivery to the app */
 67   g_event_set_enable(fs->read_event, !block);
 68   if (!block)
 69     fn_socket_read_process(fs);
 70 }
 71 
 72 
 73 static 
 74 void fn_socket_read_process(fn_socket_t *fs)
 75 {
 76   while (fs->input_buffer && !fn_socket_read_blocked(fs)) {
 77     fusd_msg_t *hdr = (fusd_msg_t *)(fs->input_buffer->buf);
 78     int total_len = hdr->datalen + sizeof(fusd_msg_t);
 79     char *msg_data = NULL;
 80     fusd_msg_t *msg = NULL;
 81 
 82     /* not a whole message? */
 83     if ((fs->input_buffer->len < sizeof(fusd_msg_t)) ||
 84         (fs->input_buffer->len < total_len)) 
 85       break;
 86     
 87     /* arrived! */
 88         
 89     /* allocate */
 90     msg = malloc(sizeof(fusd_msg_t));
 91     if (hdr->datalen)
 92       msg_data = malloc(hdr->datalen);
 93     
 94     /* copy */
 95     memmove(msg, hdr, sizeof(fusd_msg_t));
 96     if (hdr->datalen > 0)
 97       memmove(msg_data, fs->input_buffer->buf + sizeof(fusd_msg_t),
 98               hdr->datalen);
 99     msg->data = msg_data;
100     
101     /* shift extra */
102     if (fs->input_buffer->len > total_len) {
103       memmove(fs->input_buffer->buf, fs->input_buffer->buf + total_len, 
104               fs->input_buffer->len - total_len);
105       buf_shorten(fs->input_buffer, total_len);
106     }
107     else {
108       buf_free(fs->input_buffer);
109       fs->input_buffer = NULL;
110     }
111     
112     /* call arrived */
113     fs->arrived(fs, msg);
114   }
115 }
116 
117 
118 static
119 int fn_socket_handle_read(void *data, int fd, int cond, g_event_t *event)
120 {
121   fn_socket_t *fs = (fn_socket_t *)data;
122   char buf[4096];
123   int status;
124 
125   if (event == NULL) {
126     elog(LOG_WARNING, "*** Event is NULL!");
127     return EVENT_DONE;
128   }
129 
130   status = read(fd, buf, sizeof(buf));
131 
132   /* handle errors */
133   if (status < 0) {
134     if (!((errno == EINTR) || (errno == EAGAIN))) {
135       elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
136       fs->closed(fs, errno);
137     }
138     goto done;
139   }
140 
141   /* handle close */
142   if (status == 0) {
143     fs->closed(fs, 0);
144     goto done;
145   }
146 
147   /* handle new data */
148   if (fs->input_buffer == NULL)
149     fs->input_buffer = buf_new();
150   bufcpy(fs->input_buffer, buf, status);
151 
152   /* process more data ? */
153   fn_socket_read_process(fs);
154 
155  done:
156   return EVENT_RENEW;
157 }
158 
159 /*
160  *  Write part
161  */
162 
163 
164 static 
165 int fn_socket_write_process(fn_socket_t *fs)
166 {
167   int status=0;
168 
169   if (fs->output_buffer == NULL) 
170     goto not_blocked;
171 
172   status = write(fs->socket_fd, fs->output_buffer->buf, fs->output_buffer->len);
173   if (status < 0) {
174     if (!((errno == EINTR) || (errno == EAGAIN))) {
175       elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
176       fs->closed(fs, errno);
177     }
178     goto done;
179   }
180   
181   /* partial write */
182   if (status < fs->output_buffer->len) {
183     memmove(fs->output_buffer->buf, fs->output_buffer->buf+status,
184             fs->output_buffer->len - status);
185     buf_shorten(fs->output_buffer, status);
186     g_event_set_enable(fs->write_event, 1);
187     status = 0;
188     goto done;
189   }
190 
191  not_blocked:
192   if (fs->output_buffer) 
193     buf_free(fs->output_buffer);
194   fs->output_buffer = NULL;
195   g_event_set_enable(fs->write_event, 0);
196   
197  done:
198   return status;
199 }
200 
201 
202 static
203 int fn_socket_write_ready(void *data, int fd, int cond, g_event_t *event)
204 {
205   fn_socket_t *fs = (fn_socket_t *)data;
206   fn_socket_write_process(fs);
207   return EVENT_RENEW;
208 }
209 
210 
211 int fn_socket_write(fn_socket_t *fs, fusd_msg_t *msg)
212 {
213   if (fs->output_buffer == NULL)
214     fs->output_buffer = buf_new();
215 
216   bufcpy(fs->output_buffer, msg, sizeof(fusd_msg_t));
217   bufcpy(fs->output_buffer, msg->data, msg->datalen);
218   
219   return fn_socket_write_process(fs);
220 }
221 
222 
223 /*
224  *  destructor
225  */
226 
227 void fn_socket_destroy(fn_socket_t *sock)
228 {
229   if (sock) {
230     close(sock->socket_fd);
231     g_event_destroy(sock->write_event);
232     g_event_destroy(sock->read_event);
233     if (sock->input_buffer) buf_free(sock->input_buffer);
234     if (sock->output_buffer) buf_free(sock->output_buffer);
235     sock->input_buffer = NULL;
236     sock->output_buffer = NULL;
237   }
238 }
239 
240 
241 /*
242  *  Initializes the events, etc.
243  *    call this function with close, arrived, private_data,
244  *    socket_fd and socket_name set
245  *
246  *  Caller is responsible to free socket_name and private_data.
247  */
248 
249 int fn_socket_config(fn_socket_t *sock)
250 {
251   if (g_event_add(sock->socket_fd, FUSD_NOTIFY_INPUT, fn_socket_handle_read,            
252                   sock, NULL, &(sock->read_event)) < 0) {
253     elog(LOG_CRIT, "Can't create event for new socket: %m");
254     return -1;
255   }
256 
257   if (g_event_add(sock->socket_fd, FUSD_NOTIFY_OUTPUT, fn_socket_write_ready,           
258                   sock, NULL, &(sock->write_event)) < 0) {
259     elog(LOG_CRIT, "Can't create event for new socket: %m");
260     return -1;
261   }
262 
263   return 0;
264 }
265 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.