~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Linux Cross Reference
cvs/emstar/link/liblink/link_parse.c


  1 /*
  2  *
  3  * Copyright (c) 2004 The Regents of the University of California.  All 
  4  * rights reserved.
  5  *
  6  * Redistribution and use in source and binary forms, with or without
  7  * modification, are permitted provided that the following conditions
  8  * are met:
  9  *
 10  * - Redistributions of source code must retain the above copyright
 11  *   notice, this list of conditions and the following disclaimer.
 12  *
 13  * - Neither the name of the University nor the names of its
 14  *   contributors may be used to endorse or promote products derived
 15  *   from this software without specific prior written permission.
 16  *
 17  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
 18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 19  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 20  * PARTICULAR  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
 21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 22  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 23  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 24  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 25  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 27  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 28  *
 29  */
 30 
 31 #include <libmisc/misc.h>
 32 #include <link/link.h>
 33 
 34 static char *ssync_versions[16] = {
 35   "",
 36   "FLOOD_V1",
 37   "2",
 38   "3",
 39   "RETRANS_V2",
 40   "5",
 41   "6",
 42   "7",
 43   "8",
 44   "9",
 45   "10",
 46   "11",
 47   "12",
 48   "13",
 49   "14",
 50   "15",
 51 };
 52 
 53 static char *ssync_opcodes[8] = {
 54   "DATA",
 55   "1",
 56   "REQ",
 57   "3",
 58   "4",
 59   "5",
 60   "6",
 61   "7"
 62 };
 63 
 64 
 65 char *dump_ssync_log_cmd(uint8_t cmd)
 66 {
 67   static char buf[32];
 68 
 69   switch (cmd) {
 70 
 71   case SSYNC_LOG_INIT: return "INIT";
 72   case SSYNC_LOG_ADD: return "ADD";
 73   case SSYNC_LOG_DEL: return "DEL"; 
 74   case SSYNC_LOG_FRAG: return "FRAG";
 75   case SSYNC_LOG_CONT: return "CONT";
 76   case SSYNC_LOG_LAST: return "LAST";
 77   case SSYNC_LOG_CHK: return "CHK";
 78   case SSYNC_LOG_TERM: return "TERM";
 79 
 80   case RETX2_CTRL_SEQNO: return "CTRL_SEQNO";
 81   case RETX2_CTRL_LIST: return "CTRL_LIST";
 82   case RETX2_CTRL_NACK: return "CTRL_NACK";
 83   case RETX2_CTRL_ADDR: return "CTRL_ADDR";
 84   case RETX2_CTRL_FLOW: return "CTRL_FLOW";
 85   case RETX2_CTRL_NOP: return "CTRL_NOP";
 86 
 87   default: 
 88     sprintf(buf, "??0x%x??", cmd);
 89     return buf;
 90   }
 91 }
 92 
 93 
 94 char *ssync_flowid_to_str(flow_id_t *fid)
 95 {
 96   DECLARE_STATIC_BUF_RING(buf, 10, 256);
 97   sprintf(buf, "%s/%d->",
 98           print_if_id(fid->src), fid->src_if); 
 99   sprintf(buf+strlen(buf), "%s/%d[%d/%d]",
100           print_if_id(fid->dst), fid->dst_if,
101           fid->flow_index, fid->max_hops);
102   return buf;
103 }
104 
105 
106 void dump_ssync_flowid(buf_t *buf, flow_id_t *fid)
107 {
108   bufprintf(buf, "%s", ssync_flowid_to_str(fid));
109 }
110 
111 
112 int dump_ssync(buf_t *buf, link_pkt_t *pkt, char *data_start, 
113                int data_len, char *prefix, flow_id_t *fid)
114 {
115   ssync_hdr_t *hdr = (ssync_hdr_t *)data_start;
116   flow_id_t local_fid;
117   int reverse = 0;
118   int eat = 0;
119   char prefix2[32];
120 
121   sprintf(prefix2, "%s  ", prefix);
122 
123   if (data_len < sizeof(ssync_hdr_t)) goto short_pkt;
124   
125   bufprintf(buf, "%s%s(%s) %s\n",
126             prefix, ssync_versions[hdr->version],
127             ssync_opcodes[hdr->opcode],
128             hdr->unicast ? ", Unicast" : "");
129   eat += sizeof(ssync_hdr_t);
130 
131   /* handle V2 */
132   switch (hdr->version) {
133   case SSYNC_RETRANS_V2: {
134     ssync_msg_iter_t *iter = 
135       ssync_msg_iter_new((char *)hdr->data, data_len - sizeof(ssync_hdr_t), 0);
136     
137     for (ssync_msg_iter_top(iter); ssync_msg_iter_valid(iter); 
138          ssync_msg_iter_next(iter)) {
139       int print_data = 1;
140       int has_type = 0;
141       
142       char target_if[32];
143       target_if[0]=0;
144       if (iter->if_addr) sprintf(target_if, "[target_if=%s]", 
145                                  print_if_id(iter->if_addr));
146       
147       switch (iter->command) {
148       case SSYNC_LOG_CHK: 
149       case SSYNC_LOG_TERM:
150       case RETX2_CTRL_SEQNO: 
151       case RETX2_CTRL_LIST: 
152       case RETX2_CTRL_ADDR: 
153       case RETX2_CTRL_FLOW: 
154         print_data = 0;
155         /* fall through */
156         
157       case SSYNC_LOG_ADD: 
158       case SSYNC_LOG_DEL: 
159       case SSYNC_LOG_FRAG:
160         has_type = (iter->length > 0);
161         /* fall through */
162         
163       case RETX2_CTRL_NOP: 
164       case SSYNC_LOG_INIT: 
165       case SSYNC_LOG_CONT:
166       case SSYNC_LOG_LAST:
167       default:
168         bufprintf(buf, "%s%s(%d) target %s%d%s, flow %d, seqno %x:%u", 
169                   prefix, dump_ssync_log_cmd(iter->command), iter->length,
170                   iter->target.ci.head ? "*" : "", iter->target.ci.index, 
171                   target_if, iter->flow_index, iter->list_index, 
172                   iter->seqno);
173 
174         if (print_data) {         
175           if (has_type) 
176             bufprintf(buf, ", type_index %d", iter->msg[0]);
177           bufprintf(buf, ", data: \n");
178           if (has_type) 
179             misc_hexdump_to_buf(buf, iter->msg+1, iter->length-1, prefix2);
180           else
181             misc_hexdump_to_buf(buf, iter->msg, iter->length, prefix2);
182         }
183         
184         else {
185           if (iter->command == RETX2_CTRL_ADDR) 
186             bufprintf(buf, ", Map to if_addr %s", print_if_id(iter->if_addr));
187           else if (iter->command == RETX2_CTRL_FLOW) 
188             bufprintf(buf, ", Map to flow %s [%d hops]", 
189                       ssync_flowid_to_str(&(iter->curr_flow.id)),
190                       iter->curr_flow.hops);
191           bufprintf(buf, "\n");
192         }
193         
194         break;
195         
196       case RETX2_CTRL_NACK:
197         bufprintf(buf, "%s%s(%d) target %s%d%s, flow %d",
198                   prefix, dump_ssync_log_cmd(iter->command), iter->length,
199                   iter->target.ci.head ? "*" : "", iter->target.ci.index, 
200                   target_if, iter->flow_index);
201         switch (iter->nack_init) {
202         case RETX2_NACK_INIT:
203           bufprintf(buf, " **INIT**\n"); break;
204         case RETX2_NACK_INIT_CHK:
205           bufprintf(buf, " **INIT CHECK**\n"); break;
206         default:
207           bufprintf(buf, ", seqno %x:%u, count=%d\n", 
208                     iter->nack_list_index, 
209                     iter->nack_seqno, iter->nack_count);
210         }
211         break;
212 
213       }
214     }
215     
216     eat = data_len - iter->garbage;
217     if (iter->garbage > 0)
218       bufprintf(buf, "%s***extra junk!\n", prefix);
219     ssync_msg_iter_destroy(iter);
220     break;
221   }
222   
223   default:
224     /* create flow id */
225     if (fid) 
226       local_fid = *fid;
227     else {
228       local_fid.src_if = pkt->link_index;
229       local_fid.dst_if = pkt->link_index;
230       local_fid.flow_index = 0;
231       local_fid.src = reverse ? pkt->dst.id : pkt->src.id;
232       local_fid.dst = reverse ? pkt->src.id : pkt->dst.id;
233       if (!hdr->unicast) local_fid.dst = LINK_BROADCAST;
234     }
235     
236     bufprintf(buf, "%s", prefix);
237     dump_ssync_flowid(buf, &local_fid);
238     break;
239   }
240   
241   return eat;
242 
243  short_pkt:
244   bufprintf(buf, "%s***short packet!\n", prefix);
245   return 0;
246 }
247 
248 
249 /*
250  *  retrans2 parsing code
251  */
252 
253 ssync_msg_iter_t *ssync_msg_iter_new(char *data, int data_len, int MTU)
254 {
255   ssync_msg_iter_t *iter = g_new0(ssync_msg_iter_t, 1);
256   iter->orig = data;
257   iter->orig_len = data_len;
258   iter->MTU = MTU;
259   return iter;
260 }
261 
262 void ssync_msg_iter_destroy(ssync_msg_iter_t *iter)
263 {
264   if (iter->grow_pkt)
265     buf_free(iter->grow_pkt);
266   free(iter);
267 }
268 
269 
270 void ssync_msg_iter_top(ssync_msg_iter_t *iter)
271 {
272   char *orig = iter->orig;
273   int orig_len = iter->orig_len;
274   memset(iter, 0, sizeof(*iter));
275   iter->data = iter->orig = orig;
276   iter->remain = iter->orig_len = orig_len;
277   ssync_msg_iter_next(iter);
278 }
279 
280 
281 int ssync_msg_iter_valid(ssync_msg_iter_t *iter)
282 {
283   return iter && iter->valid;
284 }
285 
286 
287 static
288 int ssync_parse_byte(ssync_msg_iter_t *iter, char *byte, char *reason)
289 {
290   if (iter->remain < 1) {
291     elog(LOG_WARNING, "bad format: %s", reason);
292     return -1;
293   }
294   *byte = iter->data[0];      
295   iter->data++;
296   iter->remain--;
297   return 0;
298 }
299 
300 
301 void ssync_msg_iter_next(ssync_msg_iter_t *iter)
302 {
303   /* maybe inc and reset bits */
304   if (iter->inc_next) iter->seqno++;
305   iter->inc_next = 0;
306   iter->valid = 0;
307 
308   uint8_t length = 0;
309 
310   /* more data to parse? */
311   if (iter->data && iter->remain > 0) {
312 
313     ssync_retx2_ctrl_t ctrl;
314 
315     /* are we in a nack? */
316     if (iter->nack_cont)
317       goto do_nack;
318 
319     /* first byte is the control byte */
320     if (ssync_parse_byte(iter, (char*)&ctrl, "???") < 0) goto fail;
321 
322     if (ctrl.code_book) {
323       elog(LOG_WARNING, "codebook not supported");
324       goto fail;
325     }
326 
327     /* parse the new flow index/targetd */
328     switch (ctrl.format) {
329 
330     case RETX2_FMT_NEW_TARGET:
331       if (ssync_parse_byte(iter, (char *)&(iter->target.byte), 
332                            "no new target") < 0) goto fail;
333       /* fall through */
334       
335     case RETX2_FMT_NEW_FID:
336       if (ssync_parse_byte(iter, (char *)&(iter->flow_index), 
337                            "no new flow index") < 0) goto fail;
338       break;
339 
340     case RETX2_FMT_SAME:
341     case RETX2_FMT_ESC:
342     default:
343       break;
344     }
345 
346     /* set the address from the map */
347     iter->if_addr = iter->addr_map[iter->target.byte];
348     
349     /* set the flow from the map */
350     memmove(&(iter->curr_flow), &(iter->flow_map[iter->flow_index]),
351             sizeof(flow_entry_t));
352     
353     /* pull out command */
354     iter->command = ctrl.command;
355 
356     /* set inc_next */
357     switch (iter->command) {
358     case SSYNC_LOG_INIT:
359     case SSYNC_LOG_ADD:
360     case SSYNC_LOG_DEL:
361     case SSYNC_LOG_FRAG:
362     case SSYNC_LOG_CONT:
363     case SSYNC_LOG_LAST:
364     case SSYNC_LOG_CHK:
365     case SSYNC_LOG_TERM:
366       iter->inc_next = 1;
367       break;
368       
369     default:
370       break;
371     }
372         
373     /* if length is set, parse it */
374     if (ctrl.length_set) {
375 
376       switch (iter->command) {
377       case SSYNC_LOG_CHK:
378       case SSYNC_LOG_TERM:
379       case RETX2_CTRL_NACK:
380       case RETX2_CTRL_SEQNO:
381       case RETX2_CTRL_LIST:
382       case RETX2_CTRL_ADDR:
383       case RETX2_CTRL_FLOW:
384         elog(LOG_WARNING, "bad format: command %d should not have length",
385              iter->command);
386         goto fail;
387 
388       default:
389         break;
390       }
391 
392       if (ssync_parse_byte(iter, (char *)&(iter->last_set_length), 
393                            "no length byte") < 0) goto fail;
394     }
395     
396     /* map the default lengths */
397     iter->length = 0;
398     switch (iter->command) {
399     case SSYNC_LOG_INIT:
400     case SSYNC_LOG_ADD:
401     case SSYNC_LOG_DEL:
402     case SSYNC_LOG_FRAG:
403     case SSYNC_LOG_CONT:
404     case SSYNC_LOG_LAST:
405     case RETX2_CTRL_NOP:
406       iter->length = iter->last_set_length;
407       length = iter->length;
408       break;
409       
410     case RETX2_CTRL_SEQNO:
411       length = sizeof(log_seqno_t);
412       break;
413 
414     case RETX2_CTRL_LIST:
415       length = sizeof(ssync_seqno_entry_t);
416       break;
417 
418     case RETX2_CTRL_ADDR:
419       length = sizeof(if_id_t);
420       break;
421 
422     case RETX2_CTRL_FLOW:
423       length = sizeof(flow_entry_t);
424       break;
425 
426     default:
427       break;
428     }
429     
430     /* test for overrun */
431     if (iter->remain < length) {
432       elog(LOG_WARNING, "bad format: short message; ctrl=%x, length=%d, remain=%d",
433            *(char*)&ctrl, iter->length, iter->remain);
434       goto fail;          
435     }
436 
437     /* report command, data, length */
438     iter->msg = NULL;
439     switch (iter->command) {
440     default:
441       /* OK, report this one.. */
442       iter->msg = iter->data;
443       break;
444       
445     case RETX2_CTRL_SEQNO:
446       memmove(&(iter->seqno), iter->data, sizeof(iter->seqno));
447       break;
448       
449     case RETX2_CTRL_LIST: {
450       ssync_seqno_entry_t entry;
451       memmove(&entry, iter->data, sizeof(entry));
452       iter->list_index = entry.list_index;
453       iter->seqno = entry.seqno;
454       break;
455     }
456 
457     case RETX2_CTRL_ADDR:
458       memmove(&(iter->addr_map[iter->target.byte]), iter->data, sizeof(if_id_t));
459       iter->if_addr = iter->addr_map[iter->target.byte];
460       break;
461 
462     case RETX2_CTRL_FLOW:
463       memmove(&(iter->flow_map[iter->flow_index]), iter->data, sizeof(flow_entry_t));
464       iter->curr_flow = iter->flow_map[iter->flow_index];
465       break;
466 
467     case RETX2_CTRL_NACK:
468       goto do_nack;
469     }
470     
471     /* consume data from buffer and report */
472     iter->remain -= length;
473     iter->data += length;
474     iter->valid = 1;
475     return;
476 
477   do_nack:
478     /* parse the nack command byte */
479     if (ssync_parse_byte(iter, (char*)&(iter->nack_cmd), 
480                          "no nack control byte") < 0) goto fail;
481 
482     /* is this nack continued? */
483     iter->nack_cont = (!iter->nack_cmd.last_nack);
484 
485     /* clear the init mode */
486     iter->nack_init = 0;
487     
488     /* parse list index? */
489     if (iter->nack_cmd.mode == RETX2_NACK_LIST_SET) {
490       if (ssync_parse_byte(iter, (char*)&(iter->nack_list_index), 
491                            "no nack list index") < 0) goto fail;
492     }
493     
494     /* record init bits */
495     else {
496       iter->nack_init = iter->nack_cmd.mode;
497       elog(LOG_DEBUG(0), "got init nack, type %d,%d", iter->nack_init,
498            iter->nack_cmd.mode);
499     }
500     
501     /* parse length */
502     if (iter->nack_cmd.seq_count == 0) {    
503       if (iter->remain < sizeof(int16_t)) {
504         elog(LOG_WARNING, "bad format: no nack count");
505         goto fail;
506       }
507       memmove(&(iter->nack_count), iter->data, sizeof(int16_t));
508       iter->data += sizeof(int16_t);
509       iter->remain -= sizeof(int16_t);
510     }
511     else
512       iter->nack_count = iter->nack_cmd.seq_count;
513     
514     /* parse seqno */
515     if (iter->remain < sizeof(log_seqno_t)) {
516       elog(LOG_WARNING, "bad format: no nack seqno");
517       goto fail;
518     }
519     memmove(&(iter->nack_seqno), iter->data, sizeof(log_seqno_t));
520     iter->data += sizeof(log_seqno_t);
521     iter->remain -= sizeof(log_seqno_t);
522     
523     /* done.. */
524     iter->valid = 1;
525     return;    
526   }
527 
528  fail:
529   iter->garbage = iter->remain;
530   iter->remain = 0;
531   iter->data = NULL;
532   return;
533 }
534 
535 
536 static
537 void nack_term(ssync_msg_iter_t *iter) 
538 {
539   if (iter->nack_cont && iter->last_nack_ctrl) {
540     ((ssync_retx2_nack_ctrl_t *)(iter->last_nack_ctrl))->last_nack = 1;
541   }
542   iter->nack_cont = 0;
543   iter->last_nack_ctrl = NULL;
544 }
545 
546 
547 buf_t *ssync_msg_iter_finalize_msg(ssync_msg_iter_t *iter)
548 {
549   buf_t *retval = iter->grow_pkt;
550   nack_term(iter);
551   memset(iter, 0, sizeof(*iter));
552   return retval;
553 }
554 
555 
556 void ssync_msg_iter_init_msg(ssync_msg_iter_t *iter, int opcode, buf_t *header)
557 {
558   if (iter->grow_pkt) buf_free(iter->grow_pkt);
559   if (header)
560     iter->grow_pkt = header;
561   else
562     iter->grow_pkt = buf_new();
563 
564   /* this amount is deducted before text vs. MTU */
565   iter->header_orig_len = iter->grow_pkt->len;
566 
567   ssync_hdr_t hdr = {
568     version: SSYNC_RETRANS_V2,
569     opcode: opcode
570   };
571   bufcpy(iter->grow_pkt, &hdr, sizeof(hdr));
572 }
573 
574 
575 int ssync_msg_iter_get_len(ssync_msg_iter_t *iter)
576 {
577   if (iter && iter->grow_pkt) 
578     return iter->grow_pkt->len - iter->header_orig_len;
579   return 0;
580 }
581 
582 
583 /* length < 0 ==> not set or use last */
584 int ssync_msg_iter_append_msg(ssync_msg_iter_t *iter,
585                               uint8_t command, uint8_t flow_index, cl_index_t target,
586                               uint8_t list_index, log_seqno_t seqno,
587                               int nack_init, uint16_t nack_count,
588                               char *msg, uint8_t length)
589 {
590   int space = 0;
591   ssync_retx2_nack_ctrl_t nack_ctrl = {};
592   ssync_retx2_ctrl_t ctrl = {
593     command: command
594   };
595 
596   /* create packet */
597   if (iter->grow_pkt == NULL) {
598     elog(LOG_WARNING, "forgot to init packet??");
599     return -1;
600   }
601 
602   int start_len = iter->grow_pkt->len;
603 
604   /* compute length */
605   switch (command) {
606   case SSYNC_LOG_INIT:
607   case SSYNC_LOG_ADD:
608   case SSYNC_LOG_DEL:
609   case SSYNC_LOG_FRAG:
610   case SSYNC_LOG_CONT:
611   case SSYNC_LOG_LAST:
612   case RETX2_CTRL_NOP: 
613     /* set length if different than last set */
614     if (iter->last_set_length != length) {
615       ctrl.length_set = 1;
616       iter->last_set_length = length;
617     }
618     break;
619     
620   case RETX2_CTRL_SEQNO:
621   case RETX2_CTRL_LIST:
622     /* no need to push seqnos.. */
623     elog(LOG_WARNING, "not necessary to push seqnos");
624     return 0;
625 
626   case RETX2_CTRL_ADDR:
627   case RETX2_CTRL_FLOW:
628     /* leave length as is */
629     break;
630 
631   case RETX2_CTRL_NACK:
632   case SSYNC_LOG_CHK:
633   case SSYNC_LOG_TERM:
634   default:
635     length = 0;
636     break;
637   }
638 
639   iter->length = length;
640   space = 1;
641 
642   /* drop out of NACK mode if we need to */
643   if (command != RETX2_CTRL_NACK) nack_term(iter);
644   
645   /* set format parameter */
646   if (target.byte != iter->target.byte) {
647     nack_term(iter);
648     ctrl.format = RETX2_FMT_NEW_TARGET;
649     iter->target = target;
650     iter->flow_index = flow_index;
651     space += 2;
652   }
653 
654   else if (iter->flow_index != flow_index) {
655     nack_term(iter);
656     ctrl.format = RETX2_FMT_NEW_FID;
657     iter->flow_index = flow_index;    
658     space++;
659   }
660 
661   /* external length byte */
662   if (ctrl.length_set) space++;
663   
664   /* figure out nack continuation */
665   if (command == RETX2_CTRL_NACK) {
666     /* if continued, no second control byte */
667     if (iter->nack_cont) space = 0;
668   }
669 
670   /* if NACK, add nack control byte */
671   if (command == RETX2_CTRL_NACK) {
672     space += 3;
673     
674     if (nack_count <= 0x1F) nack_ctrl.seq_count = nack_count;
675     else space += 2;
676     
677     if (nack_init) {
678       nack_ctrl.mode = nack_init;
679       nack_ctrl.seq_count = 1;
680     }
681 
682     else if (list_index != iter->nack_list_index) {
683       iter->nack_list_index = list_index;
684       nack_ctrl.mode = RETX2_NACK_LIST_SET;
685       space++;
686     }
687     
688   }
689 
690   else {
691     space += iter->length;
692     if (seqno != iter->seqno) space += 3;
693     if (list_index != iter->list_index) space++;
694   }
695   
696   /* no space? */
697   if ((space + iter->grow_pkt->len) > (iter->MTU + iter->header_orig_len)) 
698     return -1;
699 
700   /* 
701    * ok, push the new data 
702    */
703 
704   /* if not a continued NACK */
705   if (!iter->nack_cont) {
706 
707     /* seqno/listindex for NACKs are handled separately */
708     if (command != RETX2_CTRL_NACK) {
709       /* push new list index & seqno? */
710       if (list_index != iter->list_index) {
711         /* list control byte */
712         ssync_retx2_ctrl_t ctrl = {
713           command: RETX2_CTRL_LIST
714         };
715         bufcpy(iter->grow_pkt, &ctrl, sizeof(ctrl));
716         bufcpy(iter->grow_pkt, &list_index, sizeof(list_index));
717         bufcpy(iter->grow_pkt, &seqno, sizeof(seqno));
718         iter->seqno = seqno;
719         iter->list_index = list_index;
720       }
721       
722       /* push new seqno? */
723       if (seqno != iter->seqno) {
724         /* seqno control byte */
725         ssync_retx2_ctrl_t ctrl = {
726           command: RETX2_CTRL_SEQNO
727         };
728         bufcpy(iter->grow_pkt, &ctrl, sizeof(ctrl));
729         bufcpy(iter->grow_pkt, &seqno, sizeof(seqno));
730         iter->seqno = seqno;
731       }
732     }
733     
734     /* control byte */
735     elog(LOG_DEBUG(10), "control byte: cmd %d len %d format %d",
736          ctrl.command, ctrl.length_set, ctrl.format);
737     bufcpy(iter->grow_pkt, &ctrl, sizeof(ctrl));
738     
739     /* target, flow index */
740     switch (ctrl.format) {
741     case RETX2_FMT_NEW_TARGET:
742       bufcpy(iter->grow_pkt, &(iter->target), sizeof(iter->target));    
743       /* fall through */
744       
745     case RETX2_FMT_NEW_FID:
746       bufcpy(iter->grow_pkt, &(iter->flow_index), sizeof(iter->flow_index));
747       break;
748       
749     default:
750       break;
751     }
752     
753     if (ctrl.length_set) {
754       bufcpy(iter->grow_pkt, &(iter->length), sizeof(iter->length));
755     }    
756   }
757 
758   switch (command) {
759   case RETX2_CTRL_NACK:
760 
761     elog(LOG_DEBUG(10), "NACK control byte: count %d last %d mode %d",
762          nack_ctrl.seq_count, nack_ctrl.last_nack, nack_ctrl.mode);
763     
764     bufcpy(iter->grow_pkt, &nack_ctrl, sizeof(nack_ctrl));
765     space = 1;
766     
767     /* push the list index */
768     if (nack_ctrl.mode == RETX2_NACK_LIST_SET) {
769       space += sizeof(iter->nack_list_index);
770       bufcpy(iter->grow_pkt, &(iter->nack_list_index), 
771              sizeof(iter->nack_list_index));
772     }
773 
774     /* push the length */
775     if (nack_ctrl.seq_count == 0) {
776       space += sizeof(nack_count);
777       bufcpy(iter->grow_pkt, &nack_count,
778              sizeof(nack_count));
779     }
780 
781     /* push the sequence number */
782     space += sizeof(seqno);
783     bufcpy(iter->grow_pkt, &(seqno), sizeof(seqno));
784     
785     /* if nack, set the continuation bit and ctrl pointer */
786     iter->nack_cont = 1;
787     iter->last_nack_ctrl = iter->grow_pkt->buf + iter->grow_pkt->len - space;      
788 
789     elog(LOG_DEBUG(10), "NACK control byte: %x, last %x",
790          *(uint8_t*)&nack_ctrl, *iter->last_nack_ctrl);
791 
792     break;
793 
794   default:
795     if (iter->length > 0) 
796       bufcpy(iter->grow_pkt, msg, iter->length);
797     break;
798   }
799 
800   /* increment the seqno */
801   switch (command) {
802   case SSYNC_LOG_INIT:
803   case SSYNC_LOG_ADD:
804   case SSYNC_LOG_DEL:
805   case SSYNC_LOG_FRAG:
806   case SSYNC_LOG_CONT:
807   case SSYNC_LOG_LAST:
808   case SSYNC_LOG_CHK:
809   case SSYNC_LOG_TERM:
810     iter->seqno++;
811     break;
812 
813   default:
814     break;
815   }
816 
817   /* return appended byte count */
818   return (iter->grow_pkt->len - start_len);
819 }
820 
821 
822 int ssync_msg_iter_maybe_addr_map(ssync_msg_iter_t *iter, 
823                                   cl_index_t *target, if_id_t if_id)
824 {
825   /* need to assign target? */
826   if (target->byte == 0) {
827 
828     int i;
829     int first_slot = 0;
830 
831     /* search and find first open slot */
832     for (i=1; i<256; i++) {
833       if (iter->addr_map[i] == if_id) {
834         target->byte = i;
835         elog(LOG_DEBUG(0), "resolved if_id %d to target %d", 
836              if_id, target->byte);
837         return 0;
838       }
839       if (first_slot == 0 && iter->addr_map[i] == 0) 
840         first_slot = i;
841     }
842 
843     /* no room? */
844     if (first_slot == 0) {
845       elog(LOG_WARNING, "out of address map slots?");
846       return -1;
847     }
848     
849     target->byte = first_slot;
850   }
851 
852   if (iter->addr_map[target->byte] == if_id) return 0;
853   iter->addr_map[target->byte] = if_id;
854   elog(LOG_DEBUG(0), "mapped if_id %d to target %d", 
855        if_id, target->byte);
856   return ssync_msg_iter_append_msg(iter, RETX2_CTRL_ADDR,
857                                    iter->flow_index, *target, iter->list_index, iter->seqno, 
858                                    0, 0, (char*)(&if_id), sizeof(if_id));
859 }
860 
861 
862 int ssync_msg_iter_maybe_flow_map(ssync_msg_iter_t *iter, 
863                                   uint8_t *flow_index, flow_id_t *flow, int hops)
864 {
865   /* need to assign target? */
866   if (*flow_index == 0) {
867     
868     int i;
869     int first_slot = 0;
870 
871     /* search and find first open slot */
872     for (i=1; i<256; i++) {
873       if (memcmp(&(iter->flow_map[i].id), flow, sizeof(*flow)) == 0) {
874         *flow_index = i;
875         elog(LOG_DEBUG(0), "resolved flow_id %s to target %d", 
876              ssync_flowid_to_str(flow), *flow_index);
877         return 0;
878       }
879       if (first_slot == 0 && iter->flow_map[i].id.src == 0) 
880         first_slot = i;
881     }
882 
883     /* no room? */
884     if (first_slot == 0) {
885       elog(LOG_WARNING, "out of address map slots?");
886       return -1;
887     }
888     
889     *flow_index = first_slot;
890   }
891 
892   /* if it's already set OK, done */
893   if ((memcmp(&(iter->flow_map[*flow_index].id), flow, sizeof(*flow)) == 0) &&
894       iter->flow_map[*flow_index].hops == hops)
895     return 0;
896   
897   /* otherwise set it now */
898   memmove(&(iter->flow_map[*flow_index].id), flow, sizeof(*flow));
899   iter->flow_map[*flow_index].hops = hops;
900   elog(LOG_DEBUG(0), "mapped flow_id %s to target %d", 
901        ssync_flowid_to_str(flow), *flow_index);
902   return ssync_msg_iter_append_msg(iter, RETX2_CTRL_FLOW,
903                                    *flow_index, iter->target, iter->list_index, iter->seqno, 
904                                    0, 0, (char*)(&(iter->flow_map[*flow_index])),
905                                    sizeof(flow_entry_t));
906 }
907 
908 
909 
910 /*
911  *  possible further optimizations
912  *   INIT drop length byte
913  *   nack INIT no seqno, add flow id?
914  */
915 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.