The code looks as follows.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "event.h"
#include "rudp.h"
#include "rudp_api.h"
/*
 * Uncomment to make rudp_listen discard an incoming DATA packet whenever
 * rand() % DROP == 0 (used to exercise the retransmission path).
 */
//#define DROP 3
/*
 * Head of the singly linked list of open RUDP sockets. rudp_socket pushes
 * new sockets onto the front; the timer callbacks walk it to reach every peer.
 */
rudp_socket_t rs_head = NULL;
/*
 * find_peer: Return the peer in the list *firstp whose address matches sa,
 * creating it if it does not exist yet.
 *
 * Peers are matched on address family, IPv4 address and port. A newly
 * created peer owns a private copy of *sa, starts in PEER_STATUS_INIT with
 * an empty packet queue, and is pushed onto the front of the list.
 *
 * Returns the peer, or NULL if memory could not be allocated (the list is
 * left unchanged in that case).
 */
peer_t find_peer(peer_t *firstp, struct sockaddr_in *sa) {
    peer_t p;
    struct sockaddr_in *sin;

    for (p = *firstp; p != NULL; p = p->next) {
        /* Compare the meaningful fields only: a raw memcmp over the whole
         * structure would also compare the sin_zero padding, which callers
         * are not required to clear. */
        if (p->sa->sin_family == sa->sin_family &&
            p->sa->sin_port == sa->sin_port &&
            p->sa->sin_addr.s_addr == sa->sin_addr.s_addr)
            return p;
    }

    p = malloc(sizeof (struct peer));
    if (p == NULL) {
        perror("find_peer: malloc");
        return NULL;
    }
    sin = malloc(sizeof (struct sockaddr_in));
    if (sin == NULL) {
        perror("find_peer: malloc");
        free(p); /* do not leak the half-built peer */
        return NULL;
    }
    memcpy(sin, sa, sizeof (struct sockaddr_in));

    p->sa = sin;
    p->packets = NULL;
    p->status = PEER_STATUS_INIT;
    /* Initial send window is [0, RUDP_WINDOW - 1]; next seqno is 0. */
    p->sb = 0;
    p->sm = RUDP_WINDOW - 1;
    p->sn = 0;

    p->next = *firstp;
    *firstp = p;
    return p;
}
/*
 * remove_peer: Unlink rp from the peer list *firstp and release it.
 *
 * Frees the peer's address copy and the peer itself; the peer's packet
 * queue is not touched here.
 *
 * Returns 0 when rp was found and removed, -1 when it is not in the list.
 */
int remove_peer(peer_t *firstp, peer_t rp) {
    peer_t *link = firstp;

    /* Walk the chain of "next" links so the head needs no special case. */
    while (*link != NULL) {
        if (*link == rp) {
            *link = rp->next;
            free(rp->sa);
            free(rp);
            return 0;
        }
        link = &(*link)->next;
    }
    return -1;
}
/*
 * add_packet: Insert *pkt into the packet list *packet_head.
 *
 * The list is kept ordered by ascending header seqno, so packets with a
 * smaller seqno are always in front. A packet whose seqno equals that of an
 * existing entry is placed after it. The list takes over the node; nothing
 * is copied.
 *
 * Returns 0 on success, -1 if an argument is NULL.
 */
int add_packet(packet_t *packet_head, packet_t *pkt) {
    packet_t *link;

    if (packet_head == NULL || pkt == NULL || *pkt == NULL)
        return -1;

    /* Stop at the first node with a larger seqno (or at the end of the
     * list) and splice the new packet in front of it. */
    for (link = packet_head; *link != NULL; link = &(*link)->next) {
        if ((*link)->header->seqno > (*pkt)->header->seqno)
            break;
    }
    (*pkt)->next = *link;
    *link = *pkt;
    return 0;
}
/*
 * remove_packet: Unlink pkt from the packet list *packet_head and free it.
 *
 * The packet's header and data buffers are released together with the node
 * (rudp_sendto heap-allocates both), so the caller must not use pkt
 * afterwards.
 *
 * Returns 0 when pkt was found and removed, -1 when it is not in the list.
 */
