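/*
 * This is a very simple server for aml reception.
 *
 * The main, event-based part accepts connections from clients (nodes).
 * Data arrives in TCP segments. When a full payload has been
 * received, it is copied onto a work queue and the connection is closed;
 * a separate worker thread takes items off the queue and logs them.
 */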
8
9 #include <libmisc/misc.h>
10 #include <libevent/event_tcp_server.h>
11 #include <pthread.h>
12 #include "doa_server.h"
13 /* server headers? */
14
15 /* accepted a new client */
16 void accept_notify(ev_tcp_peer_t *peer, int errno_value)
17 {
18 doa_peer_data_t *state = g_new0(doa_peer_data_t,1);
19 state->buffer = buf_new();
20 ev_tcp_peer_set_peer_data(peer, state);
21 elog(LOG_NOTICE, "Accept %d %s",
22 errno_value, ev_tcp_peer_get_peer_name(peer));
23 }
24
25 /* called when we lose a client */
26 void close_notify(ev_tcp_peer_t *peer, int errno_value)
27 {
28 doa_peer_data_t *state = (doa_peer_data_t *)ev_tcp_peer_get_peer_data(peer);
29 elog(LOG_NOTICE, "Close %d %s", errno_value, ev_tcp_peer_get_peer_name(peer));
30 g_event_destroy(state->timer);
31 free(state);
32 }
33
34 /* receive buffer is drained */
35 void drained(ev_tcp_peer_t *peer)
36 {
37 elog(LOG_NOTICE, "Drained %s", ev_tcp_peer_get_peer_name(peer));
38 }
39
40 /* data is ready from client */
41 int data_ready(ev_tcp_peer_t *peer, buf_t *data)
42 {
43 /* get client specific data and our state */
44 doa_peer_data_t *state = (doa_peer_data_t *)ev_tcp_peer_get_peer_data(peer);
45 doa_server_state_t *ss = (doa_server_state_t *)ev_tcp_peer_get_server_data(peer);
46
47 /* add whatever data just arrived into the buffer in peer data struct */
48 bufcat(state->buffer, data);
49
50 /* check to see if we have a full payload */
51 if (state->buffer->len == sizeof(struct doa_result_pkt)) {
52
53 /* copy it, and push data to work queue */
54 buf_t *q_item = buf_new();
55 bufcpy(q_item, state->buffer->buf, state->buffer->len);
56 if (g_msg_queue_push(ss->work_queue, q_item) < 0) {
57 elog(LOG_NOTICE,"failed to enqueue");
58 buf_free(q_item);
59 }
60
61 /* close peer connection */
62 buf_free(state->buffer);
63 ev_tcp_peer_close(peer,0);
64 }
65 /* clean up the data buffer we were given in this callback */
66 buf_free(data);
67 return EVENT_RENEW;
68 }
69
70 /* the thread that does the work */
71 void *work_thread(void *arg)
72 {
73 doa_server_state_t *ss = (doa_server_state_t *)arg;
74
75 /* spin waiting for new messages on queue */
76 while(1) {
77 buf_t *item = g_msg_queue_wait(ss->work_queue, -1);
78 if (item == NULL) {
79 elog(LOG_NOTICE,"work queue stopped");
80 break;
81 }
82
83 /* do work here */
84 struct doa_result_pkt *doa_data = (struct doa_result_pkt *)item->buf;
85 elog(LOG_NOTICE, "New data received from node %d",doa_data->node_id);
86 elog(LOG_NOTICE, "DOA = %f",doa_data->doa_result;
87 }
88 return 0;
89 }
90
91 int main(int argc, char **argv)
92 {
93 misc_init(&argc, argv, CVSTAG);
94
95 /* create a server */
96 doa_server_state_t ss = {
97 server:NULL /* required for correct init */
98 };
99
100 /* work queue, where data is sent after it is received */
101 msg_queue_opts_t q_opts = {
102 name: "work queue",
103 synchronous: 1
104 };
105 if (g_msg_queue(&q_opts, &ss.work_queue) < 0) {
106 elog(LOG_NOTICE,"couldn't create queue");
107 exit(1);
108 }
109
110 /* work thread: reads incoming messages off the work queue */
111 if (pthread_create(&(ss.work_thread), NULL, work_thread, &ss) < 0) {
112 elog(LOG_CRIT,"couldn't launch thread");
113 exit(1);
114 }
115
116 emrun_opts_t emrun_opts;
117
118 /* server options - sets up the callbacks for events */
119 ev_tcp_server_opts_t s_opts = {
120 port: 8080,
121 server_name: "192.168.11.150", /* FIXME: change to dynamic */
122 on_accept: accept_notify,
123 default_peer_opts: {
124 data_ready: data_ready,
125 on_close: close_notify,
126 write_buf_drained: drained,
127 },
128 server_info:&ss
129 };
130
131 if (ev_tcp_server_new(&s_opts, &(ss.server)) < 0) {
132 elog(LOG_CRIT, "can't create server %m");
133 exit(1);
134 }
135
136 emrun_init(&emrun_opts);
137
138 g_main();
139 return 1;
140 }
141
/*
 * This page was automatically generated by the LXR engine.
 * Visit the LXR main site for more information.
 */