Skip to content →

Tag: UDP

Code

The code looks as follows.


#include 
#include 
#include 
#include 
#include
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include "event.h"
#include "rudp.h"
#include "rudp_api.h"

/*
 * Testing aid: when DROP is defined, rudp_listen discards an incoming
 * RUDP_DATA packet whenever rand() % DROP == 0.
 */
//#define DROP 3

/*
 * Head of the singly linked list of all RUDP sockets. rudp_socket() pushes
 * new sockets onto the front; rudp_close_session() unlinks and frees them.
 */
rudp_socket_t rs_head = NULL;

/*
 * find_peer: Return the peer with socket address `sa` from the list headed
 * by *firstp, creating it when it is not present yet.
 *
 * A newly created peer owns a private copy of `sa`, has an empty packet
 * queue, starts in PEER_STATUS_INIT with sb = 0, sm = RUDP_WINDOW - 1 and
 * sn = 0, and is pushed onto the front of the list.
 *
 * Returns the peer, or NULL when memory could not be allocated (in which
 * case the list is left unchanged).
 */
peer_t find_peer(peer_t *firstp, struct sockaddr_in *sa) {
    peer_t p;
    struct sockaddr_in *sin;

    for (p = *firstp; p != NULL; p = p->next) {
        /*
         * Compare only the fields that identify an endpoint. A memcmp over
         * the whole structure would also compare the sin_zero padding, whose
         * content callers are not required to initialise.
         */
        if (p->sa->sin_family == sa->sin_family &&
                p->sa->sin_port == sa->sin_port &&
                p->sa->sin_addr.s_addr == sa->sin_addr.s_addr)
            return p;
    }

    p = malloc(sizeof (struct peer));
    if (p == NULL) {
        perror("find_peer: malloc");
        return NULL;
    }
    sin = malloc(sizeof (struct sockaddr_in));
    if (sin == NULL) {
        perror("peerfind: malloc");
        free(p); /* do not leak the half-built peer */
        return NULL;
    }
    memcpy(sin, sa, sizeof (struct sockaddr_in));

    p->sa = sin;
    p->packets = NULL;
    p->status = PEER_STATUS_INIT;
    p->sb = 0;
    p->sm = RUDP_WINDOW - 1;
    p->sn = 0;
    p->next = (*firstp);
    *firstp = p;
    return p;
}

/*
 * remove_peer: Unlink `rp` from the peer list headed by *firstp and release
 * the peer together with its stored socket address.
 *
 * Packets still queued on the peer are not touched.
 * Returns 0 on success, -1 when `rp` is not a member of the list.
 */
int remove_peer(peer_t *firstp, peer_t rp) {
    peer_t *link = firstp;

    /* Advance until the slot that points at rp (or the end of the list). */
    while (*link != NULL && *link != rp)
        link = &(*link)->next;

    if (*link == NULL)
        return -1;

    *link = rp->next;
    free(rp->sa);
    free(rp);
    return 0;
}

/*
 * add_packet: Insert *pkt into the packet list headed by *packet_head.
 *
 * The list is kept ordered by ascending header seqno, so packets with
 * smaller sequence numbers are always placed in front. A packet whose seqno
 * equals one already queued is inserted after it.
 *
 * Returns 0 on success, -1 when an argument is NULL.
 */
int add_packet(packet_t *packet_head, packet_t *pkt) {
    packet_t *link;

    if (packet_head == NULL || pkt == NULL || *pkt == NULL)
        return -1;

    /*
     * Walk the "next" slots instead of the nodes so that insertion at the
     * head, in the middle and at the tail is one and the same operation.
     * No scratch allocation is needed for the cursor.
     */
    link = packet_head;
    while (*link != NULL && (*link)->header->seqno <= (*pkt)->header->seqno)
        link = &(*link)->next;

    (*pkt)->next = *link;
    *link = *pkt;
    return 0;
}