int remove_packet(packet_t *packet_head, packet_t pkt) {
    packet_t *link;

    for (link = packet_head; *link != NULL; link = &(*link)->next) {
        if (*link == pkt) {
            *link = pkt->next;
            free(pkt->header);
            free(pkt->data); /* free(NULL) is a no-op for data-less packets */
            free(pkt);
            return 0;
        }
    }
    return -1;
}
/*
 * rudp_socket: Create a RUDP socket bound to the given UDP port on all
 * local interfaces.
 *
 * port32 - port number in host byte order (0..65535). Zero requests a
 *          randomly chosen port in the range 1024..65534.
 *
 * The new socket is pushed onto rs_head and rudp_listen is registered as its
 * fd callback, with the socket itself as callback argument.
 *
 * Returns the socket, or NULL on failure; no descriptor or memory is leaked
 * on the failure paths.
 */
rudp_socket_t rudp_socket(int port32) {
    /* Upper bound on random-port attempts, so that a bind() error other
     * than "address in use" cannot spin forever. */
    enum { RUDP_BIND_ATTEMPTS = 100 };
    struct sockaddr_in sin;
    rudp_socket_t rs;
    uint16_t port;
    int sockfd;
    int bound = 0;

    if (port32 < 0 || port32 > 65535) {
        fprintf(stderr, "rudp_socket: invalid port %d\n", port32);
        return NULL;
    }

    sockfd = socket(PF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("rudp_socket: socket");
        return NULL;
    }

    memset(&sin, 0, sizeof sin);
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_ANY);

    port = (uint16_t) port32;
    if (port == 0) {
        int attempt;

        srand(time(0));
        for (attempt = 0; attempt < RUDP_BIND_ATTEMPTS && !bound; attempt++) {
            port = (rand() % (65535 - 1024)) + 1024;
            sin.sin_port = htons(port);
            bound = bind(sockfd, (struct sockaddr *) &sin, sizeof sin) == 0;
        }
    } else {
        sin.sin_port = htons(port);
        bound = bind(sockfd, (struct sockaddr *) &sin, sizeof sin) == 0;
    }
    if (!bound) {
        perror("rudp_socket: bind");
        close(sockfd);
        return NULL;
    }

    rs = malloc(sizeof (struct rudp_socket));
    if (rs == NULL) {
        perror("rudp_socket: malloc");
        close(sockfd);
        return NULL;
    }
    rs->sockfd = sockfd;
    rs->peers = NULL;
    rs->next = rs_head;
    rs_head = rs;

    event_fd(rs->sockfd, rudp_listen, rs, "rudp_listen");
    return rs;
}
/*
* Listens on the socket
*/
int rudp_listen(int fd, void *arg) {
rudp_socket_t rs;
for (rs = rs_head; rs != NULL; rs = rs->next) {
if (rs->sockfd == fd)
break;
}
if (rs == NULL) {
return -1;
}
int (*handler)(rudp_socket_t,struct sockaddr_in *, char *, int);
/*
client_addr.sin_family = AF_INET;
client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
client_addr.sin_port = htons(0);
*/
unsigned int clientlen = sizeof(struct sockaddr);
struct sockaddr client_sockaddr;
int bufsize = RUDP_MAXPKTSIZE + sizeof (struct rudp_hdr);
int msglen = 0;
char message[bufsize];
memset(message, 0, bufsize);
msglen =recvfrom(rs->sockfd, message, bufsize, 0, & client_sockaddr, &clientlen);
if (msglen < 0) {
perror("rudp_listen: recvfrom");
return -1;
}
struct sockaddr_in client_addr;
memcpy(&client_addr, &client_sockaddr, clientlen);
if (ntohs(client_addr.sin_port) == 0) {
perror("rudp_listen: recvfrom port 0");
return -1;
}
//printf("received from port: %d\n",ntohs(client_addr.sin_port));
peer_t p = find_peer(&(rs->peers), &client_addr);
if (p == NULL) {
return -1;
}
//printf("peer: %d\n", ntohs(p->sa->sin_port));
if (msglen < 0) {
perror("rudp_recvfrom: recvfrom");
return -1;
} else if (msglen < (sizeof (struct rudp_hdr))) {
fprintf(stderr, "rudp_recvfrom: error receiving packet");
return -1;
}
structrudp_hdr *msgheader = malloc(sizeof (struct rudp_hdr));
if (msgheader == NULL) {
perror("rudp_recvfrom: malloc");
return -1;
}
memcpy(msgheader, message, sizeof (struct rudp_hdr));
//printf("status=%d sn=%d\n", p->status, p->sn);
//printf("message from port %d, type=%d seqno=%d\n", ntohs(p->sa->sin_port), msgheader->type, msgheader->seqno);
struct rudp_hdr *replyheader = malloc(sizeof (structrudp_hdr));
if (replyheader == NULL) {
perror("rudp_recvfrom: malloc");
return -1;
}
replyheader->version = RUDP_VERSION;
switch (p->status) {
case PEER_STATUS_INIT:
if (msgheader->type == RUDP_SYN) {
p->status = PEER_STATUS_ACTIVE;
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
//printf("ACK sent. Turns active\n");
}
break;
case PEER_STATUS_PASSIVE:
if (msgheader->type == RUDP_ACK && msgheader->seqno == 1) {
p->status = PEER_STATUS_ACTIVE;
packet_t pkt;
for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
if (pkt->header->seqno == msgheader->seqno - 1) {
pkt->status = ACKED;
//printf("syn packet no. %d acked\n", pkt->header->seqno);
break;
}
}
//p->sn = 1;
p->sb = 1;
p->sm = RUDP_WINDOW;
}
break;
case PEER_STATUS_ACTIVE:
switch (msgheader->type) {
case RUDP_ACK:
//printf("sb: %d sm: %d\n", p->sb, p->sm);
if (msgheader->seqno >= p->sb) {
packet_t pkt;
for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
if (pkt->header->seqno == msgheader->seqno - 1) {
pkt->status = ACKED;
//printf("data packet no. %d acked\n", pkt->header->seqno);
break;
}
}
p->sm += msgheader->seqno - p->sb;
p->sb = msgheader->seqno;
}
break;
case RUDP_DATA:
#ifdef DROP
srand(time(0));
if (rand() % DROP == 0) {
printf("packet dropped\n");
break;
}
#endif
if (msgheader->seqno == p->sn) {
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
handler = malloc(sizeof (int (*)()));
if (handler == NULL) {
perror("rudp_listen: malloc");
return -1;
}
handler = (int (*)(rudp_socket_t, struct sockaddr_in*, char *, int))arg;
void *data;
data = malloc(msglen - sizeof (struct rudp_hdr));
if (data == NULL) {
perror("rudp_listen: malloc");
return -1;
}
memcpy(data, message + sizeof (struct rudp_hdr),
msglen - sizeof (struct rudp_hdr));
handler(rs, &client_addr, data, msglen - sizeof (struct rudp_hdr));
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
}
break;
case RUDP_SYN:
p->status = PEER_STATUS_ACTIVE;
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
break;
case RUDP_FIN:
p->status = PEER_STATUS_CLOSED;
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
break;
default:
fprintf(stderr, "rudp_listen: invalid RUDP type %d\n",
msgheader->type);
break;
}
break;
case PEER_STATUS_CLOSING:
if (msgheader->type == RUDP_ACK && msgheader->seqno == p->sn + 1) {
p->status = PEER_STATUS_CLOSED;
//printf("peer closed\n");
}
break;
case PEER_STATUS_CLOSED:
if (msgheader->type == RUDP_FIN) {
p->status = PEER_STATUS_CLOSED;
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
}
/*
if (msgheader->type == RUDP_SYN) {
p->status = PEER_STATUS_ACTIVE;
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
} else if (msgheader->type == RUDP_FIN) {
p->status = PEER_STATUS_CLOSED;
p->sn = msgheader->seqno + 1;
replyheader->seqno = msgheader->seqno + 1;
replyheader->type = RUDP_ACK;
sendto(fd, replyheader, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) & client_addr, clientlen);
}
*/
break;
default:
fprintf(stderr, "rudp_listen: invalid peer status %d\n",
p->status);
break;
}
return 0;
}
/*
 * rudp_send: Periodic timer callback that drives transmission for every
 * peer of every RUDP socket.
 *
 * fd  - unused.
 * arg - argument this timer was registered with; only used to cancel the
 *       pending registration.
 *
 * On each tick:
 *   - a CLOSED peer is removed (one per tick, then the tick ends);
 *   - peers that are not ACTIVE are skipped;
 *   - for an ACTIVE peer, one ACKED packet is removed (then the tick ends),
 *     and every IN_QUEUE / TIMEOUT packet inside the send window [sb, sm] is
 *     transmitted, marked SENT and given a retransmission timer
 *     (rudp_timeout) RUDP_TIMEOUT milliseconds ahead.
 * The callback re-arms itself RUDP_CHECK_INTERVAL microseconds ahead, except
 * when no socket is left or when it returns -1.
 *
 * Returns 0 normally, -1 on a list inconsistency or allocation failure.
 */
