1 /*
2 *
3 * Copyright (c) 2003 The Regents of the University of California. All
4 * rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * - Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * - Neither the name of the University nor the names of its
14 * contributors may be used to endorse or promote products derived
15 * from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
19 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
20 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
24 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
25 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 */
30
31 #include <fusdd_net_i.h>
32
33
34 /*
35 * Utility functions
36 */
37
38 void fusd_msg_free(fusd_msg_t *msg)
39 {
40 if (msg) {
41 if (msg->data) free(msg->data);
42 free(msg);
43 }
44 }
45
46
47 /*
48 * the Read side...
49 */
50
51 static void fn_socket_read_process(fn_socket_t *fs);
52 static int fn_socket_write_process(fn_socket_t *fs);
53
54
55 int fn_socket_read_blocked(fn_socket_t *fs)
56 {
57 int enabled;
58 g_event_get_enable(fs->read_event, &enabled);
59 return !enabled;
60 }
61
62
63 void fn_socket_read_set_blocked(fn_socket_t *fs, int block)
64 {
65 /* $$$ we may not want to actually block reads.. just block
66 * delivery to the app */
67 g_event_set_enable(fs->read_event, !block);
68 if (!block)
69 fn_socket_read_process(fs);
70 }
71
72
73 static
74 void fn_socket_read_process(fn_socket_t *fs)
75 {
76 while (fs->input_buffer && !fn_socket_read_blocked(fs)) {
77 fusd_msg_t *hdr = (fusd_msg_t *)(fs->input_buffer->buf);
78 int total_len = hdr->datalen + sizeof(fusd_msg_t);
79 char *msg_data = NULL;
80 fusd_msg_t *msg = NULL;
81
82 /* not a whole message? */
83 if ((fs->input_buffer->len < sizeof(fusd_msg_t)) ||
84 (fs->input_buffer->len < total_len))
85 break;
86
87 /* arrived! */
88
89 /* allocate */
90 msg = malloc(sizeof(fusd_msg_t));
91 if (hdr->datalen)
92 msg_data = malloc(hdr->datalen);
93
94 /* copy */
95 memmove(msg, hdr, sizeof(fusd_msg_t));
96 if (hdr->datalen > 0)
97 memmove(msg_data, fs->input_buffer->buf + sizeof(fusd_msg_t),
98 hdr->datalen);
99 msg->data = msg_data;
100
101 /* shift extra */
102 if (fs->input_buffer->len > total_len) {
103 memmove(fs->input_buffer->buf, fs->input_buffer->buf + total_len,
104 fs->input_buffer->len - total_len);
105 buf_shorten(fs->input_buffer, total_len);
106 }
107 else {
108 buf_free(fs->input_buffer);
109 fs->input_buffer = NULL;
110 }
111
112 /* call arrived */
113 fs->arrived(fs, msg);
114 }
115 }
116
117
118 static
119 int fn_socket_handle_read(void *data, int fd, int cond, g_event_t *event)
120 {
121 fn_socket_t *fs = (fn_socket_t *)data;
122 char buf[4096];
123 int status;
124
125 if (event == NULL) {
126 elog(LOG_WARNING, "*** Event is NULL!");
127 return EVENT_DONE;
128 }
129
130 status = read(fd, buf, sizeof(buf));
131
132 /* handle errors */
133 if (status < 0) {
134 if (!((errno == EINTR) || (errno == EAGAIN))) {
135 elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
136 fs->closed(fs, errno);
137 }
138 goto done;
139 }
140
141 /* handle close */
142 if (status == 0) {
143 fs->closed(fs, 0);
144 goto done;
145 }
146
147 /* handle new data */
148 if (fs->input_buffer == NULL)
149 fs->input_buffer = buf_new();
150 bufcpy(fs->input_buffer, buf, status);
151
152 /* process more data ? */
153 fn_socket_read_process(fs);
154
155 done:
156 return EVENT_RENEW;
157 }
158
159 /*
160 * Write part
161 */
162
163
164 static
165 int fn_socket_write_process(fn_socket_t *fs)
166 {
167 int status=0;
168
169 if (fs->output_buffer == NULL)
170 goto not_blocked;
171
172 status = write(fs->socket_fd, fs->output_buffer->buf, fs->output_buffer->len);
173 if (status < 0) {
174 if (!((errno == EINTR) || (errno == EAGAIN))) {
175 elog(LOG_WARNING, "Unexpected read error on socket %s: %m", fs->socket_name);
176 fs->closed(fs, errno);
177 }
178 goto done;
179 }
180
181 /* partial write */
182 if (status < fs->output_buffer->len) {
183 memmove(fs->output_buffer->buf, fs->output_buffer->buf+status,
184 fs->output_buffer->len - status);
185 buf_shorten(fs->output_buffer, status);
186 g_event_set_enable(fs->write_event, 1);
187 status = 0;
188 goto done;
189 }
190
191 not_blocked:
192 if (fs->output_buffer)
193 buf_free(fs->output_buffer);
194 fs->output_buffer = NULL;
195 g_event_set_enable(fs->write_event, 0);
196
197 done:
198 return status;
199 }
200
201
202 static
203 int fn_socket_write_ready(void *data, int fd, int cond, g_event_t *event)
204 {
205 fn_socket_t *fs = (fn_socket_t *)data;
206 fn_socket_write_process(fs);
207 return EVENT_RENEW;
208 }
209
210
211 int fn_socket_write(fn_socket_t *fs, fusd_msg_t *msg)
212 {
213 if (fs->output_buffer == NULL)
214 fs->output_buffer = buf_new();
215
216 bufcpy(fs->output_buffer, msg, sizeof(fusd_msg_t));
217 bufcpy(fs->output_buffer, msg->data, msg->datalen);
218
219 return fn_socket_write_process(fs);
220 }
221
222
223 /*
224 * destructor
225 */
226
227 void fn_socket_destroy(fn_socket_t *sock)
228 {
229 if (sock) {
230 close(sock->socket_fd);
231 g_event_destroy(sock->write_event);
232 g_event_destroy(sock->read_event);
233 if (sock->input_buffer) buf_free(sock->input_buffer);
234 if (sock->output_buffer) buf_free(sock->output_buffer);
235 sock->input_buffer = NULL;
236 sock->output_buffer = NULL;
237 }
238 }
239
240
241 /*
242 * Initializes the events, etc.
243 * call this function with close, arrived, private_data,
244 * socket_fd and socket_name set
245 *
246 * Caller is responsible to free socket_name and private_data.
247 */
248
249 int fn_socket_config(fn_socket_t *sock)
250 {
251 if (g_event_add(sock->socket_fd, FUSD_NOTIFY_INPUT, fn_socket_handle_read,
252 sock, NULL, &(sock->read_event)) < 0) {
253 elog(LOG_CRIT, "Can't create event for new socket: %m");
254 return -1;
255 }
256
257 if (g_event_add(sock->socket_fd, FUSD_NOTIFY_OUTPUT, fn_socket_write_ready,
258 sock, NULL, &(sock->write_event)) < 0) {
259 elog(LOG_CRIT, "Can't create event for new socket: %m");
260 return -1;
261 }
262
263 return 0;
264 }
265
/*
 * This page was automatically generated by the LXR engine.
 * Visit the LXR main site for more information.
 */