~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Linux Cross Reference
cvs/emstar/devel/aml/doa_server.c


  1 /*
  2  * This is very simple server for aml reception.
  3  *
  4  * The main, event based part accepts connections from clients (nodes)
  5  * Data arrives in TCP segments. When a full payload has been
  6  * received,
  7  */
  8 
  9 #include <libmisc/misc.h>
 10 #include <libevent/event_tcp_server.h>
 11 #include <pthread.h>
 12 #include "doa_server.h"
 13 /* server headers? */
 14 
 15 /* accepted a new client */
 16 void accept_notify(ev_tcp_peer_t *peer, int errno_value)
 17 {
 18   doa_peer_data_t *state = g_new0(doa_peer_data_t,1);
 19   state->buffer = buf_new();
 20   ev_tcp_peer_set_peer_data(peer, state); 
 21   elog(LOG_NOTICE, "Accept %d %s", 
 22        errno_value, ev_tcp_peer_get_peer_name(peer));
 23 }
 24 
 25 /* called when we lose a client */
 26 void close_notify(ev_tcp_peer_t *peer, int errno_value)
 27 {
 28   doa_peer_data_t *state = (doa_peer_data_t *)ev_tcp_peer_get_peer_data(peer);
 29   elog(LOG_NOTICE, "Close %d %s", errno_value, ev_tcp_peer_get_peer_name(peer));
 30   g_event_destroy(state->timer);
 31   free(state);
 32 }
 33 
 34 /* receive buffer is drained */
 35 void drained(ev_tcp_peer_t *peer)
 36 {
 37   elog(LOG_NOTICE, "Drained %s", ev_tcp_peer_get_peer_name(peer));
 38 }
 39 
 40 /* data is ready from client */
 41 int data_ready(ev_tcp_peer_t *peer, buf_t *data)
 42 {
 43   /* get client specific data and our state */
 44   doa_peer_data_t *state = (doa_peer_data_t *)ev_tcp_peer_get_peer_data(peer);
 45   doa_server_state_t *ss = (doa_server_state_t *)ev_tcp_peer_get_server_data(peer);
 46 
 47   /* add whatever data just arrived into the buffer in peer data struct */
 48   bufcat(state->buffer, data);
 49 
 50   /* check to see if we have a full payload */
 51   if (state->buffer->len == sizeof(struct doa_result_pkt)) {
 52 
 53     /* copy it, and push data to work queue */
 54     buf_t *q_item = buf_new();
 55     bufcpy(q_item, state->buffer->buf, state->buffer->len);    
 56     if (g_msg_queue_push(ss->work_queue, q_item) < 0) {
 57       elog(LOG_NOTICE,"failed to enqueue");
 58       buf_free(q_item);
 59     }
 60 
 61     /* close peer connection */
 62     buf_free(state->buffer);
 63     ev_tcp_peer_close(peer,0);
 64   }
 65   /* clean up the data buffer we were given in this callback */
 66   buf_free(data);
 67   return EVENT_RENEW;
 68 }
 69 
 70 /* the thread that does the work */
 71 void *work_thread(void *arg)
 72 {
 73   doa_server_state_t *ss = (doa_server_state_t *)arg;
 74 
 75   /* spin waiting for new messages on queue */
 76   while(1) {
 77     buf_t *item = g_msg_queue_wait(ss->work_queue, -1);
 78     if (item == NULL) {
 79       elog(LOG_NOTICE,"work queue stopped");
 80       break;
 81     }
 82 
 83     /* do work here */
 84     struct doa_result_pkt *doa_data = (struct doa_result_pkt *)item->buf;
 85     elog(LOG_NOTICE, "New data received from node %d",doa_data->node_id);
 86     elog(LOG_NOTICE, "DOA = %f",doa_data->doa_result;
 87   } 
 88   return 0;
 89 }
 90 
 91 int main(int argc, char **argv)
 92 {
 93   misc_init(&argc, argv, CVSTAG);
 94 
 95   /* create a server */
 96   doa_server_state_t ss = {
 97   server:NULL  /* required for correct init */
 98   };
 99 
100   /* work queue, where data is sent after it is received */
101   msg_queue_opts_t q_opts = {
102     name: "work queue",
103     synchronous: 1
104   };
105   if (g_msg_queue(&q_opts, &ss.work_queue) < 0) {
106     elog(LOG_NOTICE,"couldn't create queue");
107     exit(1);
108   }
109 
110   /* work thread: reads incoming messages off the work queue */
111   if (pthread_create(&(ss.work_thread), NULL, work_thread, &ss) < 0) {
112     elog(LOG_CRIT,"couldn't launch thread");
113     exit(1);
114   }
115 
116   emrun_opts_t emrun_opts;
117   
118   /* server options - sets up the callbacks for events */
119   ev_tcp_server_opts_t s_opts = {
120     port: 8080,
121     server_name: "192.168.11.150", /* FIXME: change to dynamic */
122     on_accept: accept_notify,
123     default_peer_opts: {
124       data_ready: data_ready,
125       on_close: close_notify,
126       write_buf_drained: drained,
127     },
128     server_info:&ss
129   };
130   
131   if (ev_tcp_server_new(&s_opts, &(ss.server)) < 0) {
132     elog(LOG_CRIT, "can't create server %m");
133     exit(1);
134   }
135   
136   emrun_init(&emrun_opts);
137    
138   g_main();
139   return 1;
140 }
141 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.