int rudp_send(int fd, void *arg) {
    struct timeval now, interval, next_check;
    rudp_socket_t rs;
    peer_t p;
    packet_t pkt;

    (void) fd;
    event_timeout_delete(rudp_send, arg);

    gettimeofday(&now, NULL);
    interval.tv_sec = 0;
    interval.tv_usec = RUDP_CHECK_INTERVAL;
    timeradd(&now, &interval, &next_check);

    /* With no socket left there is nothing to drive: let the timer lapse. */
    if (rs_head == NULL) {
        return 0;
    }

    for (rs = rs_head; rs != NULL; rs = rs->next) {
        for (p = rs->peers; p != NULL; p = p->next) {
            if (p->status == PEER_STATUS_CLOSED) {
                if (remove_peer(&(rs->peers), p) < 0) {
                    perror("rudp_send: remove_peer");
                    return -1;
                }
                /* The list changed under the iteration; resume next tick. */
                event_timeout(next_check, rudp_send, NULL, "rudp_send");
                return 0;
            }
            if (p->status != PEER_STATUS_ACTIVE) {
                /* Skip this peer only. Ending the tick here would starve
                 * every peer and socket that follows it in the lists. */
                continue;
            }

            for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
                if (pkt->status == ACKED) {
                    if (remove_packet(&(p->packets), pkt) == -1) {
                        return -1;
                    }
                    /* pkt is gone; resume next tick. */
                    event_timeout(next_check, rudp_send, NULL, "rudp_send");
                    return 0;
                }
                if ((pkt->status == IN_QUEUE || pkt->status == TIMEOUT) &&
                        pkt->header->seqno >= p->sb &&
                        pkt->header->seqno <= p->sm) {
                    struct timeval sent_at, rto, deadline;
                    size_t len = sizeof (struct rudp_hdr) + pkt->datalen;
                    char *msg;

                    if (pkt->header->type == RUDP_FIN) {
                        p->status = PEER_STATUS_CLOSING;
                    }

                    /* Serialise header + payload into one datagram. */
                    msg = malloc(len);
                    if (msg == NULL) {
                        perror("rudp_send: malloc");
                        return -1;
                    }
                    memcpy(msg, pkt->header, sizeof (struct rudp_hdr));
                    memcpy(msg + sizeof (struct rudp_hdr), pkt->data,
                           pkt->datalen);
                    if (sendto(rs->sockfd, msg, len, 0,
                               (struct sockaddr *) p->sa,
                               sizeof (struct sockaddr_in)) < 0) {
                        /* Not fatal: the retransmission timer retries. */
                        perror("rudp_send: sendto");
                    }
                    free(msg);

                    pkt->status = SENT;
                    pkt->trans_count++;

                    /* RUDP_TIMEOUT is in milliseconds. */
                    gettimeofday(&sent_at, NULL);
                    if (RUDP_TIMEOUT >= 1000) {
                        rto.tv_sec = RUDP_TIMEOUT / 1000;
                        rto.tv_usec = 0;
                    } else {
                        rto.tv_sec = 0;
                        rto.tv_usec = RUDP_TIMEOUT * 1000;
                    }
                    timeradd(&sent_at, &rto, &deadline);
                    event_timeout(deadline, rudp_timeout, pkt, "rudp_timeout");
                }
            }
        }
    }

    event_timeout(next_check, rudp_send, NULL, "rudp_send");
    return 0;
}
/*
 * rudp_timeout: Retransmission timer callback for one packet.
 *
 * a   - unused.
 * arg - the packet (packet_t) whose timer expired.
 *
 * A packet that has been acknowledged is left alone. Otherwise it becomes
 * TIMEOUT, or FAILED once it has been transmitted RUDP_MAXRETRANS times.
 *
 * Always returns 0.
 */