/*
 * remove_packet: Unlink `pkt` from the list headed by *packet_head and free
 * it together with its header and payload buffers (both are heap
 * allocations made when the packet is built in rudp_sendto).
 *
 * Returns 0 on success, -1 when `pkt` is not a member of the list.
 */
int remove_packet(packet_t *packet_head, packet_t pkt) {
    packet_t *link;

    for (link = packet_head; *link != NULL; link = &(*link)->next) {
        if (*link == pkt) {
            *link = pkt->next;
            free(pkt->header);
            free(pkt->data);
            free(pkt);
            return 0;
        }
    }
    return -1;
}

/*
 * rudp_socket: Create a RUDP socket.
 *
 * Opens a UDP socket bound to INADDR_ANY on `port32`. When the port is zero
 * a random port in [1024, 65534] is tried, repeatedly, up to a fixed number
 * of attempts. The new socket is pushed onto the front of the rs_head list
 * and rudp_listen is registered for its descriptor.
 *
 * Returns the new socket, or NULL on failure; on failure the descriptor is
 * closed and nothing is left allocated.
 */
rudp_socket_t rudp_socket(int port32) {
    enum { MAX_BIND_ATTEMPTS = 64 };
    int sockfd;
    struct sockaddr_in sin;
    uint16_t port = port32;
    rudp_socket_t rs;

    sockfd = socket(PF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("rudp_socket: socket");
        return NULL;
    }

    /* The address is only needed for bind(), so it lives on the stack. */
    memset(&sin, 0, sizeof sin);
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_ANY);

    if (port == 0) {
        int attempt;
        int bound = 0;

        srand(time(0));
        /*
         * Bounded retry: bind() can fail for reasons other than the port
         * being taken, and those would never clear by picking another port.
         */
        for (attempt = 0; attempt < MAX_BIND_ATTEMPTS; attempt++) {
            port = (rand() % (65535 - 1024)) + 1024;
            sin.sin_port = htons(port);
            if (bind(sockfd, (struct sockaddr *) &sin, sizeof sin) == 0) {
                bound = 1;
                break;
            }
        }
        if (!bound) {
            perror("rudp_socket: bind");
            close(sockfd);
            return NULL;
        }
    } else {
        sin.sin_port = htons(port);
        if (bind(sockfd, (struct sockaddr *) &sin, sizeof sin) < 0) {
            perror("rudp_socket: bind");
            close(sockfd);
            return NULL;
        }
    }

    rs = malloc(sizeof (struct rudp_socket));
    if (rs == NULL) {
        perror("rudp_socket: malloc");
        close(sockfd);
        return NULL;
    }
    rs->sockfd = sockfd;
    rs->peers = NULL;
    rs->next = rs_head;
    rs_head = rs;

    event_fd(rs->sockfd, rudp_listen, rs, "rudp_listen");
    return rs;
}

/*
 * Listens on the socket
 */
