1 /*
2 *
3 * Copyright (c) 2004 The Regents of the University of California. All
4 * rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * - Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * - Neither the name of the University nor the names of its
14 * contributors may be used to endorse or promote products derived
15 * from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
19 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
20 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
24 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
25 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 */
30
31 #include <libmisc/misc.h>
32 #include <link/link.h>
33
34 static char *ssync_versions[16] = {
35 "",
36 "FLOOD_V1",
37 "2",
38 "3",
39 "RETRANS_V2",
40 "5",
41 "6",
42 "7",
43 "8",
44 "9",
45 "10",
46 "11",
47 "12",
48 "13",
49 "14",
50 "15",
51 };
52
53 static char *ssync_opcodes[8] = {
54 "DATA",
55 "1",
56 "REQ",
57 "3",
58 "4",
59 "5",
60 "6",
61 "7"
62 };
63
64
65 char *dump_ssync_log_cmd(uint8_t cmd)
66 {
67 static char buf[32];
68
69 switch (cmd) {
70
71 case SSYNC_LOG_INIT: return "INIT";
72 case SSYNC_LOG_ADD: return "ADD";
73 case SSYNC_LOG_DEL: return "DEL";
74 case SSYNC_LOG_FRAG: return "FRAG";
75 case SSYNC_LOG_CONT: return "CONT";
76 case SSYNC_LOG_LAST: return "LAST";
77 case SSYNC_LOG_CHK: return "CHK";
78 case SSYNC_LOG_TERM: return "TERM";
79
80 case RETX2_CTRL_SEQNO: return "CTRL_SEQNO";
81 case RETX2_CTRL_LIST: return "CTRL_LIST";
82 case RETX2_CTRL_NACK: return "CTRL_NACK";
83 case RETX2_CTRL_ADDR: return "CTRL_ADDR";
84 case RETX2_CTRL_FLOW: return "CTRL_FLOW";
85 case RETX2_CTRL_NOP: return "CTRL_NOP";
86
87 default:
88 sprintf(buf, "??0x%x??", cmd);
89 return buf;
90 }
91 }
92
93
94 char *ssync_flowid_to_str(flow_id_t *fid)
95 {
96 DECLARE_STATIC_BUF_RING(buf, 10, 256);
97 sprintf(buf, "%s/%d->",
98 print_if_id(fid->src), fid->src_if);
99 sprintf(buf+strlen(buf), "%s/%d[%d/%d]",
100 print_if_id(fid->dst), fid->dst_if,
101 fid->flow_index, fid->max_hops);
102 return buf;
103 }
104
105
106 void dump_ssync_flowid(buf_t *buf, flow_id_t *fid)
107 {
108 bufprintf(buf, "%s", ssync_flowid_to_str(fid));
109 }
110
111
112 int dump_ssync(buf_t *buf, link_pkt_t *pkt, char *data_start,
113 int data_len, char *prefix, flow_id_t *fid)
114 {
115 ssync_hdr_t *hdr = (ssync_hdr_t *)data_start;
116 flow_id_t local_fid;
117 int reverse = 0;
118 int eat = 0;
119 char prefix2[32];
120
121 sprintf(prefix2, "%s ", prefix);
122
123 if (data_len < sizeof(ssync_hdr_t)) goto short_pkt;
124
125 bufprintf(buf, "%s%s(%s) %s\n",
126 prefix, ssync_versions[hdr->version],
127 ssync_opcodes[hdr->opcode],
128 hdr->unicast ? ", Unicast" : "");
129 eat += sizeof(ssync_hdr_t);
130
131 /* handle V2 */
132 switch (hdr->version) {
133 case SSYNC_RETRANS_V2: {
134 ssync_msg_iter_t *iter =
135 ssync_msg_iter_new((char *)hdr->data, data_len - sizeof(ssync_hdr_t), 0);
136
137 for (ssync_msg_iter_top(iter); ssync_msg_iter_valid(iter);
138 ssync_msg_iter_next(iter)) {
139 int print_data = 1;
140 int has_type = 0;
141
142 char target_if[32];
143 target_if[0]=0;
144 if (iter->if_addr) sprintf(target_if, "[target_if=%s]",
145 print_if_id(iter->if_addr));
146
147 switch (iter->command) {
148 case SSYNC_LOG_CHK:
149 case SSYNC_LOG_TERM:
150 case RETX2_CTRL_SEQNO:
151 case RETX2_CTRL_LIST:
152 case RETX2_CTRL_ADDR:
153 case RETX2_CTRL_FLOW:
154 print_data = 0;
155 /* fall through */
156
157 case SSYNC_LOG_ADD:
158 case SSYNC_LOG_DEL:
159 case SSYNC_LOG_FRAG:
160 has_type = (iter->length > 0);
161 /* fall through */
162
163 case RETX2_CTRL_NOP:
164 case SSYNC_LOG_INIT:
165 case SSYNC_LOG_CONT:
166 case SSYNC_LOG_LAST:
167 default:
168 bufprintf(buf, "%s%s(%d) target %s%d%s, flow %d, seqno %x:%u",
169 prefix, dump_ssync_log_cmd(iter->command), iter->length,
170 iter->target.ci.head ? "*" : "", iter->target.ci.index,
171 target_if, iter->flow_index, iter->list_index,
172 iter->seqno);
173
174 if (print_data) {
175 if (has_type)
176 bufprintf(buf, ", type_index %d", iter->msg[0]);
177 bufprintf(buf, ", data: \n");
178 if (has_type)
179 misc_hexdump_to_buf(buf, iter->msg+1, iter->length-1, prefix2);
180 else
181 misc_hexdump_to_buf(buf, iter->msg, iter->length, prefix2);
182 }
183
184 else {
185 if (iter->command == RETX2_CTRL_ADDR)
186 bufprintf(buf, ", Map to if_addr %s", print_if_id(iter->if_addr));
187 else if (iter->command == RETX2_CTRL_FLOW)
188 bufprintf(buf, ", Map to flow %s [%d hops]",
189 ssync_flowid_to_str(&(iter->curr_flow.id)),
190 iter->curr_flow.hops);
191 bufprintf(buf, "\n");
192 }
193
194 break;
195
196 case RETX2_CTRL_NACK:
197 bufprintf(buf, "%s%s(%d) target %s%d%s, flow %d",
198 prefix, dump_ssync_log_cmd(iter->command), iter->length,
199 iter->target.ci.head ? "*" : "", iter->target.ci.index,
200 target_if, iter->flow_index);
201 switch (iter->nack_init) {
202 case RETX2_NACK_INIT:
203 bufprintf(buf, " **INIT**\n"); break;
204 case RETX2_NACK_INIT_CHK:
205 bufprintf(buf, " **INIT CHECK**\n"); break;
206 default:
207 bufprintf(buf, ", seqno %x:%u, count=%d\n",
208 iter->nack_list_index,
209 iter->nack_seqno, iter->nack_count);
210 }
211 break;
212
213 }
214 }
215
216 eat = data_len - iter->garbage;
217 if (iter->garbage > 0)
218 bufprintf(buf, "%s***extra junk!\n", prefix);
219 ssync_msg_iter_destroy(iter);
220 break;
221 }
222
223 default:
224 /* create flow id */
225 if (fid)
226 local_fid = *fid;
227 else {
228 local_fid.src_if = pkt->link_index;
229 local_fid.dst_if = pkt->link_index;
230 local_fid.flow_index = 0;
231 local_fid.src = reverse ? pkt->dst.id : pkt->src.id;
232 local_fid.dst = reverse ? pkt->src.id : pkt->dst.id;
233 if (!hdr->unicast) local_fid.dst = LINK_BROADCAST;
234 }
235
236 bufprintf(buf, "%s", prefix);
237 dump_ssync_flowid(buf, &local_fid);
238 break;
239 }
240
241 return eat;
242
243 short_pkt:
244 bufprintf(buf, "%s***short packet!\n", prefix);
245 return 0;
246 }
247
248
249 /*
250 * retrans2 parsing code
251 */
252
253 ssync_msg_iter_t *ssync_msg_iter_new(char *data, int data_len, int MTU)
254 {
255 ssync_msg_iter_t *iter = g_new0(ssync_msg_iter_t, 1);
256 iter->orig = data;
257 iter->orig_len = data_len;
258 iter->MTU = MTU;
259 return iter;
260 }
261
262 void ssync_msg_iter_destroy(ssync_msg_iter_t *iter)
263 {
264 if (iter->grow_pkt)
265 buf_free(iter->grow_pkt);
266 free(iter);
267 }
268
269
270 void ssync_msg_iter_top(ssync_msg_iter_t *iter)
271 {
272 char *orig = iter->orig;
273 int orig_len = iter->orig_len;
274 memset(iter, 0, sizeof(*iter));
275 iter->data = iter->orig = orig;
276 iter->remain = iter->orig_len = orig_len;
277 ssync_msg_iter_next(iter);
278 }
279
280
281 int ssync_msg_iter_valid(ssync_msg_iter_t *iter)
282 {
283 return iter && iter->valid;
284 }
285
286
287 static
288 int ssync_parse_byte(ssync_msg_iter_t *iter, char *byte, char *reason)
289 {
290 if (iter->remain < 1) {
291 elog(LOG_WARNING, "bad format: %s", reason);
292 return -1;
293 }
294 *byte = iter->data[0];
295 iter->data++;
296 iter->remain--;
297 return 0;
298 }
299
300
301 void ssync_msg_iter_next(ssync_msg_iter_t *iter)
302 {
303 /* maybe inc and reset bits */
304 if (iter->inc_next) iter->seqno++;
305 iter->inc_next = 0;
306 iter->valid = 0;
307
308 uint8_t length = 0;
309
310 /* more data to parse? */
311 if (iter->data && iter->remain > 0) {
312
313 ssync_retx2_ctrl_t ctrl;
314
315 /* are we in a nack? */
316 if (iter->nack_cont)
317 goto do_nack;
318
319 /* first byte is the control byte */
320 if (ssync_parse_byte(iter, (char*)&ctrl, "???") < 0) goto fail;
321
322 if (ctrl.code_book) {
323 elog(LOG_WARNING, "codebook not supported");
324 goto fail;
325 }
326
327 /* parse the new flow index/targetd */
328 switch (ctrl.format) {
329
330 case RETX2_FMT_NEW_TARGET:
331 if (ssync_parse_byte(iter, (char *)&(iter->target.byte),
332 "no new target") < 0) goto fail;
333 /* fall through */
334
335 case RETX2_FMT_NEW_FID:
336 if (ssync_parse_byte(iter, (char *)&(iter->flow_index),
337 "no new flow index") < 0) goto fail;
338 break;
339
340 case RETX2_FMT_SAME:
341 case RETX2_FMT_ESC:
342 default:
343 break;
344 }
345
346 /* set the address from the map */
347 iter->if_addr = iter->addr_map[iter->target.byte];
348
349 /* set the flow from the map */
350 memmove(&(iter->curr_flow), &(iter->flow_map[iter->flow_index]),
351 sizeof(flow_entry_t));
352
353 /* pull out command */
354 iter->command = ctrl.command;
355
356 /* set inc_next */
357 switch (iter->command) {
358 case SSYNC_LOG_INIT:
359 case SSYNC_LOG_ADD:
360 case SSYNC_LOG_DEL:
361 case SSYNC_LOG_FRAG:
362 case SSYNC_LOG_CONT:
363 case SSYNC_LOG_LAST:
364 case SSYNC_LOG_CHK:
365 case SSYNC_LOG_TERM:
366 iter->inc_next = 1;
367 break;
368
369 default:
370 break;
371 }
372
373 /* if length is set, parse it */
374 if (ctrl.length_set) {
375
376 switch (iter->command) {
377 case SSYNC_LOG_CHK:
378 case SSYNC_LOG_TERM:
379 case RETX2_CTRL_NACK:
380 case RETX2_CTRL_SEQNO:
381 case RETX2_CTRL_LIST:
382 case RETX2_CTRL_ADDR:
383 case RETX2_CTRL_FLOW:
384 elog(LOG_WARNING, "bad format: command %d should not have length",
385 iter->command);
386 goto fail;
387
388 default:
389 break;
390 }
391
392 if (ssync_parse_byte(iter, (char *)&(iter->last_set_length),
393 "no length byte") < 0) goto fail;
394 }
395
396 /* map the default lengths */
397 iter->length = 0;
398 switch (iter->command) {
399 case SSYNC_LOG_INIT:
400 case SSYNC_LOG_ADD:
401 case SSYNC_LOG_DEL:
402 case SSYNC_LOG_FRAG:
403 case SSYNC_LOG_CONT:
404 case SSYNC_LOG_LAST:
405 case RETX2_CTRL_NOP:
406 iter->length = iter->last_set_length;
407 length = iter->length;
408 break;
409
410 case RETX2_CTRL_SEQNO:
411 length = sizeof(log_seqno_t);
412 break;
413
414 case RETX2_CTRL_LIST:
415 length = sizeof(ssync_seqno_entry_t);
416 break;
417
418 case RETX2_CTRL_ADDR:
419 length = sizeof(if_id_t);
420 break;
421
422 case RETX2_CTRL_FLOW:
423 length = sizeof(flow_entry_t);
424 break;
425
426 default:
427 break;
428 }
429
430 /* test for overrun */
431 if (iter->remain < length) {
432 elog(LOG_WARNING, "bad format: short message; ctrl=%x, length=%d, remain=%d",
433 *(char*)&ctrl, iter->length, iter->remain);
434 goto fail;
435 }
436
437 /* report command, data, length */
438 iter->msg = NULL;
439 switch (iter->command) {
440 default:
441 /* OK, report this one.. */
442 iter->msg = iter->data;
443 break;
444
445 case RETX2_CTRL_SEQNO:
446 memmove(&(iter->seqno), iter->data, sizeof(iter->seqno));
447 break;
448
449 case RETX2_CTRL_LIST: {
450 ssync_seqno_entry_t entry;
451 memmove(&entry, iter->data, sizeof(entry));
452 iter->list_index = entry.list_index;
453 iter->seqno = entry.seqno;
454 break;
455 }
456
457 case RETX2_CTRL_ADDR:
458 memmove(&(iter->addr_map[iter->target.byte]), iter->data, sizeof(if_id_t));
459 iter->if_addr = iter->addr_map[iter->target.byte];
460 break;
461
462 case RETX2_CTRL_FLOW:
463 memmove(&(iter->flow_map[iter->flow_index]), iter->data, sizeof(flow_entry_t));
464 iter->curr_flow = iter->flow_map[iter->flow_index];
465 break;
466
467 case RETX2_CTRL_NACK:
468 goto do_nack;
469 }
470
471 /* consume data from buffer and report */
472 iter->remain -= length;
473 iter->data += length;
474 iter->valid = 1;
475 return;
476
477 do_nack:
478 /* parse the nack command byte */
479 if (ssync_parse_byte(iter, (char*)&(iter->nack_cmd),
480 "no nack control byte") < 0) goto fail;
481
482 /* is this nack continued? */
483 iter->nack_cont = (!iter->nack_cmd.last_nack);
484
485 /* clear the init mode */
486 iter->nack_init = 0;
487
488 /* parse list index? */
489 if (iter->nack_cmd.mode == RETX2_NACK_LIST_SET) {
490 if (ssync_parse_byte(iter, (char*)&(iter->nack_list_index),
491 "no nack list index") < 0) goto fail;
492 }
493
494 /* record init bits */
495 else {
496 iter->nack_init = iter->nack_cmd.mode;
497 elog(LOG_DEBUG(0), "got init nack, type %d,%d", iter->nack_init,
498 iter->nack_cmd.mode);
499 }
500
501 /* parse length */
502 if (iter->nack_cmd.seq_count == 0) {
503 if (iter->remain < sizeof(int16_t)) {
504 elog(LOG_WARNING, "bad format: no nack count");
505 goto fail;
506 }
507 memmove(&(iter->nack_count), iter->data, sizeof(int16_t));
508 iter->data += sizeof(int16_t);
509 iter->remain -= sizeof(int16_t);
510 }
511 else
512 iter->nack_count = iter->nack_cmd.seq_count;
513
514 /* parse seqno */
515 if (iter->remain < sizeof(log_seqno_t)) {
516 elog(LOG_WARNING, "bad format: no nack seqno");
517 goto fail;
518 }
519 memmove(&(iter->nack_seqno), iter->data, sizeof(log_seqno_t));
520 iter->data += sizeof(log_seqno_t);
521 iter->remain -= sizeof(log_seqno_t);
522
523 /* done.. */
524 iter->valid = 1;
525 return;
526 }
527
528 fail:
529 iter->garbage = iter->remain;
530 iter->remain = 0;
531 iter->data = NULL;
532 return;
533 }
534
535
536 static
537 void nack_term(ssync_msg_iter_t *iter)
538 {
539 if (iter->nack_cont && iter->last_nack_ctrl) {
540 ((ssync_retx2_nack_ctrl_t *)(iter->last_nack_ctrl))->last_nack = 1;
541 }
542 iter->nack_cont = 0;
543 iter->last_nack_ctrl = NULL;
544 }
545
546
547 buf_t *ssync_msg_iter_finalize_msg(ssync_msg_iter_t *iter)
548 {
549 buf_t *retval = iter->grow_pkt;
550 nack_term(iter);
551 memset(iter, 0, sizeof(*iter));
552 return retval;
553 }
554
555
556 void ssync_msg_iter_init_msg(ssync_msg_iter_t *iter, int opcode, buf_t *header)
557 {
558 if (iter->grow_pkt) buf_free(iter->grow_pkt);
559 if (header)
560 iter->grow_pkt = header;
561 else
562 iter->grow_pkt = buf_new();
563
564 /* this amount is deducted before text vs. MTU */
565 iter->header_orig_len = iter->grow_pkt->len;
566
567 ssync_hdr_t hdr = {
568 version: SSYNC_RETRANS_V2,
569 opcode: opcode
570 };
571 bufcpy(iter->grow_pkt, &hdr, sizeof(hdr));
572 }
573
574
575 int ssync_msg_iter_get_len(ssync_msg_iter_t *iter)
576 {
577 if (iter && iter->grow_pkt)
578 return iter->grow_pkt->len - iter->header_orig_len;
579 return 0;
580 }
581
582
583 /* length < 0 ==> not set or use last */
584 int ssync_msg_iter_append_msg(ssync_msg_iter_t *iter,
585 uint8_t command, uint8_t flow_index, cl_index_t target,
586 uint8_t list_index, log_seqno_t seqno,
587 int nack_init, uint16_t nack_count,
588 char *msg, uint8_t length)
589 {
590 int space = 0;
591 ssync_retx2_nack_ctrl_t nack_ctrl = {};
592 ssync_retx2_ctrl_t ctrl = {
593 command: command
594 };
595
596 /* create packet */
597 if (iter->grow_pkt == NULL) {
598 elog(LOG_WARNING, "forgot to init packet??");
599 return -1;
600 }
601
602 int start_len = iter->grow_pkt->len;
603
604 /* compute length */
605 switch (command) {
606 case SSYNC_LOG_INIT:
607 case SSYNC_LOG_ADD:
608 case SSYNC_LOG_DEL:
609 case SSYNC_LOG_FRAG:
610 case SSYNC_LOG_CONT:
611 case SSYNC_LOG_LAST:
612 case RETX2_CTRL_NOP:
613 /* set length if different than last set */
614 if (iter->last_set_length != length) {
615 ctrl.length_set = 1;
616 iter->last_set_length = length;
617 }
618 break;
619
620 case RETX2_CTRL_SEQNO:
621 case RETX2_CTRL_LIST:
622 /* no need to push seqnos.. */
623 elog(LOG_WARNING, "not necessary to push seqnos");
624 return 0;
625
626 case RETX2_CTRL_ADDR:
627 case RETX2_CTRL_FLOW:
628 /* leave length as is */
629 break;
630
631 case RETX2_CTRL_NACK:
632 case SSYNC_LOG_CHK:
633 case SSYNC_LOG_TERM:
634 default:
635 length = 0;
636 break;
637 }
638
639 iter->length = length;
640 space = 1;
641
642 /* drop out of NACK mode if we need to */
643 if (command != RETX2_CTRL_NACK) nack_term(iter);
644
645 /* set format parameter */
646 if (target.byte != iter->target.byte) {
647 nack_term(iter);
648 ctrl.format = RETX2_FMT_NEW_TARGET;
649 iter->target = target;
650 iter->flow_index = flow_index;
651 space += 2;
652 }
653
654 else if (iter->flow_index != flow_index) {
655 nack_term(iter);
656 ctrl.format = RETX2_FMT_NEW_FID;
657 iter->flow_index = flow_index;
658 space++;
659 }
660
661 /* external length byte */
662 if (ctrl.length_set) space++;
663
664 /* figure out nack continuation */
665 if (command == RETX2_CTRL_NACK) {
666 /* if continued, no second control byte */
667 if (iter->nack_cont) space = 0;
668 }
669
670 /* if NACK, add nack control byte */
671 if (command == RETX2_CTRL_NACK) {
672 space += 3;
673
674 if (nack_count <= 0x1F) nack_ctrl.seq_count = nack_count;
675 else space += 2;
676
677 if (nack_init) {
678 nack_ctrl.mode = nack_init;
679 nack_ctrl.seq_count = 1;
680 }
681
682 else if (list_index != iter->nack_list_index) {
683 iter->nack_list_index = list_index;
684 nack_ctrl.mode = RETX2_NACK_LIST_SET;
685 space++;
686 }
687
688 }
689
690 else {
691 space += iter->length;
692 if (seqno != iter->seqno) space += 3;
693 if (list_index != iter->list_index) space++;
694 }
695
696 /* no space? */
697 if ((space + iter->grow_pkt->len) > (iter->MTU + iter->header_orig_len))
698 return -1;
699
700 /*
701 * ok, push the new data
702 */
703
704 /* if not a continued NACK */
705 if (!iter->nack_cont) {
706
707 /* seqno/listindex for NACKs are handled separately */
708 if (command != RETX2_CTRL_NACK) {
709 /* push new list index & seqno? */
710 if (list_index != iter->list_index) {
711 /* list control byte */
712 ssync_retx2_ctrl_t ctrl = {
713 command: RETX2_CTRL_LIST
714 };
715 bufcpy(iter->grow_pkt, &ctrl, sizeof(ctrl));
716 bufcpy(iter->grow_pkt, &list_index, sizeof(list_index));
717 bufcpy(iter->grow_pkt, &seqno, sizeof(seqno));
718 iter->seqno = seqno;
719 iter->list_index = list_index;
720 }
721
722 /* push new seqno? */
723 if (seqno != iter->seqno) {
724 /* seqno control byte */
725 ssync_retx2_ctrl_t ctrl = {
726 command: RETX2_CTRL_SEQNO
727 };
728 bufcpy(iter->grow_pkt, &ctrl, sizeof(ctrl));
729 bufcpy(iter->grow_pkt, &seqno, sizeof(seqno));
730 iter->seqno = seqno;
731 }
732 }
733
734 /* control byte */
735 elog(LOG_DEBUG(10), "control byte: cmd %d len %d format %d",
736 ctrl.command, ctrl.length_set, ctrl.format);
737 bufcpy(iter->grow_pkt, &ctrl, sizeof(ctrl));
738
739 /* target, flow index */
740 switch (ctrl.format) {
741 case RETX2_FMT_NEW_TARGET:
742 bufcpy(iter->grow_pkt, &(iter->target), sizeof(iter->target));
743 /* fall through */
744
745 case RETX2_FMT_NEW_FID:
746 bufcpy(iter->grow_pkt, &(iter->flow_index), sizeof(iter->flow_index));
747 break;
748
749 default:
750 break;
751 }
752
753 if (ctrl.length_set) {
754 bufcpy(iter->grow_pkt, &(iter->length), sizeof(iter->length));
755 }
756 }
757
758 switch (command) {
759 case RETX2_CTRL_NACK:
760
761 elog(LOG_DEBUG(10), "NACK control byte: count %d last %d mode %d",
762 nack_ctrl.seq_count, nack_ctrl.last_nack, nack_ctrl.mode);
763
764 bufcpy(iter->grow_pkt, &nack_ctrl, sizeof(nack_ctrl));
765 space = 1;
766
767 /* push the list index */
768 if (nack_ctrl.mode == RETX2_NACK_LIST_SET) {
769 space += sizeof(iter->nack_list_index);
770 bufcpy(iter->grow_pkt, &(iter->nack_list_index),
771 sizeof(iter->nack_list_index));
772 }
773
774 /* push the length */
775 if (nack_ctrl.seq_count == 0) {
776 space += sizeof(nack_count);
777 bufcpy(iter->grow_pkt, &nack_count,
778 sizeof(nack_count));
779 }
780
781 /* push the sequence number */
782 space += sizeof(seqno);
783 bufcpy(iter->grow_pkt, &(seqno), sizeof(seqno));
784
785 /* if nack, set the continuation bit and ctrl pointer */
786 iter->nack_cont = 1;
787 iter->last_nack_ctrl = iter->grow_pkt->buf + iter->grow_pkt->len - space;
788
789 elog(LOG_DEBUG(10), "NACK control byte: %x, last %x",
790 *(uint8_t*)&nack_ctrl, *iter->last_nack_ctrl);
791
792 break;
793
794 default:
795 if (iter->length > 0)
796 bufcpy(iter->grow_pkt, msg, iter->length);
797 break;
798 }
799
800 /* increment the seqno */
801 switch (command) {
802 case SSYNC_LOG_INIT:
803 case SSYNC_LOG_ADD:
804 case SSYNC_LOG_DEL:
805 case SSYNC_LOG_FRAG:
806 case SSYNC_LOG_CONT:
807 case SSYNC_LOG_LAST:
808 case SSYNC_LOG_CHK:
809 case SSYNC_LOG_TERM:
810 iter->seqno++;
811 break;
812
813 default:
814 break;
815 }
816
817 /* return appended byte count */
818 return (iter->grow_pkt->len - start_len);
819 }
820
821
822 int ssync_msg_iter_maybe_addr_map(ssync_msg_iter_t *iter,
823 cl_index_t *target, if_id_t if_id)
824 {
825 /* need to assign target? */
826 if (target->byte == 0) {
827
828 int i;
829 int first_slot = 0;
830
831 /* search and find first open slot */
832 for (i=1; i<256; i++) {
833 if (iter->addr_map[i] == if_id) {
834 target->byte = i;
835 elog(LOG_DEBUG(0), "resolved if_id %d to target %d",
836 if_id, target->byte);
837 return 0;
838 }
839 if (first_slot == 0 && iter->addr_map[i] == 0)
840 first_slot = i;
841 }
842
843 /* no room? */
844 if (first_slot == 0) {
845 elog(LOG_WARNING, "out of address map slots?");
846 return -1;
847 }
848
849 target->byte = first_slot;
850 }
851
852 if (iter->addr_map[target->byte] == if_id) return 0;
853 iter->addr_map[target->byte] = if_id;
854 elog(LOG_DEBUG(0), "mapped if_id %d to target %d",
855 if_id, target->byte);
856 return ssync_msg_iter_append_msg(iter, RETX2_CTRL_ADDR,
857 iter->flow_index, *target, iter->list_index, iter->seqno,
858 0, 0, (char*)(&if_id), sizeof(if_id));
859 }
860
861
862 int ssync_msg_iter_maybe_flow_map(ssync_msg_iter_t *iter,
863 uint8_t *flow_index, flow_id_t *flow, int hops)
864 {
865 /* need to assign target? */
866 if (*flow_index == 0) {
867
868 int i;
869 int first_slot = 0;
870
871 /* search and find first open slot */
872 for (i=1; i<256; i++) {
873 if (memcmp(&(iter->flow_map[i].id), flow, sizeof(*flow)) == 0) {
874 *flow_index = i;
875 elog(LOG_DEBUG(0), "resolved flow_id %s to target %d",
876 ssync_flowid_to_str(flow), *flow_index);
877 return 0;
878 }
879 if (first_slot == 0 && iter->flow_map[i].id.src == 0)
880 first_slot = i;
881 }
882
883 /* no room? */
884 if (first_slot == 0) {
885 elog(LOG_WARNING, "out of address map slots?");
886 return -1;
887 }
888
889 *flow_index = first_slot;
890 }
891
892 /* if it's already set OK, done */
893 if ((memcmp(&(iter->flow_map[*flow_index].id), flow, sizeof(*flow)) == 0) &&
894 iter->flow_map[*flow_index].hops == hops)
895 return 0;
896
897 /* otherwise set it now */
898 memmove(&(iter->flow_map[*flow_index].id), flow, sizeof(*flow));
899 iter->flow_map[*flow_index].hops = hops;
900 elog(LOG_DEBUG(0), "mapped flow_id %s to target %d",
901 ssync_flowid_to_str(flow), *flow_index);
902 return ssync_msg_iter_append_msg(iter, RETX2_CTRL_FLOW,
903 *flow_index, iter->target, iter->list_index, iter->seqno,
904 0, 0, (char*)(&(iter->flow_map[*flow_index])),
905 sizeof(flow_entry_t));
906 }
907
908
909
910 /*
911 * possible further optimizations
912 * INIT drop length byte
913 * nack INIT no seqno, add flow id?
914 */
915
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more
information.