int rudp_timeout(int a, void *arg) {
    packet_t expired = (packet_t) arg;

    event_timeout_delete(rudp_timeout, expired);

    if (expired->status == ACKED)
        return 0;

    expired->status =
        (expired->trans_count >= RUDP_MAXRETRANS) ? FAILED : TIMEOUT;
    return 0;
}
/*
 * rudp_close: Begin closing a RUDP socket.
 *
 * Nothing is torn down here: the function only schedules
 * rudp_close_session for rsocket RUDP_CHECK_INTERVAL microseconds from now.
 *
 * Always returns 0.
 */
int rudp_close(rudp_socket_t rsocket) {
    struct timeval start, delay, when;

    delay.tv_sec = 0;
    delay.tv_usec = RUDP_CHECK_INTERVAL;

    gettimeofday(&start, NULL);
    timeradd(&start, &delay, &when);

    event_timeout(when, rudp_close_session, rsocket, "rudp_close_socket");
    return 0;
}
int rudp_close_session(int fd, void *arg) {
event_timeout_delete(rudp_close_session, arg);
rudp_socket_t rs = (rudp_socket_t) arg;
peer_t p;
//printf("closing rudp socket %d\n", rs->sockfd);
//printf("peer: %p\n", rs->peers);
if (rs->peers == NULL) {
//printf("peer null\n");
event_fd_delete(rudp_listen, rs);
close(rs->sockfd);
rudp_socket_t r, *rr;
rr = &rs_head;
for (r = rs_head; r != NULL; r = r->next) {
if (r == rs) {
*rr = r->next;
free(r);
return 0;
}
rr = &r->next;
}
return -1;
}
//printf("peers not null\n");
for (p = rs->peers; p != NULL; p = p->next) {
//check if there are still packets to be sent
if (p->packets != NULL) {
struct timeval now, interval, t;
gettimeofday(&now, NULL);
interval.tv_sec = 0;
interval.tv_usec = RUDP_CHECK_INTERVAL;
timeradd(&now, &interval, &t);
event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
return 0;
}
//printf("no packets in this peer, status %d\n", p->status);
if (p->status != PEER_STATUS_CLOSED) {
p->status = PEER_STATUS_CLOSING;
struct rudp_hdr header;
header.seqno = p->sn;
header.type = RUDP_FIN;
header.version = RUDP_VERSION;
sendto(rs->sockfd, &header, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) p->sa, sizeof (struct sockaddr_in));
struct timeval now, interval, t;
gettimeofday(&now, NULL);
if (RUDP_TIMEOUT >= 1000) {
interval.tv_sec = RUDP_TIMEOUT / 1000;
interval.tv_usec = 0;
} else {
interval.tv_sec = 0;
interval.tv_usec = RUDP_TIMEOUT * 1000;
}
timeradd(&now, &interval, &t);
event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
//printf("FIN sent, check again 2s later\n");
return 0;
}
//printf("peer status closed\n");
if (remove_peer(&(rs->peers), p) < 0) {
perror("rudp_close_session: remove_peer");
return -1;
} else {
//printf("peer successfully removed\n");
struct timeval now, interval, t;
gettimeofday(&now, NULL);
interval.tv_sec = 0;
interval.tv_usec = RUDP_CHECK_INTERVAL;
timeradd(&now, &interval, &t);
event_timeout(t, rudp_close_session, arg, "rudp_close_socket");
return 0;
}
}
return 0;
}
/*
 * rudp_recvfrom_handler: Register the receive callback of a RUDP socket.
 *
 * The socket's descriptor is registered with rudp_listen again, this time
 * with the handler as callback argument, and the registration that carried
 * the socket itself as argument is deleted.
 *
 * Always returns 0.
 */