int rudp_listen(int fd, void *arg) {
    rudp_socket_t rs;
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        if (rs->sockfd == fd)
            break;
    }
    if (rs == NULL) {
        return -1;
    }
    int (*handler)(rudp_socket_t,struct sockaddr_in *, char *, int);

    /*
        client_addr.sin_family = AF_INET;
        client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        client_addr.sin_port = htons(0);
     */
    unsigned int clientlen = sizeof(struct sockaddr);
    struct sockaddr client_sockaddr;
    int bufsize = RUDP_MAXPKTSIZE + sizeof (struct rudp_hdr);
    int msglen = 0;
    char message[bufsize];
    memset(message, 0, bufsize);
    msglen =recvfrom(rs->sockfd, message, bufsize, 0, & client_sockaddr, &clientlen);
    if (msglen < 0) {
        perror("rudp_listen: recvfrom");
        return -1;
    }
    struct sockaddr_in client_addr;
    memcpy(&client_addr, &client_sockaddr, clientlen);
    if (ntohs(client_addr.sin_port) == 0) {
        perror("rudp_listen: recvfrom port 0");
        return -1;
    }
    //printf("received from port: %d\n",ntohs(client_addr.sin_port));
    peer_t p = find_peer(&(rs->peers), &client_addr);
    if (p == NULL) {
        return -1;
    }
    //printf("peer: %d\n", ntohs(p->sa->sin_port));
    if (msglen < 0) {
        perror("rudp_recvfrom: recvfrom");
        return -1;
    } else if (msglen < (sizeof (struct rudp_hdr))) {
        fprintf(stderr, "rudp_recvfrom: error receiving packet");
        return -1;
    }
    structrudp_hdr *msgheader = malloc(sizeof (struct rudp_hdr));
    if (msgheader == NULL) {
        perror("rudp_recvfrom: malloc");
        return -1;
    }
    memcpy(msgheader, message, sizeof (struct rudp_hdr));

    //printf("status=%d sn=%d\n", p->status, p->sn);
    //printf("message from port %d, type=%d seqno=%d\n", ntohs(p->sa->sin_port), msgheader->type, msgheader->seqno);
    struct rudp_hdr *replyheader = malloc(sizeof (structrudp_hdr));
    if (replyheader == NULL) {
        perror("rudp_recvfrom: malloc");
        return -1;
    }
    replyheader->version = RUDP_VERSION;
    switch (p->status) {
        case PEER_STATUS_INIT:
            if (msgheader->type == RUDP_SYN) {
                p->status = PEER_STATUS_ACTIVE;
                p->sn = msgheader->seqno + 1;
                replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
                sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                        (struct sockaddr*) & client_addr, clientlen);
                //printf("ACK sent. Turns active\n");
            }
            break;
        case PEER_STATUS_PASSIVE:
            if (msgheader->type == RUDP_ACK && msgheader->seqno == 1) {
                p->status = PEER_STATUS_ACTIVE;
                packet_t pkt;
for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                    if (pkt->header->seqno == msgheader->seqno - 1) {
                        pkt->status = ACKED;
                        //printf("syn packet no. %d acked\n", pkt->header->seqno);
                        break;
                    }
                }
                //p->sn = 1;
                p->sb = 1;
                p->sm = RUDP_WINDOW;
    }
            break;
        case PEER_STATUS_ACTIVE:
            switch (msgheader->type) {
                case RUDP_ACK:
                    //printf("sb: %d sm: %d\n", p->sb, p->sm);
                    if (msgheader->seqno >= p->sb) {
                        packet_t pkt;
                        for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                            if (pkt->header->seqno == msgheader->seqno -    1) {
                                pkt->status = ACKED;
                                //printf("data packet no. %d acked\n", pkt->header->seqno);
                                break;
                            }
                        }
                        p->sm += msgheader->seqno - p->sb;
                        p->sb = msgheader->seqno;
                    }
                    break;
    case RUDP_DATA:
#ifdef DROP
                    srand(time(0));
                    if (rand() % DROP == 0) {
                        printf("packet dropped\n");
                        break;
                    }
