The code looks as follows.
#include#include #include #include #include #include #include #include #include #include #include #include #include #include #include "event.h" #include "rudp.h" #include "rudp_api.h" //#define DROP 3 rudp_socket_t rs_head = NULL; peer_t find_peer(peer_t *firstp, structsockaddr_in* sa) { peer_t p; //int pc = 0; for (p = *firstp; p != NULL; p = p->next) { //printf("peer count: %d\n", ++pc); if (memcmp(p->sa, sa, sizeof (struct sockaddr_in)) == 0) return p; } //printf("peer not found, creating new peer\n"); p = malloc(sizeof (struct peer)); if (p == NULL) { perror("find_peer: malloc"); return NULL; } struct sockaddr_in*sin; sin = malloc(sizeof (struct sockaddr_in)); if (sin == NULL) { perror("peerfind: malloc"); return NULL; } memcpy(sin, sa, sizeof (struct sockaddr_in)); p->sa = sin; p->packets = NULL; p->status = PEER_STATUS_INIT; p->sb = 0; p->sm = RUDP_WINDOW - 1; p->sn = 0; p->next = (*firstp); *firstp = p; return p; } int remove_peer(peer_t *firstp,peer_t rp) { peer_t p, *pp; pp = firstp; for (p = *firstp; p != NULL; p = p->next) { if (p == rp) { *pp = rp->next; free(rp->sa); free(rp); return 0; } pp = &p->next; } return -1; } /* * Add packet into a linked list. * Inserted in a way that packets with smaller seqno are always placed in front */ int add_packet(packet_t*packet_head, packet_t *pkt) { if (*packet_head == NULL) { (*pkt)->next = *packet_head; *packet_head = *pkt; } else { packet_t *p = malloc(sizeof (packet_t)); for ((*p) = *packet_head; (*p) != NULL; (*p) = (*p)->next) { if ((*pkt)->header->seqno == (*p)->header->seqno + 1) { (*pkt)->next = (*p)->next; (*p)->next = *pkt; break; } } //return -1; } //printf("packet added: %d type %d\n", (*pkt)->header->seqno, (*pkt)->header->type); return 0; } /* * Remove a packet from a linked list */ int remove_packet(packet_t *packet_head, packet_t pkt) { packet_t p, *pp; pp = packet_head; for (p = *packet_head; p != NULL; p = p->next) { if (pkt == p) { //printf("packet seq no %d removed\n",pkt->header->seqno); *pp = p->next; free(p); return 0; } pp = &p->next; } return -1; } /* * rudp_socket: Create a RUDP socket. * May use a random port by setting port to zero. */ rudp_socket_t rudp_socket(int port32) { int sockfd; sockfd = socket(PF_INET, SOCK_DGRAM, 0); if (sockfd < 0) { perror("rudp_socket: socket"); return NULL; } struct sockaddr_in *sin; sin = malloc(sizeof (struct sockaddr_in)); if (sin == NULL) { perror("rudp_socket: malloc"); return NULL; } sin->sin_addr.s_addr = htonl(INADDR_ANY); //sin->sin_addr.s_addr = inet_addr("127.0.0.1"); sin->sin_family = AF_INET; uint16_t port = port32; if (port == 0) { srand(time(0)); port = (rand() % (65535 - 1024)) + 1024; sin->sin_port = htons(port); while (bind(sockfd, (struct sockaddr*) sin, sizeof (struct sockaddr_in)) < 0) { port = (rand() % (65535 - 1024)) + 1024; sin->sin_port = htons(port); } } else { sin->sin_port = htons(port); if (bind(sockfd, (struct sockaddr*) sin, sizeof (struct sockaddr_in)) < 0) { perror("rudp_socket: bind"); return NULL; } } //printf("rudp_socket: bind successfully on port: %d\n", port); rudp_socket_t rs; rs = malloc(sizeof (struct rudp_socket)); if (rs == NULL) { perror("rudp_socket: malloc"); return NULL; } rs->sockfd = sockfd; rs->peers = NULL; rs->next = rs_head; rs_head = rs; /* int rc = 0; rudp_socket_t prs; for(prs = rs_head; prs != NULL; prs =prs->next) printf("rudp socket count: %d\n", ++rc); */ event_fd(rs->sockfd, rudp_listen, rs, "rudp_listen"); return rs; } /* * Listens on the socket */ int rudp_listen(int fd, void *arg) { rudp_socket_t rs; for (rs = rs_head; rs != NULL; rs = rs->next) { if (rs->sockfd == fd) break; } if (rs == NULL) { return -1; } int (*handler)(rudp_socket_t,struct sockaddr_in *, char *, int); /* client_addr.sin_family = AF_INET; client_addr.sin_addr.s_addr = htonl(INADDR_ANY); client_addr.sin_port = htons(0); */ unsigned int clientlen = sizeof(struct sockaddr); struct sockaddr client_sockaddr; int bufsize = RUDP_MAXPKTSIZE + sizeof (struct rudp_hdr); int msglen = 0; char message[bufsize]; memset(message, 0, bufsize); msglen =recvfrom(rs->sockfd, message, bufsize, 0, & client_sockaddr, &clientlen); if (msglen < 0) { perror("rudp_listen: recvfrom"); return -1; } struct sockaddr_in client_addr; memcpy(&client_addr, &client_sockaddr, clientlen); if (ntohs(client_addr.sin_port) == 0) { perror("rudp_listen: recvfrom port 0"); return -1; } //printf("received from port: %d\n",ntohs(client_addr.sin_port)); peer_t p = find_peer(&(rs->peers), &client_addr); if (p == NULL) { return -1; } //printf("peer: %d\n", ntohs(p->sa->sin_port)); if (msglen < 0) { perror("rudp_recvfrom: recvfrom"); return -1; } else if (msglen < (sizeof (struct rudp_hdr))) { fprintf(stderr, "rudp_recvfrom: error receiving packet"); return -1; } structrudp_hdr *msgheader = malloc(sizeof (struct rudp_hdr)); if (msgheader == NULL) { perror("rudp_recvfrom: malloc"); return -1; } memcpy(msgheader, message, sizeof (struct rudp_hdr)); //printf("status=%d sn=%d\n", p->status, p->sn); //printf("message from port %d, type=%d seqno=%d\n", ntohs(p->sa->sin_port), msgheader->type, msgheader->seqno); struct rudp_hdr *replyheader = malloc(sizeof (structrudp_hdr)); if (replyheader == NULL) { perror("rudp_recvfrom: malloc"); return -1; } replyheader->version = RUDP_VERSION; switch (p->status) { case PEER_STATUS_INIT: if (msgheader->type == RUDP_SYN) { p->status = PEER_STATUS_ACTIVE; p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); //printf("ACK sent. Turns active\n"); } break; case PEER_STATUS_PASSIVE: if (msgheader->type == RUDP_ACK && msgheader->seqno == 1) { p->status = PEER_STATUS_ACTIVE; packet_t pkt; for (pkt = p->packets; pkt != NULL; pkt = pkt->next) { if (pkt->header->seqno == msgheader->seqno - 1) { pkt->status = ACKED; //printf("syn packet no. %d acked\n", pkt->header->seqno); break; } } //p->sn = 1; p->sb = 1; p->sm = RUDP_WINDOW; } break; case PEER_STATUS_ACTIVE: switch (msgheader->type) { case RUDP_ACK: //printf("sb: %d sm: %d\n", p->sb, p->sm); if (msgheader->seqno >= p->sb) { packet_t pkt; for (pkt = p->packets; pkt != NULL; pkt = pkt->next) { if (pkt->header->seqno == msgheader->seqno - 1) { pkt->status = ACKED; //printf("data packet no. %d acked\n", pkt->header->seqno); break; } } p->sm += msgheader->seqno - p->sb; p->sb = msgheader->seqno; } break; case RUDP_DATA: #ifdef DROP srand(time(0)); if (rand() % DROP == 0) { printf("packet dropped\n"); break; } #endif if (msgheader->seqno == p->sn) { p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; handler = malloc(sizeof (int (*)())); if (handler == NULL) { perror("rudp_listen: malloc"); return -1; } handler = (int (*)(rudp_socket_t, struct sockaddr_in*, char *, int))arg; void *data; data = malloc(msglen - sizeof (struct rudp_hdr)); if (data == NULL) { perror("rudp_listen: malloc"); return -1; } memcpy(data, message + sizeof (struct rudp_hdr), msglen - sizeof (struct rudp_hdr)); handler(rs, &client_addr, data, msglen - sizeof (struct rudp_hdr)); sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); } break; case RUDP_SYN: p->status = PEER_STATUS_ACTIVE; p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); break; case RUDP_FIN: p->status = PEER_STATUS_CLOSED; p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); break; default: fprintf(stderr, "rudp_listen: invalid RUDP type %d\n", msgheader->type); break; } break; case PEER_STATUS_CLOSING: if (msgheader->type == RUDP_ACK && msgheader->seqno == p->sn + 1) { p->status = PEER_STATUS_CLOSED; //printf("peer closed\n"); } break; case PEER_STATUS_CLOSED: if (msgheader->type == RUDP_FIN) { p->status = PEER_STATUS_CLOSED; p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); } /* if (msgheader->type == RUDP_SYN) { p->status = PEER_STATUS_ACTIVE; p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); } else if (msgheader->type == RUDP_FIN) { p->status = PEER_STATUS_CLOSED; p->sn = msgheader->seqno + 1; replyheader->seqno = msgheader->seqno + 1; replyheader->type = RUDP_ACK; sendto(fd, replyheader, sizeof (struct rudp_hdr), 0, (struct sockaddr*) & client_addr, clientlen); } */ break; default: fprintf(stderr, "rudp_listen: invalid peer status %d\n", p->status); break; } return 0; } /* * Keep checking if there are packets to be sent out */ int rudp_send(int fd, void *arg) { event_timeout_delete(rudp_send, arg); struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); rudp_socket_t rs; peer_t p; packet_t pkt; void *msg; int len = sizeof (struct rudp_hdr); //int rsc = 0; if (rs_head == NULL) { return 0; } for (rs = rs_head; rs != NULL; rs = rs->next) { //printf("rudp socket count: %d\n", ++rsc); for (p = rs->peers; p != NULL; p = p->next) { if (p->status == PEER_STATUS_CLOSED) { if (remove_peer(&(rs->peers), p) < 0) { perror("rudp_send: remove_peer"); return -1; } else { event_timeout(t, rudp_send, NULL, "rudp_send"); } return 0; } else if (p->status != PEER_STATUS_ACTIVE) { event_timeout(t, rudp_send, NULL, "rudp_send"); return 0; } //int pc = 0; for (pkt = p->packets; pkt != NULL; pkt = pkt->next) { //printf("pakcet %d: seq number %d status %d\n", ++pc, pkt->header->seqno, pkt->status); if (pkt->status == ACKED) { if (remove_packet(&(p->packets), pkt) == -1) { return -1; } event_timeout(t, rudp_send, NULL, "rudp_send"); return 0; } if ((pkt->status == IN_QUEUE || pkt->status == TIMEOUT) && pkt->header->seqno >= p->sb && pkt->header->seqno <= p->sm) { if (pkt->header->type == RUDP_FIN) { p->status = PEER_STATUS_CLOSING; } len = sizeof (struct rudp_hdr) + pkt->datalen; msg = malloc(len); if (msg == NULL) { perror("rudp_send: malloc"); return -1; } memcpy(msg, pkt->header, sizeof (struct rudp_hdr)); memcpy(msg + sizeof (struct rudp_hdr), pkt->data, pkt->datalen); sendto(rs->sockfd, msg, len, 0, (struct sockaddr*) p->sa, sizeof (struct sockaddr)); pkt->status = SENT; pkt->trans_count++; struct timeval now, interval, t; gettimeofday(&now, NULL); if (RUDP_TIMEOUT >= 1000) { interval.tv_sec = RUDP_TIMEOUT / 1000; interval.tv_usec = 0; } else { interval.tv_sec = 0; interval.tv_usec = RUDP_TIMEOUT * 1000; } timeradd(&now, &interval, &t); event_timeout(t, rudp_timeout, pkt, "rudp_timeout"); } } } } event_timeout(t, rudp_send, NULL, "rudp_send"); return 0; } int rudp_timeout(int a, void *arg) { packet_t pkt = (packet_t) arg; event_timeout_delete(rudp_timeout, pkt); if (pkt->status != ACKED) { //printf("Timeout: %d\n", pkt->header->seqno); pkt->status = TIMEOUT; if (pkt->trans_count >= RUDP_MAXRETRANS) pkt->status = FAILED; } return 0; } /* *rudp_close: Close socket */ int rudp_close(rudp_socket_t rsocket) { struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); event_timeout(t, rudp_close_session, rsocket, "rudp_close_socket"); return 0; /* peer_t p; packet_t pkt; struct rudp_hdr *hdr; hdr = malloc(sizeof (struct rudp_hdr)); if (hdr == NULL) { perror("rudp_sendto: malloc"); return -1; } pkt = malloc(sizeof (struct packet)); if (pkt == NULL) { perror("rudp_sendto: malloc"); return -1; } for (p = rsocket->peers; p != NULL; p = p->next) { hdr->version = RUDP_VERSION; hdr->type = RUDP_FIN; hdr->seqno = p->sn++; pkt->data = NULL; pkt->datalen = 0; pkt->header = hdr; pkt->next = NULL; pkt->trans_count = 0; pkt->status = IN_QUEUE; if (add_packet(&p->packets, &pkt) < 0) { perror("rudp_sendto: add_packet"); return -1; } } return 0; */ } int rudp_close_session(int fd, void *arg) { event_timeout_delete(rudp_close_session, arg); rudp_socket_t rs = (rudp_socket_t) arg; peer_t p; //printf("closing rudp socket %d\n", rs->sockfd); //printf("peer: %p\n", rs->peers); if (rs->peers == NULL) { //printf("peer null\n"); event_fd_delete(rudp_listen, rs); close(rs->sockfd); rudp_socket_t r, *rr; rr = &rs_head; for (r = rs_head; r != NULL; r = r->next) { if (r == rs) { *rr = r->next; free(r); return 0; } rr = &r->next; } return -1; } //printf("peers not null\n"); for (p = rs->peers; p != NULL; p = p->next) { //check if there are still packets to be sent if (p->packets != NULL) { struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); event_timeout(t, rudp_close_session, arg, "rudp_close_socket"); return 0; } //printf("no packets in this peer, status %d\n", p->status); if (p->status != PEER_STATUS_CLOSED) { p->status = PEER_STATUS_CLOSING; struct rudp_hdr header; header.seqno = p->sn; header.type = RUDP_FIN; header.version = RUDP_VERSION; sendto(rs->sockfd, &header, sizeof (struct rudp_hdr), 0, (struct sockaddr*) p->sa, sizeof (struct sockaddr_in)); struct timeval now, interval, t; gettimeofday(&now, NULL); if (RUDP_TIMEOUT >= 1000) { interval.tv_sec = RUDP_TIMEOUT / 1000; interval.tv_usec = 0; } else { interval.tv_sec = 0; interval.tv_usec = RUDP_TIMEOUT * 1000; } timeradd(&now, &interval, &t); event_timeout(t, rudp_close_session, arg, "rudp_close_socket"); //printf("FIN sent, check again 2s later\n"); return 0; } //printf("peer status closed\n"); if (remove_peer(&(rs->peers), p) < 0) { perror("rudp_close_session: remove_peer"); return -1; } else { //printf("peer successfully removed\n"); struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); event_timeout(t, rudp_close_session, arg, "rudp_close_socket"); return 0; } } return 0; } /* *rudp_recvfrom_handler: Register receive callback function */ int rudp_recvfrom_handler(rudp_socket_t rsocket, int (*handler)(rudp_socket_t, struct sockaddr_in *, char *, int)) { event_fd(rsocket->sockfd, rudp_listen, handler, "rudp_listen"); event_fd_delete(rudp_listen, rsocket); return 0; } /* *rudp_event_handler: Register event handler callback function */ int rudp_event_handler(rudp_socket_t rsocket, int (*handler)(rudp_socket_t, rudp_event_t, struct sockaddr_in *)) { struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); event_timeout(t, rudp_check_status, handler, "rudp_check_status"); return 0; } int rudp_check_status(int fd, void * arg) { event_timeout_delete(rudp_check_status, arg); int (*handler)(rudp_socket_t, rudp_event_t, struct sockaddr_in *); handler = (int (*)(rudp_socket_t, rudp_event_t, struct sockaddr_in *))arg; rudp_socket_t rs; peer_t p; packet_t pkt; for (rs = rs_head; rs != NULL; rs = rs->next) { for (p = rs->peers; p != NULL; p = p->next) { if (p->status == PEER_STATUS_CLOSED) { handler(rs, RUDP_EVENT_CLOSED, p->sa); if (remove_peer(&(rs->peers), p) < 0) return -1; return 0; } for (pkt = p->packets; pkt != NULL; pkt = pkt->next) { if (pkt->status == FAILED) { handler(rs, RUDP_EVENT_TIMEOUT, p->sa); if (remove_peer(&(rs->peers), p) < 0) return -1; return 0; } } } } struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); event_timeout(t, rudp_check_status, handler, "rudp_check_status"); return 0; } /* * rudp_sendto: Send a block of data to the receiver. */ int rudp_sendto(rudp_socket_t rsocket, void* data, int len, struct sockaddr_in* to) { peer_t p; packet_t pkt; struct rudp_hdr *hdr; hdr = malloc(sizeof (struct rudp_hdr)); if (hdr == NULL) { perror("rudp_sendto: malloc"); return -1; } pkt = malloc(sizeof (struct packet)); if (pkt == NULL) { perror("rudp_sendto: malloc"); return -1; } p = find_peer(&(rsocket->peers), to); if (p == NULL) { perror("rudp_sendto: find_peer"); return -1; } if (p->status == PEER_STATUS_INIT) { rudp_setup_session(0, p); } hdr = malloc(sizeof (struct rudp_hdr)); if (hdr == NULL) { perror("rudp_sendto: malloc"); return -1; } pkt = malloc(sizeof (struct packet)); if (pkt == NULL) { perror("rudp_sendto: malloc"); return -1; } hdr->version = RUDP_VERSION; hdr->type = RUDP_DATA; hdr->seqno = p->sn++; pkt->data = malloc(len); if (pkt->data == NULL) { perror("rudp_sendto: malloc"); return -1; } memcpy(pkt->data, data, len); pkt->datalen = len; pkt->header = hdr; pkt->next = NULL; pkt->trans_count = 0; pkt->status = IN_QUEUE; //printf("adding packet %d to %d\n", pkt->header->seqno, ntohs(p->sa->sin_port)); if (add_packet(&p->packets, &pkt) < 0) { perror("rudp_sendto: add_packet"); return -1; } return 0; } int rudp_setup_session(int fd, void *arg) { event_timeout_delete(rudp_setup_session, arg); peer_t p = (peer_t) arg; if (p == NULL) { perror("rudp_setup_session: error finding peer"); return -1; } int sockfd; rudp_socket_t rs; peer_t pp; int flag = 0; for (rs = rs_head; rs != NULL; rs = rs->next) { for (pp = rs->peers; pp != NULL; pp = pp->next) { if (pp == p) { sockfd = rs->sockfd; flag = 1; break; } } if (flag == 1) break; } struct rudp_hdr header; header.seqno = p->sn++; header.type = RUDP_SYN; header.version = RUDP_VERSION; sendto(sockfd, &header, sizeof (struct rudp_hdr), 0, (struct sockaddr*) p->sa, sizeof (struct sockaddr_in)); p->status = PEER_STATUS_PASSIVE; struct timeval now, interval, t; gettimeofday(&now, NULL); if (RUDP_TIMEOUT >= 1000) { interval.tv_sec = RUDP_TIMEOUT / 1000; interval.tv_usec = 0; } else { interval.tv_sec = 0; interval.tv_usec = RUDP_TIMEOUT * 1000; } timeradd(&now, &interval, &t); event_timeout(t, rudp_session_check, arg, "rudp_timeout"); gettimeofday(&now, NULL); interval.tv_sec = 0; interval.tv_usec = RUDP_CHECK_INTERVAL; timeradd(&now, &interval, &t); event_timeout(t, rudp_send, NULL, "rudp_send"); return 0; } int rudp_session_check(int fd, void *arg) { event_timeout_delete(rudp_session_check, arg); peer_t p = (peer_t) arg; if (p == NULL) { perror("rudp_setup_session: error finding peer"); return -1; } if (p->status == PEER_STATUS_PASSIVE) { struct timeval now, interval, t; gettimeofday(&now, NULL); interval.tv_usec = 0; interval.tv_sec = 0; timeradd(&now, &interval, &t); event_timeout(t, rudp_setup_session, arg, "rudp_setup_session"); } return 0; }
Comments