int rudp_recvfrom_handler(rudp_socket_t rsocket,
                          int (*handler)(rudp_socket_t, struct sockaddr_in *,
                                         char *, int)) {
    int listen_fd = rsocket->sockfd;

    event_fd(listen_fd, rudp_listen, handler, "rudp_listen");
    event_fd_delete(rudp_listen, rsocket);

    return 0;
}
/*
 * rudp_event_handler: Register the event callback.
 *
 * Schedules rudp_check_status, with the handler as its argument, to run
 * RUDP_CHECK_INTERVAL microseconds from now. rsocket is not used here.
 *
 * Always returns 0.
 */
int rudp_event_handler(rudp_socket_t rsocket,
                       int (*handler)(rudp_socket_t, rudp_event_t,
                                      struct sockaddr_in *)) {
    struct timeval start, delay, when;

    delay.tv_sec = 0;
    delay.tv_usec = RUDP_CHECK_INTERVAL;

    gettimeofday(&start, NULL);
    timeradd(&start, &delay, &when);

    event_timeout(when, rudp_check_status, handler, "rudp_check_status");
    return 0;
}
int rudp_check_status(int fd, void * arg) {
event_timeout_delete(rudp_check_status, arg);
int (*handler)(rudp_socket_t, rudp_event_t, struct sockaddr_in *);
handler = (int (*)(rudp_socket_t, rudp_event_t, struct sockaddr_in *))arg;
rudp_socket_t rs;
peer_t p;
packet_t pkt;
for (rs = rs_head; rs != NULL; rs = rs->next) {
for (p = rs->peers; p != NULL; p = p->next) {
if (p->status == PEER_STATUS_CLOSED) {
handler(rs, RUDP_EVENT_CLOSED, p->sa);
if (remove_peer(&(rs->peers), p) < 0)
return -1;
return 0;
}
for (pkt = p->packets; pkt != NULL; pkt = pkt->next) {
if (pkt->status == FAILED) {
handler(rs, RUDP_EVENT_TIMEOUT, p->sa);
if (remove_peer(&(rs->peers), p) < 0)
return -1;
return 0;
}
}
}
}
struct timeval now, interval, t;
gettimeofday(&now, NULL);
interval.tv_sec = 0;
interval.tv_usec = RUDP_CHECK_INTERVAL;
timeradd(&now, &interval, &t);
event_timeout(t, rudp_check_status, handler, "rudp_check_status");
return 0;
}
/*
 * rudp_sendto: Queue a block of data for reliable delivery to a receiver.
 *
 * rsocket - socket to send on.
 * data    - payload; copied, so the caller keeps ownership of its buffer.
 * len     - payload length in bytes, 0..RUDP_MAXPKTSIZE (the receive buffer
 *           in rudp_listen holds at most that much payload).
 * to      - destination address.
 *
 * The peer for `to` is looked up or created; if it is still in
 * PEER_STATUS_INIT the handshake is started with rudp_setup_session. The
 * payload is wrapped in a DATA packet carrying the peer's next sequence
 * number and appended to the peer's queue in state IN_QUEUE; transmission
 * itself is done by rudp_send.
 *
 * Returns 0 on success, -1 on invalid arguments or allocation failure. On
 * failure nothing is queued, nothing is leaked and no sequence number is
 * consumed for the data packet.
 */