#endif
                    if (msgheader->seqno == p->sn) {
                        p->sn = msgheader->seqno + 1;
                        replyheader->seqno = msgheader->seqno + 1;
    replyheader->type = RUDP_ACK;
                        handler = malloc(sizeof (int (*)()));
                        if (handler == NULL) {
                            perror("rudp_listen: malloc");
                            return -1;
                        }
                        handler = (int (*)(rudp_socket_t, struct sockaddr_in*, char *, int))arg;
                        void *data;
                        data = malloc(msglen -    sizeof (struct rudp_hdr));
                        if (data == NULL) {
                            perror("rudp_listen: malloc");
                            return -1;
                        }
                        memcpy(data, message + sizeof (struct rudp_hdr),
                                msglen - sizeof (struct rudp_hdr));
                        handler(rs, &client_addr, data, msglen - sizeof (struct    rudp_hdr));
                        sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                                (struct sockaddr*) & client_addr, clientlen);
                    }
                    break;
                case RUDP_SYN:
                    p->status = PEER_STATUS_ACTIVE;
                    p->sn = msgheader->seqno + 1;
                    replyheader->seqno = msgheader->seqno + 1;
    replyheader->type = RUDP_ACK;
                    sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                            (struct sockaddr*) & client_addr, clientlen);
                    break;
                case RUDP_FIN:
                    p->status = PEER_STATUS_CLOSED;
                    p->sn = msgheader->seqno + 1;
                    replyheader->seqno = msgheader->seqno + 1;
    replyheader->type = RUDP_ACK;
                    sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                            (struct sockaddr*) & client_addr, clientlen);
                    break;
                default:
                    fprintf(stderr, "rudp_listen: invalid RUDP type %d\n",
                            msgheader->type);
                    break;
            }
            break;
        case    PEER_STATUS_CLOSING:
            if (msgheader->type == RUDP_ACK && msgheader->seqno == p->sn + 1) {
                p->status = PEER_STATUS_CLOSED;
                //printf("peer closed\n");
            }
            break;
        case PEER_STATUS_CLOSED:
            if (msgheader->type == RUDP_FIN) {
                p->status = PEER_STATUS_CLOSED;
                p->sn = msgheader->seqno + 1;
    replyheader->seqno = msgheader->seqno + 1;
                replyheader->type = RUDP_ACK;
                sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                        (struct sockaddr*) & client_addr, clientlen);
            }
            /*
                        if (msgheader->type == RUDP_SYN) {
                            p->status = PEER_STATUS_ACTIVE;
    p->sn = msgheader->seqno + 1;
                            replyheader->seqno = msgheader->seqno + 1;
                            replyheader->type = RUDP_ACK;
                            sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                                    (struct sockaddr*) & client_addr, clientlen);
                        } else if (msgheader->type == RUDP_FIN) {
                            p->status =    PEER_STATUS_CLOSED;
                            p->sn = msgheader->seqno + 1;
                            replyheader->seqno = msgheader->seqno + 1;
                            replyheader->type = RUDP_ACK;
                            sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                                    (struct sockaddr*) & client_addr, clientlen);
                        }
             */
            break;
    default:
            fprintf(stderr, "rudp_listen: invalid peer status %d\n",
                    p->status);
            break;
    }
    return 0;
}

/*
 * Keep checking if there are packets to be sent out
 */
