Tag: Linux

Exporting and Importing Elasticsearch Indices

In my project I needed to run some local tests against data from a production Elasticsearch cluster, so I exported the data from the production server and imported it into my local cluster. The same procedure can also be used for backing up and restoring data. Here are the instructions.

Before you start, check out the official documentation: Snapshot and Restore.

Backing up/exporting data:

  1. Modify your Elasticsearch configuration file (normally elasticsearch.yml) and add a path.repo line, for example:
    path.repo: /usr/local/var/backups/
  2. Make sure this path has the correct permissions so that Elasticsearch can read from and write to it.
  3. Register the repository, then create the snapshot:
    curl -XPUT 'http://localhost:9200/_snapshot/my_backup' -d '{"type": "fs", "settings": {"compress": "true", "location": "/usr/local/var/backups/"}}'
    curl -XPUT 'http://localhost:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true'
  4. Copy the files in the configured location to your local machine.

Restoring/importing data:

  1. Modify your local Elasticsearch configuration in the same way as in step 1 of the backup procedure.
  2. Place the snapshot files in the repo path.
  3. Close your indices:
    curl -XPOST http://localhost:9200/knx-bus/_close
  4. Import data:
    curl -XPOST 'http://localhost:9200/_snapshot/my_backup/snapshot_1/_restore?pretty'
  5. Reopen your indices:
    curl -XPOST http://localhost:9200/knx-bus/_open

It is important that the Elasticsearch version on the importing side is compatible with the one that exported the data, i.e., in this case your local machine has to run the same version or a newer one. If not, you need to upgrade Elasticsearch first. The official documentation says:

The information stored in a snapshot is not tied to a particular cluster or a cluster name. Therefore it’s possible to restore a snapshot made from one cluster into another cluster. All that is required is registering the repository containing the snapshot in the new cluster and starting the restore process. The new cluster doesn’t have to have the same size or topology. However, the version of the new cluster should be the same or newer than the cluster that was used to create the snapshot.

Solution: dd too slow on Mac OS X

When I was cloning SD cards on Mac OS X using `dd`, it took ages to get things done. I was using the following commands:

diskutil unmountDisk /dev/disk2
sudo dd bs=1m if=~/Downloads/2013-10-09.alice.img of=/dev/disk2

It takes much less time when using /dev/rdisk2 instead of /dev/disk2:

diskutil unmountDisk /dev/disk2
sudo dd bs=1m if=~/Downloads/2013-10-09.alice.img of=/dev/rdisk2

The reason is that rdisks are “raw”, which results in a higher R/W speed, according to `man hdiutil` [1]:

/dev/rdisk nodes are character-special devices, but are “raw” in the BSD sense and force block-aligned I/O. They are closer to the physical disk than the buffer cache. /dev/disk nodes, on the other hand, are buffered block-special devices and are used primarily by the kernel’s filesystem code.

[1] http://superuser.com/questions/631592/mac-osx-why-is-dev-rdisk-20-times-faster-than-dev-disk

A “normal” sed on Mac

The `sed` program that ships with Mac is the BSD version, not the GNU one that most Linux instructions assume. To get GNU sed, use brew:

brew install gnu-sed

After this, alter PATH. For example, add the following line to your `~/.bash_profile`:

PATH="/usr/local/opt/gnu-sed/libexec/gnubin:$PATH"

And now you have a normal sed!

 

GitHub couples

I’m feeling good today because of these things:

  1. My mobile phone ran out of battery and the alarm clock didn’t ring this morning, but I still managed to get up just in time, caught the bus at the last minute, and arrived at the company at my usual time.
  2. My manager told me that renewing my contract looks positive, and hopefully it will be for one and a half years. He also said he will try to get it done before the summer vacation, which makes my life a lot easier. He also said it’s possible to save my holidays until winter, so I’ll be back in China for some time this winter.
  3. A very old lady managed to stop the bus and get on even though she waved to the bus driver a bit late. The bus driver was polite, and that’s what I like about Finland: people generally don’t get angry.
  4. Here’s one very funny and geeky picture I saw on xda-developers. In case the link gets invalidated later, the picture reads: “So, where did you two meet?” “Windows users: at the office” “Mac users: at Starbucks” “Linux users: GitHub”.

 

Code

The code looks as follows.


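/* Standard headers needed by the code below (the original header names did not survive) */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>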

#include "event.h"
#include "rudp.h"
#include "rudp_api.h"

//#define DROP 3

rudp_socket_t rs_head = NULL;

peer_t find_peer(peer_t *firstp, struct sockaddr_in *sa) {
    peer_t p;
    //int pc = 0;
    for (p = *firstp; p != NULL; p = p->next) {
        //printf("peer count: %d\n", ++pc);
        if (memcmp(p->sa, sa, sizeof (struct sockaddr_in)) == 0)
            return p;
    }
    //printf("peer not found, creating new peer\n");
    p = malloc(sizeof (struct peer));
    if (p == NULL) {
        perror("find_peer: malloc");
        return NULL;
    }
    struct sockaddr_in *sin;
    sin = malloc(sizeof (struct sockaddr_in));
    if (sin == NULL) {
        perror("find_peer: malloc");
        free(p); /* do not leak the half-built peer */
        return NULL;
    }
    memcpy(sin, sa, sizeof (struct sockaddr_in));
    p->sa = sin;
    p->packets = NULL;
    p->status = PEER_STATUS_INIT;
    p->sb = 0;
    p->sm = RUDP_WINDOW - 1;
    p->sn = 0;
    p->next = (*firstp);
    *firstp = p;
    return p;
}

int remove_peer(peer_t *firstp, peer_t rp) {
    peer_t p, *pp;
    pp = firstp;
    for (p = *firstp; p != NULL; p = p->next) {
        if (p == rp) {
            *pp = rp->next;
            free(rp->sa);
            free(rp);
            return 0;
        }
        pp = &p->next;
    }
    return -1;
}

/*
 * Add packet into a linked list.
 * Inserted in a way that packets with smaller seqno are always placed in front
 */
int add_packet(packet_t *packet_head, packet_t *pkt) {
    if (*packet_head == NULL) {
        (*pkt)->next = *packet_head;
        *packet_head = *pkt;
    } else {
        /* Walk the list with a plain cursor; no allocation is needed for it */
        packet_t p;
        for (p = *packet_head; p != NULL; p = p->next) {
            if ((*pkt)->header->seqno == p->header->seqno + 1) {
                (*pkt)->next = p->next;
                p->next = *pkt;
                break;
            }
        }
        //return -1;
    }
    //printf("packet added: %d type %d\n", (*pkt)->header->seqno, (*pkt)->header->type);
    return 0;
}

/*
 * Remove a packet from a linked list
 */
int remove_packet(packet_t *packet_head, packet_t pkt) {
    packet_t p, *pp;
    pp = packet_head;
    for (p = *packet_head; p != NULL; p = p->next) {
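        if (pkt == p) {
            //printf("packet seq no %d removed\n",pkt->header->seqno);
            *pp = p->next;
            /* The packet owns its header and payload, so release them as well */
            free(p->header);
            free(p->data);
            free(p);
            return 0;
        }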
        pp = &p->next;
    }
    return -1;
}

/*
 * rudp_socket: Create a RUDP socket.
 * May use a random port by setting port to zero.
 */

rudp_socket_t rudp_socket(int port32) {
    int sockfd;
    sockfd = socket(PF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("rudp_socket: socket");
        return NULL;
    }
    struct sockaddr_in *sin;
    sin = malloc(sizeof (struct sockaddr_in));
    if (sin == NULL) {
        perror("rudp_socket: malloc");
        close(sockfd);
        return NULL;
    }
    memset(sin, 0, sizeof (struct sockaddr_in));
    sin->sin_addr.s_addr = htonl(INADDR_ANY);
    //sin->sin_addr.s_addr = inet_addr("127.0.0.1");
    sin->sin_family = AF_INET;
    uint16_t port = port32;
    if (port == 0) {
        /* Try random unprivileged ports until one binds */
        srand(time(0));
        port = (rand() % (65535 - 1024)) + 1024;
        sin->sin_port = htons(port);
        while (bind(sockfd, (struct sockaddr*) sin, sizeof (struct sockaddr_in)) < 0) {
            port = (rand() % (65535 - 1024)) + 1024;
            sin->sin_port = htons(port);
        }
    } else {
        sin->sin_port = htons(port);
        if (bind(sockfd, (struct sockaddr*) sin, sizeof (struct sockaddr_in)) < 0) {
            perror("rudp_socket: bind");
            free(sin);
            close(sockfd);
            return NULL;
        }
    }
    free(sin); /* the address is only needed for bind() */
    //printf("rudp_socket: bind successfully on port: %d\n", port);
    rudp_socket_t rs;
    rs = malloc(sizeof (struct rudp_socket));
    if (rs == NULL) {
        perror("rudp_socket: malloc");
        return NULL;
    }
    rs->sockfd = sockfd;
    rs->peers = NULL;
    rs->next = rs_head;
    rs_head = rs;
    /*
        int rc = 0;
        rudp_socket_t prs;
        for(prs = rs_head; prs != NULL; prs =prs->next)
            printf("rudp socket count: %d\n", ++rc);
     */
    event_fd(rs->sockfd, rudp_listen, rs, "rudp_listen");
    return rs;
}

/*
 * Listens on the socket
 */
int rudp_listen(int fd, void *arg) {
    rudp_socket_t rs;
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        if (rs->sockfd == fd)
            break;
    }
    if (rs == NULL) {
        return -1;
    }
    int (*handler)(rudp_socket_t, struct sockaddr_in *, char *, int);

    /*
        client_addr.sin_family = AF_INET;
        client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        client_addr.sin_port = htons(0);
     */
    socklen_t clientlen = sizeof (struct sockaddr);
    struct sockaddr client_sockaddr;
    int bufsize = RUDP_MAXPKTSIZE + sizeof (struct rudp_hdr);
    int msglen = 0;
    char message[bufsize];
    memset(message, 0, bufsize);
    msglen = recvfrom(rs->sockfd, message, bufsize, 0, &client_sockaddr, &clientlen);
    if (msglen < 0) {
        perror("rudp_listen: recvfrom");
        return -1;
    }
    struct sockaddr_in client_addr;
    memcpy(&client_addr, &client_sockaddr, clientlen);
    if (ntohs(client_addr.sin_port) == 0) {
        perror("rudp_listen: recvfrom port 0");
        return -1;
    }
    //printf("received from port: %d\n",ntohs(client_addr.sin_port));
    peer_t p = find_peer(&(rs->peers), &client_addr);
    if (p == NULL) {
        return -1;
    }
    //printf("peer: %d\n", ntohs(p->sa->sin_port));
    /* msglen is known to be non-negative here; reject datagrams shorter than a header */
    if (msglen < (int) sizeof (struct rudp_hdr)) {
        fprintf(stderr, "rudp_recvfrom: error receiving packet\n");
        return -1;
    }
    /* Both headers live on the stack, so nothing has to be freed on return */
    struct rudp_hdr msgheader_buf, replyheader_buf;
    struct rudp_hdr *msgheader = &msgheader_buf;
    memcpy(msgheader, message, sizeof (struct rudp_hdr));

    //printf("status=%d sn=%d\n", p->status, p->sn);
    //printf("message from port %d, type=%d seqno=%d\n", ntohs(p->sa->sin_port), msgheader->type, msgheader->seqno);
    struct rudp_hdr *replyheader = &replyheader_buf;
    replyheader->version = RUDP_VERSION;
    switch (p->status) {
        case PEER_STATUS_INIT:
            if (msgheader->type == RUDP_SYN) {
                p->status = PEER_STATUS_ACTIVE;
                p->sn = msgheader->seqno + 1;
                replyheader->seqno = msgheader->seqno + 1;
                replyheader->type = RUDP_ACK;
                sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                        (struct sockaddr*) & client_addr, clientlen);
                //printf("ACK sent. Turns active\n");
            }
            break;
        case PEER_STATUS_PASSIVE:
            if (msgheader->type == RUDP_ACK && msgheader->seqno == 1) {
                p->status = PEER_STATUS_ACTIVE;
                packet_t pkt;
                for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                    if (pkt->header->seqno == msgheader->seqno - 1) {
                        pkt->status = ACKED;
                        //printf("syn packet no. %d acked\n", pkt->header->seqno);
                        break;
                    }
                }
                //p->sn = 1;
                p->sb = 1;
                p->sm = RUDP_WINDOW;
            }
            break;
        case PEER_STATUS_ACTIVE:
            switch (msgheader->type) {
                case RUDP_ACK:
                    //printf("sb: %d sm: %d\n", p->sb, p->sm);
                    if (msgheader->seqno >= p->sb) {
                        packet_t pkt;
                        for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                            if (pkt->header->seqno == msgheader->seqno - 1) {
                                pkt->status = ACKED;
                                //printf("data packet no. %d acked\n", pkt->header->seqno);
                                break;
                            }
                        }
                        p->sm += msgheader->seqno - p->sb;
                        p->sb = msgheader->seqno;
                    }
                    break;
                case RUDP_DATA:
#ifdef DROP
                    srand(time(0));
                    if (rand() % DROP == 0) {
                        printf("packet dropped\n");
                        break;
                    }
#endif
                    if (msgheader->seqno == p->sn) {
                        p->sn = msgheader->seqno + 1;
                        replyheader->seqno = msgheader->seqno + 1;
                        replyheader->type = RUDP_ACK;
                        /* arg carries the receive callback registered via rudp_recvfrom_handler() */
                        handler = (int (*)(rudp_socket_t, struct sockaddr_in*, char *, int))arg;
                        void *data;
                        data = malloc(msglen - sizeof (struct rudp_hdr));
                        if (data == NULL) {
                            perror("rudp_listen: malloc");
                            return -1;
                        }
                        memcpy(data, message + sizeof (struct rudp_hdr),
                                msglen - sizeof (struct rudp_hdr));
                        handler(rs, &client_addr, data, msglen - sizeof (struct rudp_hdr));
                        sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                                (struct sockaddr*) & client_addr, clientlen);
                    }
                    break;
                case RUDP_SYN:
                    p->status = PEER_STATUS_ACTIVE;
                    p->sn = msgheader->seqno + 1;
                    replyheader->seqno = msgheader->seqno + 1;
                    replyheader->type = RUDP_ACK;
                    sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                            (struct sockaddr*) & client_addr, clientlen);
                    break;
                case RUDP_FIN:
                    p->status = PEER_STATUS_CLOSED;
                    p->sn = msgheader->seqno + 1;
                    replyheader->seqno = msgheader->seqno + 1;
                    replyheader->type = RUDP_ACK;
                    sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                            (struct sockaddr*) & client_addr, clientlen);
                    break;
                default:
                    fprintf(stderr, "rudp_listen: invalid RUDP type %d\n",
                            msgheader->type);
                    break;
            }
            break;
        case PEER_STATUS_CLOSING:
            if (msgheader->type == RUDP_ACK && msgheader->seqno == p->sn + 1) {
                p->status = PEER_STATUS_CLOSED;
                //printf("peer closed\n");
            }
            break;
        case PEER_STATUS_CLOSED:
            if (msgheader->type == RUDP_FIN) {
                p->status = PEER_STATUS_CLOSED;
                p->sn = msgheader->seqno + 1;
                replyheader->seqno = msgheader->seqno + 1;
                replyheader->type = RUDP_ACK;
                sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                        (struct sockaddr*) & client_addr, clientlen);
            }
            /*
                        if (msgheader->type == RUDP_SYN) {
                            p->status = PEER_STATUS_ACTIVE;
                            p->sn = msgheader->seqno + 1;
                            replyheader->seqno = msgheader->seqno + 1;
                            replyheader->type = RUDP_ACK;
                            sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                                    (struct sockaddr*) & client_addr, clientlen);
                        } else if (msgheader->type == RUDP_FIN) {
                            p->status = PEER_STATUS_CLOSED;
                            p->sn = msgheader->seqno + 1;
                            replyheader->seqno = msgheader->seqno + 1;
                            replyheader->type = RUDP_ACK;
                            sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
                                    (struct sockaddr*) & client_addr, clientlen);
                        }
             */
            break;
        default:
            fprintf(stderr, "rudp_listen: invalid peer status %d\n",
                    p->status);
            break;
    }
    return 0;
}

/*
 * Keep checking if there are packets to be sent out
 */
int rudp_send(int fd, void *arg) {
    event_timeout_delete(rudp_send, arg);
    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    rudp_socket_t rs;
    peer_t p;
    packet_t pkt;
    void *msg;
    int len = sizeof (struct rudp_hdr);
    //int rsc = 0;
    if (rs_head == NULL) {
        return 0;
    }
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        //printf("rudp socket count: %d\n", ++rsc);
        for (p = rs->peers; p != NULL; p = p->next) {
            if (p->status == PEER_STATUS_CLOSED) {
                if (remove_peer(&(rs->peers), p) < 0) {
                    perror("rudp_send: remove_peer");
                    return -1;
                } else {
                    event_timeout(t, rudp_send, NULL, "rudp_send");
                }
                return 0;
            } else if (p->status != PEER_STATUS_ACTIVE) {
                event_timeout(t, rudp_send, NULL, "rudp_send");
                return 0;
            }
            //int pc = 0;
            for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                //printf("pakcet %d: seq number %d status %d\n", ++pc, pkt->header->seqno, pkt->status);
                if (pkt->status == ACKED) {
                    /* Cancel the pending retransmission timer before the packet is freed */
                    event_timeout_delete(rudp_timeout, pkt);
                    if (remove_packet(&(p->packets), pkt) == -1) {
                        return -1;
                    }
                    event_timeout(t, rudp_send, NULL, "rudp_send");
                    return 0;
                }
                if ((pkt->status == IN_QUEUE || pkt->status == TIMEOUT) &&
                        pkt->header->seqno >= p->sb &&
                        pkt->header->seqno <= p->sm) {
                    if (pkt->header->type == RUDP_FIN) {
                        p->status = PEER_STATUS_CLOSING;
                    }
                    len = sizeof (struct rudp_hdr) + pkt->datalen;
                    msg = malloc(len);
                    if (msg == NULL) {
                        perror("rudp_send: malloc");
                        return -1;
                    }
                    memcpy(msg, pkt->header, sizeof (struct rudp_hdr));
                    memcpy((char *) msg + sizeof (struct rudp_hdr), pkt->data, pkt->datalen);
                    sendto(rs->sockfd, msg, len, 0,
                            (struct sockaddr*) p->sa, sizeof (struct sockaddr));
                    free(msg); /* sendto() has already copied the datagram */
                    pkt->status = SENT;
                    pkt->trans_count++;
                    struct timeval now, interval, t;
                    gettimeofday(&now, NULL);
                    if (RUDP_TIMEOUT >= 1000) {
                        interval.tv_sec = RUDP_TIMEOUT / 1000;
                        interval.tv_usec = 0;
                    } else {
                        interval.tv_sec = 0;
                        interval.tv_usec = RUDP_TIMEOUT * 1000;
                    }
                    timeradd(&now, &interval, &t);
                    event_timeout(t, rudp_timeout, pkt, "rudp_timeout");
                }

            }
        }
    }
    event_timeout(t, rudp_send, NULL, "rudp_send");
    return 0;
}

int rudp_timeout(int a, void *arg) {
    packet_t pkt = (packet_t) arg;
    event_timeout_delete(rudp_timeout, pkt);
    if (pkt->status != ACKED) {
        //printf("Timeout: %d\n", pkt->header->seqno);
        pkt->status = TIMEOUT;
        if (pkt->trans_count >= RUDP_MAXRETRANS)
            pkt->status = FAILED;
    }
    return 0;
}

/*
 *rudp_close: Close socket
 */

int rudp_close(rudp_socket_t rsocket) {

    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    event_timeout(t, rudp_close_session, rsocket, "rudp_close_socket");
    return 0;
    /*
        peer_t p;
        packet_t pkt;
        struct rudp_hdr *hdr;
        hdr = malloc(sizeof (struct rudp_hdr));
        if (hdr == NULL) {
            perror("rudp_sendto: malloc");
            return -1;
        }
        pkt = malloc(sizeof (struct packet));
        if (pkt == NULL) {
            perror("rudp_sendto: malloc");
            return -1;
        }
        for (p = rsocket->peers; p != NULL; p = p->next) {
            hdr->version = RUDP_VERSION;
            hdr->type = RUDP_FIN;
            hdr->seqno = p->sn++;
            pkt->data = NULL;
            pkt->datalen = 0;
            pkt->header = hdr;
            pkt->next = NULL;
            pkt->trans_count = 0;
            pkt->status = IN_QUEUE;
            if (add_packet(&p->packets, &pkt) < 0) {
                perror("rudp_sendto: add_packet");
                return -1;
            }
        }
        return 0;
     */
}

int rudp_close_session(int fd, void *arg) {
    event_timeout_delete(rudp_close_session, arg);
    rudp_socket_t rs = (rudp_socket_t) arg;
    peer_t p;
    //printf("closing rudp socket %d\n", rs->sockfd);
    //printf("peer: %p\n", rs->peers);
    if (rs->peers == NULL) {
        //printf("peer null\n");
        event_fd_delete(rudp_listen, rs);
        close(rs->sockfd);
        rudp_socket_t r, *rr;
        rr = &rs_head;
        for (r = rs_head; r != NULL; r = r->next) {
            if (r == rs) {
                *rr = r->next;
                free(r);
                return 0;
            }
            rr = &r->next;
        }
        return -1;
    }
    //printf("peers not null\n");
    for (p = rs->peers; p != NULL; p = p->next) {
        //check if there are still packets to be sent
        if (p->packets != NULL) {
            struct timeval now, interval, t;
            gettimeofday(&now, NULL);
            interval.tv_sec = 0;
            interval.tv_usec = RUDP_CHECK_INTERVAL;
            timeradd(&now, &interval, &t);
            event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
            return 0;
        }
        //printf("no packets in this peer, status %d\n", p->status);
        if (p->status != PEER_STATUS_CLOSED) {
            p->status = PEER_STATUS_CLOSING;
            struct rudp_hdr header;
            header.seqno = p->sn;
            header.type = RUDP_FIN;
            header.version = RUDP_VERSION;
            sendto(rs->sockfd, &header, sizeof (struct rudp_hdr), 0,
                    (struct sockaddr*) p->sa, sizeof (struct sockaddr_in));
            struct timeval now, interval, t;
            gettimeofday(&now, NULL);
            if (RUDP_TIMEOUT >= 1000) {
                interval.tv_sec = RUDP_TIMEOUT / 1000;
                interval.tv_usec = 0;
            } else {
                interval.tv_sec = 0;
                interval.tv_usec = RUDP_TIMEOUT * 1000;
            }
            timeradd(&now, &interval, &t);
            event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
            //printf("FIN sent, check again 2s later\n");
            return 0;
        }
        //printf("peer status closed\n");
        if (remove_peer(&(rs->peers), p) < 0) {
            perror("rudp_close_session: remove_peer");
            return -1;
        } else {
            //printf("peer successfully removed\n");
            struct timeval now, interval, t;
            gettimeofday(&now, NULL);
            interval.tv_sec = 0;
            interval.tv_usec = RUDP_CHECK_INTERVAL;
            timeradd(&now, &interval, &t);
            event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
            return 0;
        }
    }
    return 0;
}

/*
 *rudp_recvfrom_handler: Register receive callback function
 */

int rudp_recvfrom_handler(rudp_socket_t rsocket,
        int (*handler)(rudp_socket_t, struct sockaddr_in *,
        char *, int)) {
    event_fd(rsocket->sockfd, rudp_listen, handler, "rudp_listen");
    event_fd_delete(rudp_listen, rsocket);
    return 0;
}

/*
 *rudp_event_handler: Register event handler callback function
 */
int rudp_event_handler(rudp_socket_t rsocket,
        int (*handler)(rudp_socket_t, rudp_event_t,
        struct sockaddr_in *)) {
    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    event_timeout(t, rudp_check_status, handler, "rudp_check_status");
    return 0;
}

int rudp_check_status(int fd, void *arg) {
    event_timeout_delete(rudp_check_status, arg);
    int (*handler)(rudp_socket_t, rudp_event_t, struct sockaddr_in *);
    handler = (int (*)(rudp_socket_t, rudp_event_t, struct sockaddr_in *))arg;
    rudp_socket_t rs;
    peer_t p;
    packet_t pkt;
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        for (p = rs->peers; p != NULL; p = p->next) {
            if (p->status == PEER_STATUS_CLOSED) {
                handler(rs, RUDP_EVENT_CLOSED, p->sa);
                if (remove_peer(&(rs->peers), p) < 0)
                    return -1;
                return 0;
            }
            for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                if (pkt->status == FAILED) {
                    handler(rs, RUDP_EVENT_TIMEOUT, p->sa);
                    if (remove_peer(&(rs->peers), p) < 0)
                        return -1;
                    return 0;
                }
            }
        }
    }
    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    event_timeout(t, rudp_check_status, handler, "rudp_check_status");
    return 0;
}

/*
 * rudp_sendto: Send a block of data to the receiver.
 */

int rudp_sendto(rudp_socket_t rsocket, void* data, int len, struct sockaddr_in* to) {
    peer_t p;
    packet_t pkt;
    struct rudp_hdr *hdr;
    p = find_peer(&(rsocket->peers), to);
    if (p == NULL) {
        perror("rudp_sendto: find_peer");
        return -1;
    }
    if (p->status == PEER_STATUS_INIT) {
        rudp_setup_session(0, p);
    }
    /* Allocate the header and the packet once, after the peer lookup has succeeded */
    hdr = malloc(sizeof (struct rudp_hdr));
    if (hdr == NULL) {
        perror("rudp_sendto: malloc");
        return -1;
    }
    pkt = malloc(sizeof (struct packet));
    if (pkt == NULL) {
        perror("rudp_sendto: malloc");
        free(hdr);
        return -1;
    }
    hdr->version = RUDP_VERSION;
    hdr->type = RUDP_DATA;
    hdr->seqno = p->sn++;
    pkt->data = malloc(len);
    if (pkt->data == NULL) {
        perror("rudp_sendto: malloc");
        return -1;
    }
    memcpy(pkt->data, data, len);
    pkt->datalen = len;
    pkt->header = hdr;
    pkt->next = NULL;
    pkt->trans_count = 0;
    pkt->status = IN_QUEUE;
    //printf("adding packet %d to %d\n", pkt->header->seqno, ntohs(p->sa->sin_port));
    if (add_packet(&p->packets, &pkt) < 0) {
        perror("rudp_sendto: add_packet");
        return -1;
    }
    return 0;
}

int rudp_setup_session(int fd, void *arg) {
    event_timeout_delete(rudp_setup_session, arg);
    peer_t p = (peer_t) arg;
    if (p == NULL) {
        perror("rudp_setup_session: error finding peer");
        return -1;
    }
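    int sockfd = -1;
    rudp_socket_t rs;
    peer_t pp;
    int flag = 0;
    for (rs = rs_head; rs != NULL; rs = rs->next) {
        for (pp = rs->peers; pp != NULL; pp = pp->next) {
            if (pp == p) {
                sockfd = rs->sockfd;
                flag = 1;
                break;
            }
        }
        if (flag == 1)
            break;
    }
    if (flag == 0) {
        /* The peer does not belong to any known socket, so there is nothing to send on */
        fprintf(stderr, "rudp_setup_session: peer has no socket\n");
        return -1;
    }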
    struct rudp_hdr header;
    header.seqno = p->sn++;
    header.type = RUDP_SYN;
    header.version = RUDP_VERSION;
    sendto(sockfd, &header, sizeof (struct rudp_hdr), 0,
            (struct sockaddr*) p->sa, sizeof (struct sockaddr_in));
    p->status = PEER_STATUS_PASSIVE;

    struct timeval now, interval, t;
    gettimeofday(&now, NULL);
    if (RUDP_TIMEOUT >= 1000) {
        interval.tv_sec = RUDP_TIMEOUT / 1000;
        interval.tv_usec = 0;
    } else {
        interval.tv_sec = 0;
        interval.tv_usec = RUDP_TIMEOUT * 1000;
    }
    timeradd(&now, &interval, &t);
    event_timeout(t, rudp_session_check, arg, "rudp_timeout");


    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &t);
    event_timeout(t, rudp_send, NULL, "rudp_send");
    return 0;
}

int rudp_session_check(int fd, void *arg) {
    event_timeout_delete(rudp_session_check, arg);
    peer_t p = (peer_t) arg;
    if (p == NULL) {
        perror("rudp_session_check: error finding peer");
        return -1;
    }
    if (p->status == PEER_STATUS_PASSIVE) {
        struct timeval now, interval, t;
        gettimeofday(&now, NULL);
        interval.tv_usec = 0;
        interval.tv_sec = 0;
        timeradd(&now, &interval, &t);
        event_timeout(t, rudp_setup_session, arg, "rudp_setup_session");
    }
    return 0;
}

Multiple Sessions

A linked list of all RUDP sockets is maintained. When rudp_socket() is called, an RUDP socket is created and added to the linked list. Each RUDP socket keeps a record of the peers (sessions) it talks with. When RUDP receives a packet from an unknown socket address, or receives a request to send a packet to an unknown socket address, a new session is created. For each session, a linked list of all buffered packets is kept.
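The three nested lists described above can be pictured with a minimal sketch. Every name here (`sketch_socket`, `sketch_peer`, `sketch_packet`, `sketch_find_peer`, `sketch_peer_count`) is hypothetical and chosen only for illustration; the real types come from rudp.h, which is not shown in this post. The lookup compares address and port explicitly, which is one possible choice rather than what the listing does.

```c
#include <assert.h>
#include <stdlib.h>
#include <netinet/in.h>

/* Hypothetical names throughout; the real definitions live in rudp.h. */
struct sketch_packet {
    unsigned int seqno;
    struct sketch_packet *next;
};

struct sketch_peer {
    struct sockaddr_in addr;        /* remote end of this session */
    struct sketch_packet *packets;  /* per-session buffer of packets */
    struct sketch_peer *next;
};

struct sketch_socket {
    int sockfd;
    struct sketch_peer *peers;      /* sessions owned by this socket */
    struct sketch_socket *next;     /* next socket in the global list */
};

/* Return the session for addr, creating it on first contact. */
struct sketch_peer *sketch_find_peer(struct sketch_socket *s,
                                     const struct sockaddr_in *addr) {
    struct sketch_peer *p;
    assert(s != NULL && addr != NULL);
    for (p = s->peers; p != NULL; p = p->next) {
        if (p->addr.sin_addr.s_addr == addr->sin_addr.s_addr &&
            p->addr.sin_port == addr->sin_port)
            return p;
    }
    p = calloc(1, sizeof *p);
    if (p == NULL)
        return NULL;
    p->addr = *addr;
    p->next = s->peers;             /* push onto the front of the list */
    s->peers = p;
    return p;
}

/* Count the sessions attached to a socket. */
int sketch_peer_count(const struct sketch_socket *s) {
    int n = 0;
    const struct sketch_peer *p;
    for (p = s->peers; p != NULL; p = p->next)
        n++;
    return n;
}
```

Looking up the same address twice returns the same session, while a new address grows the list by one, which is the behaviour the paragraph describes for unknown socket addresses.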

Session Establishment and Tearing Down

When rudp_sendto() is called, the protocol first checks whether a session exists between the sender and the receiver. If not, the protocol tries to set up a session by sending RUDP_SYN messages, and the packet the application wants to send is buffered in the newly created session. After an RUDP_ACK message is received, the server-side (sender) socket starts sending out packets. The Go-Back-N protocol is used to control the sending process. After the protocol receives a rudp_close() call, it first checks whether there are still active sessions and packets in the sending buffer. If not, the protocol sends out RUDP_FIN messages, and after the RUDP_ACKs are received, the session is torn down.

RUDP Overview

RUDP is a protocol that provides reliable transfer on top of UDP. A sliding window protocol (Go-Back-N) is used to achieve reliability. Using RUDP, applications can send and receive data packets without worrying about lost packets.

The red lines signify state changes for RUDP clients (receiver side), while the black lines signify state changes for RUDP servers (sender side).
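To make the sliding window idea concrete, here is a minimal sketch of a textbook Go-Back-N sender window. It is not the bookkeeping used in the listing: the names (`sketch_window`, `sketch_can_send`, `sketch_send`, `sketch_on_ack`, `sketch_on_timeout`) and the window size of 3 are assumptions made for illustration. As in the listing, an ACK is assumed to carry the next sequence number the receiver expects.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_WINDOW 3  /* hypothetical window size */

struct sketch_window {
    unsigned int base;  /* oldest unacknowledged sequence number */
    unsigned int next;  /* next sequence number to send */
};

/* The sender may transmit only while fewer than SKETCH_WINDOW packets are in flight. */
bool sketch_can_send(const struct sketch_window *w) {
    return w->next < w->base + SKETCH_WINDOW;
}

/* Consume one slot of the window and return the sequence number to put on the wire. */
unsigned int sketch_send(struct sketch_window *w) {
    assert(sketch_can_send(w));
    return w->next++;
}

/* Cumulative ACK: everything below ack has been received, so slide the window forward. */
void sketch_on_ack(struct sketch_window *w, unsigned int ack) {
    if (ack > w->base && ack <= w->next)
        w->base = ack;
}

/* Timeout: go back to base and return how many packets must be retransmitted. */
unsigned int sketch_on_timeout(struct sketch_window *w) {
    unsigned int outstanding = w->next - w->base;
    w->next = w->base;
    return outstanding;
}
```

With a window of 3, the sender can emit sequence numbers 0, 1 and 2 and then has to wait; an ACK for 2 frees two slots, and a timeout rewinds `next` to `base` so that every unacknowledged packet is sent again.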

CrazyBus Launch Script

#!/bin/bash

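PREFIX=
if [ -L "$0" ]; then
    # Resolve the symlink to find the real install location
    PREFIX=$(readlink -f "$0")
    if [ $? -eq 0 ]; then
        PREFIX=$(dirname "$PREFIX")
    else
        # Fall back to parsing the output of file when readlink -f fails
        PREFIX=$(file "$0")
        PREFIX=${PREFIX##*symbolic link to }
        PREFIX=$(dirname "$PREFIX")
    fi
else
    PREFIX=$(dirname "$0")
fi

# Turn a relative prefix into an absolute path
case $PREFIX in
    /*)
        ;;
    *)
        cd "$PREFIX" || exit 1
        PREFIX=$(pwd)
        ;;
esac

# Build the classpath from every jar in lib/
cp=
for i in "$PREFIX"/lib/*.jar; do
    cp=$i:$cp
done

"$PREFIX/java/bin/java" \
    -Djava.library.path="$PREFIX/swt" -classpath "$cp" \
    org.crazybus.login.Login &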