int rudp_sendto(rudp_socket_t rsocket, void* data, int len, struct sockaddr_in* to) {
    peer_t p;
    packet_t pkt = NULL;
    struct rudp_hdr *hdr = NULL;

    if (rsocket == NULL || to == NULL || len < 0 || len > RUDP_MAXPKTSIZE ||
            (len > 0 && data == NULL)) {
        fprintf(stderr, "rudp_sendto: invalid argument\n");
        return -1;
    }

    p = find_peer(&(rsocket->peers), to);
    if (p == NULL) {
        perror("rudp_sendto: find_peer");
        return -1;
    }
    if (p->status == PEER_STATUS_INIT) {
        if (rudp_setup_session(0, p) < 0) {
            return -1;
        }
    }

    hdr = malloc(sizeof (struct rudp_hdr));
    pkt = malloc(sizeof (struct packet));
    if (hdr == NULL || pkt == NULL) {
        perror("rudp_sendto: malloc");
        goto fail;
    }
    /* Never request zero bytes: malloc(0) may legitimately return NULL. */
    pkt->data = malloc(len > 0 ? (size_t) len : 1);
    if (pkt->data == NULL) {
        perror("rudp_sendto: malloc");
        goto fail;
    }
    if (len > 0) {
        memcpy(pkt->data, data, (size_t) len);
    }

    memset(hdr, 0, sizeof (struct rudp_hdr));
    hdr->version = RUDP_VERSION;
    hdr->type = RUDP_DATA;
    hdr->seqno = p->sn;

    pkt->datalen = len;
    pkt->header = hdr;
    pkt->next = NULL;
    pkt->trans_count = 0;
    pkt->status = IN_QUEUE;

    if (add_packet(&p->packets, &pkt) < 0) {
        fprintf(stderr, "rudp_sendto: add_packet failed\n");
        free(pkt->data);
        goto fail;
    }
    /* Consume the sequence number only once the packet is really queued,
     * so a failure cannot leave a gap in the sequence space. */
    p->sn++;
    return 0;

fail:
    free(pkt);
    free(hdr);
    return -1;
}
int rudp_setup_session(int fd, void *arg) {
event_timeout_delete(rudp_setup_session, arg);
peer_t p = (peer_t) arg;
if (p == NULL) {
perror("rudp_setup_session: error finding peer");
return -1;
}
int sockfd;
rudp_socket_t rs;
peer_t pp;
int flag = 0;
for (rs = rs_head; rs != NULL; rs = rs->next) {
for (pp = rs->peers; pp != NULL; pp = pp->next) {
if (pp == p) {
sockfd = rs->sockfd;
flag = 1;
break;
}
}
if (flag == 1)
break;
}
struct rudp_hdr header;
header.seqno = p->sn++;
header.type = RUDP_SYN;
header.version = RUDP_VERSION;
sendto(sockfd, &header, sizeof (struct rudp_hdr), 0,
(struct sockaddr*) p->sa, sizeof (struct sockaddr_in));
p->status = PEER_STATUS_PASSIVE;
struct timeval now, interval, t;
gettimeofday(&now, NULL);
if (RUDP_TIMEOUT >= 1000) {
interval.tv_sec = RUDP_TIMEOUT / 1000;
interval.tv_usec = 0;
} else {
interval.tv_sec = 0;
interval.tv_usec = RUDP_TIMEOUT * 1000;
}
timeradd(&now, &interval, &t);
event_timeout(t, rudp_session_check, arg, "rudp_timeout");
gettimeofday(&now, NULL);
interval.tv_sec = 0;
interval.tv_usec = RUDP_CHECK_INTERVAL;
timeradd(&now, &interval, &t);
event_timeout(t, rudp_send, NULL, "rudp_send");
return 0;
}
int rudp_session_check(int fd, void *arg) {
event_timeout_delete(rudp_session_check, arg);
peer_t p = (peer_t) arg;
if (p == NULL) {
perror("rudp_setup_session: error finding peer");
return -1;
}
if (p->status == PEER_STATUS_PASSIVE) {
struct timeval now, interval, t;
gettimeofday(&now, NULL);
interval.tv_usec = 0;
interval.tv_sec = 0;
timeradd(&now, &interval, &t);
event_timeout(t, rudp_setup_session, arg, "rudp_setup_session");
}
return 0;
}