int rudp_send(int fd, void *arg) {
    event_timeout_delete(rudp_send, arg);
    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    rudp_socket_t rs;
    peer_t p;
    packet_t pkt;
    void *msg;
    int len = sizeof (struct rudp_hdr);
    //int rsc = 0;
    if (rs_head == NULL) {
        return 0;
    }
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        //printf("rudp socket count: %d\n", ++rsc);
        for (p = rs->peers; p != NULL; p = p->next) {
            if (p->status == PEER_STATUS_CLOSED) {
    if (remove_peer(&(rs->peers), p) < 0) {
                    perror("rudp_send: remove_peer");
                    return -1;
                } else {
                    event_timeout(t, rudp_send, NULL, "rudp_send");
                }
                return 0;
            } else if (p->status != PEER_STATUS_ACTIVE) {
                event_timeout(t, rudp_send, NULL, "rudp_send");
                return 0;
            }
        //int pc = 0;
            for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                //printf("pakcet %d: seq number %d status %d\n", ++pc, pkt->header->seqno, pkt->status);
                if (pkt->status == ACKED) {
                    if (remove_packet(&(p->packets), pkt) == -1) {
                        return -1;
                    }
                    event_timeout(t, rudp_send, NULL, "rudp_send");
        return 0;
                }
                if ((pkt->status == IN_QUEUE || pkt->status == TIMEOUT) &&
                        pkt->header->seqno >= p->sb &&
                        pkt->header->seqno <= p->sm) {
                    if (pkt->header->type == RUDP_FIN) {
                        p->status = PEER_STATUS_CLOSING;
                    }
                    len = sizeof (struct        rudp_hdr) + pkt->datalen;
                    msg = malloc(len);
                    if (msg == NULL) {
                        perror("rudp_send: malloc");
                        return -1;
                    }
                    memcpy(msg, pkt->header, sizeof (struct rudp_hdr));
                    memcpy(msg + sizeof (struct rudp_hdr), pkt->data, pkt->datalen);
                    sendto(rs->sockfd, msg,        len, 0,
                            (struct sockaddr*) p->sa, sizeof (struct sockaddr));
                    pkt->status = SENT;
                    pkt->trans_count++;
                    struct timeval now, interval, t;
                    gettimeofday(&now, NULL);
                    if (RUDP_TIMEOUT >= 1000) {
                        interval.tv_sec = RUDP_TIMEOUT / 1000;
                        interval.tv_usec = 0;
        } else {
                        interval.tv_sec = 0;
                        interval.tv_usec = RUDP_TIMEOUT * 1000;
                    }
                    timeradd(&now, &interval, &t);
                    event_timeout(t, rudp_timeout, pkt, "rudp_timeout");
                }

            }
        }
    }
    event_timeout(t, rudp_send, NULL, "rudp_send");
    return 0;
}

/*
 * rudp_timeout: Timer callback fired when a transmitted packet's
 * retransmission timer runs out. `arg` is the packet.
 *
 * A packet that has been ACKED in the meantime is left alone. Otherwise it
 * becomes FAILED once trans_count has reached RUDP_MAXRETRANS, and TIMEOUT
 * if it has not. Always returns 0.
 */
int rudp_timeout(int a, void *arg) {
    packet_t expired = (packet_t) arg;

    event_timeout_delete(rudp_timeout, expired);

    if (expired->status == ACKED)
        return 0;

    expired->status = (expired->trans_count >= RUDP_MAXRETRANS) ? FAILED
                                                                : TIMEOUT;
    return 0;
}

/*
 * rudp_close: Close socket
 *
 * Nothing is torn down here. rudp_close_session is scheduled to run
 * RUDP_CHECK_INTERVAL microseconds from now with the socket as its
 * argument, and performs the actual shutdown. Always returns 0.
 */
int rudp_close(rudp_socket_t rsocket) {
    struct timeval delay = { .tv_sec = 0, .tv_usec = RUDP_CHECK_INTERVAL };
    struct timeval current, due;

    gettimeofday(&current, NULL);
    timeradd(&current, &delay, &due);
    event_timeout(due, rudp_close_session, rsocket, "rudp_close_socket");
    return 0;
}

int rudp_close_session(int fd, void *arg) {
    event_timeout_delete(rudp_close_session, arg);
    rudp_socket_t rs        = (rudp_socket_t) arg;
    peer_t p;
    //printf("closing rudp socket %d\n", rs->sockfd);
    //printf("peer: %p\n", rs->peers);
    if (rs->peers == NULL) {
        //printf("peer null\n");
        event_fd_delete(rudp_listen, rs);
        close(rs->sockfd);
        rudp_socket_t r, *rr;
        rr = &rs_head;
        for (r = rs_head; r != NULL; r = r->next) {
            if (r == rs) {
                *rr = r->next;
        free(r);
                return 0;
            }
            rr = &r->next;
        }
        return -1;
    }
    //printf("peers not null\n");
    for (p = rs->peers; p != NULL; p = p->next) {
        //check if there are still packets to be sent
        if (p->packets != NULL) {
            struct timeval now, interval, t;
            gettimeofday(&now, NULL);
            interval.tv_sec = 0;
        interval.tv_usec = RUDP_CHECK_INTERVAL;
            timeradd(&now, &interval, &t);
            event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
            return 0;
        }
        //printf("no packets in this peer, status %d\n", p->status);
        if (p->status != PEER_STATUS_CLOSED) {
            p->status = PEER_STATUS_CLOSING;
            struct rudp_hdr header;
            header.seqno = p->sn;
        header.type = RUDP_FIN;
            header.version = RUDP_VERSION;
            sendto(rs->sockfd, &header, sizeof (struct rudp_hdr), 0,
                    (struct sockaddr*) p->sa, sizeof (struct sockaddr_in));
            struct timeval now, interval, t;
            gettimeofday(&now, NULL);
            if (RUDP_TIMEOUT >= 1000) {
                interval.tv_sec = RUDP_TIMEOUT / 1000;
                interval.tv_usec = 0;
        } else {
                interval.tv_sec = 0;
                interval.tv_usec = RUDP_TIMEOUT * 1000;
            }
            timeradd(&now, &interval, &t);
            event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
            //printf("FIN sent, check again 2s later\n");
            return 0;
        }
        //printf("peer status closed\n");
        if (remove_peer(&(rs->peers), p) < 0)        {
            perror("rudp_close_session: remove_peer");
            return -1;
        } else {
            //printf("peer successfully removed\n");
            struct timeval now, interval, t;
            gettimeofday(&now, NULL);
            interval.tv_sec = 0;
            interval.tv_usec = RUDP_CHECK_INTERVAL;
            timeradd(&now, &interval, &t);
            event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
        return 0;
        }
    }
    return 0;
}

/*
 * rudp_recvfrom_handler: Register receive callback function
 *
 * Registers rudp_listen for the socket's descriptor with `handler` as the
 * event argument. rudp_listen casts that argument back to this
 * function-pointer type and calls it for each in-order RUDP_DATA packet.
 *
 * Always returns 0; the results of event_fd and event_fd_delete are not
 * checked.
 */

int rudp_recvfrom_handler(rudp_socket_t rsocket,
        int (*handler)(rudp_socket_t, struct sockaddr_in *,
        char *, int)) {
    event_fd(rsocket->sockfd, rudp_listen, handler, "rudp_listen");
    /*
     * NOTE(review): presumably removes the registration rudp_socket() made
     * with the socket itself as argument - confirm event_fd_delete semantics
     * and whether the order of these two calls matters.
     */
    event_fd_delete(rudp_listen, rsocket);
    return 0;
}

/*
 * rudp_event_handler: Register event handler callback function
 *
 * Schedules the first rudp_check_status run RUDP_CHECK_INTERVAL
 * microseconds from now, with `handler` as its argument. `rsocket` is not
 * used. Always returns 0.
 */
int rudp_event_handler(rudp_socket_t rsocket,
        int (*handler)(rudp_socket_t, rudp_event_t,
        struct sockaddr_in *)) {
    struct timeval delay = { .tv_sec = 0, .tv_usec = RUDP_CHECK_INTERVAL };
    struct timeval current, due;

    gettimeofday(&current, NULL);
    timeradd(&current, &delay, &due);
    event_timeout(due, rudp_check_status, handler, "rudp_check_status");
    return 0;
}

/*
 * rudp_check_status: Timer callback that looks for a peer with something to
 * report. `arg` is the application's event handler,
 * int (*)(rudp_socket_t, rudp_event_t, struct sockaddr_in *).
 *
 * The first peer found that is CLOSED is reported as RUDP_EVENT_CLOSED; the
 * first one holding a FAILED packet is reported as RUDP_EVENT_TIMEOUT. The
 * reported peer is then removed and the function returns (0, or -1 if the
 * removal failed) WITHOUT scheduling another check.
 *
 * When nothing is found, the next check is scheduled RUDP_CHECK_INTERVAL
 * microseconds from now and 0 is returned.
 */
int rudp_check_status(int fd, void *arg) {
    int (*notify)(rudp_socket_t, rudp_event_t, struct sockaddr_in *) =
            (int (*)(rudp_socket_t, rudp_event_t, struct sockaddr_in *))arg;
    struct timeval delay = { .tv_sec = 0, .tv_usec = RUDP_CHECK_INTERVAL };
    struct timeval current, due;
    rudp_socket_t sock;
    peer_t peer;
    packet_t queued;

    event_timeout_delete(rudp_check_status, arg);

    for (sock = rs_head; sock != NULL; sock = sock->next) {
        for (peer = sock->peers; peer != NULL; peer = peer->next) {
            rudp_event_t what = RUDP_EVENT_CLOSED;
            int found = (peer->status == PEER_STATUS_CLOSED);

            /* A closed peer wins; otherwise look for a failed packet. */
            if (!found) {
                for (queued = peer->packets; queued != NULL;
                        queued = queued->next) {
                    if (queued->status == FAILED) {
                        what = RUDP_EVENT_TIMEOUT;
                        found = 1;
                        break;
                    }
                }
            }

            if (found) {
                notify(sock, what, peer->sa);
                return (remove_peer(&(sock->peers), peer) < 0) ? -1 : 0;
            }
        }
    }

    gettimeofday(&current, NULL);
    timeradd(&current, &delay, &due);
    event_timeout(due, rudp_check_status, arg, "rudp_check_status");
    return 0;
}

/*
 * rudp_sendto: Send a block of data to the receiver.
 *
 * rsocket: socket to send from.
 * data:    payload; it is copied, so the caller keeps ownership.
 * len:     payload length in bytes, 0 .. RUDP_MAXPKTSIZE (the receive
 *          buffer in rudp_listen cannot hold more).
 * to:      destination address.
 *
 * The peer for `to` is looked up or created; for a brand-new peer the
 * session handshake is started first. The payload is then wrapped in an
 * RUDP_DATA packet carrying the peer's next sequence number and appended to
 * the peer's queue - transmission itself happens in rudp_send.
 *
 * Returns 0 when the packet was queued, -1 on invalid arguments or when
 * memory could not be allocated (nothing is leaked in that case).
 */

int rudp_sendto(rudp_socket_t rsocket, void* data, int len, struct sockaddr_in* to) {
    peer_t p;
    packet_t pkt = NULL;
    struct rudp_hdr *hdr = NULL;

    if (rsocket == NULL || to == NULL || len < 0 || len > RUDP_MAXPKTSIZE ||
            (data == NULL && len > 0)) {
        fprintf(stderr, "rudp_sendto: invalid argument\n");
        return -1;
    }

    p = find_peer(&(rsocket->peers), to);
    if (p == NULL) {
        perror("rudp_sendto: find_peer");
        return -1;
    }
    if (p->status == PEER_STATUS_INIT) {
        rudp_setup_session(0, p);
    }

    hdr = malloc(sizeof *hdr);
    pkt = malloc(sizeof *pkt);
    if (hdr == NULL || pkt == NULL) {
        perror("rudp_sendto: malloc");
        goto fail;
    }

    /* malloc(0) may legitimately return NULL; never ask for zero bytes. */
    pkt->data = malloc(len > 0 ? (size_t) len : 1);
    if (pkt->data == NULL) {
        perror("rudp_sendto: malloc");
        goto fail;
    }
    if (len > 0) {
        memcpy(pkt->data, data, len);
    }

    /* The sequence number is consumed only once nothing can fail before queueing. */
    hdr->version = RUDP_VERSION;
    hdr->type = RUDP_DATA;
    hdr->seqno = p->sn++;

    pkt->datalen = len;
    pkt->header = hdr;
    pkt->next = NULL;
    pkt->trans_count = 0;
    pkt->status = IN_QUEUE;

    if (add_packet(&p->packets, &pkt) < 0) {
        perror("rudp_sendto: add_packet");
        free(pkt->data);
        goto fail;
    }
    return 0;

fail:
    free(pkt);
    free(hdr);
    return -1;
}

int rudp_setup_session(int fd, void *arg) {
    event_timeout_delete(rudp_setup_session, arg);
    peer_t p = (peer_t) arg;
    if (p == NULL) {
        perror("rudp_setup_session: error finding peer");
        return -1;
    }
    int sockfd;
    rudp_socket_t rs;
    peer_t pp;
    int flag = 0;
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        for (pp = rs->peers; pp != NULL; pp = pp->next) {
            if (pp == p) {
                sockfd = rs->sockfd;
                flag = 1;
                break;
            }
        }
        if (flag == 1)
        break;
    }
    struct rudp_hdr header;
    header.seqno = p->sn++;
    header.type = RUDP_SYN;
    header.version = RUDP_VERSION;
    sendto(sockfd, &header, sizeof (struct rudp_hdr), 0,
            (struct sockaddr*) p->sa, sizeof (struct sockaddr_in));
    p->status = PEER_STATUS_PASSIVE;

    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    if (RUDP_TIMEOUT >= 1000) {
        interval.tv_sec        = RUDP_TIMEOUT / 1000;
        interval.tv_usec = 0;
    } else {
        interval.tv_sec = 0;
        interval.tv_usec = RUDP_TIMEOUT * 1000;
    }
    timeradd(&now, &interval, &t);
    event_timeout(t, rudp_session_check, arg, "rudp_timeout");


    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    event_timeout(t,        rudp_send, NULL, "rudp_send");
    return 0;
}

/*
 * rudp_session_check: Timer callback armed by rudp_setup_session. `arg` is
 * the peer.
 *
 * If the peer is still PASSIVE (no ACK for the SYN has moved it on),
 * rudp_setup_session is scheduled to run again with a zero delay.
 * Returns 0, or -1 when `arg` is NULL.
 */
int rudp_session_check(int fd, void *arg) {
    peer_t waiting = (peer_t) arg;
    struct timeval no_delay = { .tv_sec = 0, .tv_usec = 0 };
    struct timeval current, due;

    event_timeout_delete(rudp_session_check, arg);

    if (waiting == NULL) {
        perror("rudp_setup_session: error finding peer");
        return -1;
    }
    if (waiting->status != PEER_STATUS_PASSIVE)
        return 0;

    gettimeofday(&current, NULL);
    timeradd(&current, &no_delay, &due);
    event_timeout(due, rudp_setup_session, arg, "rudp_setup_session");
    return 0;
}
Leave a Comment

Multiple Sessions

A linked list of all RUDP sockets is maintained. When rudp_socket() is called, an RUDP socket is created and added to the linked list. Each RUDP socket keeps a record of the peers/sessions it talks with. When RUDP receives a packet from an unknown socket address, or when RUDP receives a request to send a packet to an unknown socket address, a new session is created. For each session, a linked list of all buffered packets is kept.

Leave a Comment

Session Establishment and Tearing Down

When rudp_sendto() is called, the protocol first checks whether a session exists between the sender and the receiver. If not, the protocol tries to set up a session by sending RUDP_SYN messages, and the packet the application wants to send is buffered in the newly created session. After an RUDP_ACK message is received, the server-side socket starts sending out packets. The Go-Back-N protocol is used to control the sending process. After the protocol receives a rudp_close() signal, it first checks whether there are still active sessions and packets in the sending buffer. If not, the protocol sends out RUDP_FIN messages, and after the RUDP_ACKs are received, the session is torn down.

Leave a Comment

RUDP Overview

RUDP is a protocol that ensures transfer reliability with UDP. A sliding window protocol (Go back N) is used to realize reliability. Using RUDP, applications can send and receive data packets without worrying about lost packets.

The lines in red signify state changes for RUDP clients (receiver side), while the black lines signify state changes for RUDP servers (sender side).

Leave a Comment