Commit cd5ee4fa authored by xpetrak2's avatar xpetrak2 Committed by xHire
Browse files

Timeout-based disconnections

parent a76b0139
......@@ -4,7 +4,8 @@ bin_PROGRAMS = src/coincerd src/coincer
src_coincerd_SOURCES = \
src/coincerd.c \
src/p2p.c src/p2p.h
src/p2p.c src/p2p.h \
src/neighbours.c src/neighbours.h
src_coincerd_CFLAGS = $(AM_CFLAGS) $(JANSSON_CFLAGS) $(LIBEVENT_CFLAGS) $(LIBSODIUM_CFLAGS)
src_coincerd_LDADD = $(AM_LDADD) $(JANSSON_LIBS) $(LIBEVENT_LIBS) $(LIBSODIUM_LIBS)
......
......@@ -17,7 +17,9 @@
*/
#include <event2/event.h>
#include <event2/listener.h>
#include "neighbours.h"
#include "p2p.h"
int main(void)
......@@ -40,14 +42,24 @@ int main(void)
* - terminate on SIGTERM
*/
struct event_base *event_loop;
struct evconnlistener *listener;
struct Loop_Data loop_data;
/* To store the return values of setup functions */
int ret;
if ((ret = listen_init(&event_loop))) {
loop_data.event_loop = NULL;
neighbours_init(&loop_data.neighbours);
if ((ret = listen_init(&listener, &loop_data)) != 0) {
return ret;
}
event_base_dispatch(event_loop);
event_base_dispatch(loop_data.event_loop);
event_base_free(loop_data.event_loop);
evconnlistener_free(listener);
clear_neighbours(&loop_data.neighbours);
return 0;
}
#include <assert.h>
#include <event2/bufferevent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "neighbours.h"
void neighbours_init(struct Neighbours *neighbours)
{
neighbours->first = neighbours->last = NULL;
neighbours->size = 0;
}
struct Neighbour *find_neighbour(const struct Neighbours *neighbours,
const struct bufferevent *bev)
{
struct Neighbour *current = neighbours->first;
while (current != NULL) {
if (current->buffer_event == bev) {
return current;
}
current = current->next;
}
return NULL;
}
struct Neighbour *find_neighbour_by_ip(const struct Neighbours *neighbours,
const char *ip_addr)
{
struct Neighbour *current = neighbours->first;
while (current != NULL) {
if (strcmp(current->ip_addr, ip_addr) == 0) {
return current;
}
current = current->next;
}
return NULL;
}
struct Neighbour *add_new_neighbour(struct Neighbours *neighbours,
const char *ip_addr,
struct bufferevent *bev)
{
struct Neighbour *new_neighbour = (struct Neighbour *)
malloc(sizeof(struct Neighbour));
if (find_neighbour_by_ip(neighbours, ip_addr)) {
return NULL;
}
assert(new_neighbour != NULL);
strcpy(new_neighbour->ip_addr, ip_addr);
new_neighbour->failed_pings = 0;
new_neighbour->next = NULL;
new_neighbour->buffer_event = bev;
if (neighbours->last == NULL) {
neighbours->first = neighbours->last = new_neighbour;
} else {
neighbours->last->next = new_neighbour;
neighbours->last = new_neighbour;
}
neighbours->size += 1;
return new_neighbour;
}
void delete_neighbour(struct Neighbours *neighbours, struct bufferevent *bev)
{
struct Neighbour *current = neighbours->first;
struct Neighbour *neighbour = find_neighbour(neighbours, bev);
if (current == NULL || neighbour == NULL) {
return;
}
if (neighbour->buffer_event != NULL) {
bufferevent_free(bev);
}
if (neighbours->first == neighbour) {
if (neighbours->first == neighbours->last) {
neighbours->first = neighbours->last = NULL;
} else {
neighbours->first = neighbours->first->next;
}
free(neighbour);
neighbours->size = 0;
return;
}
while (current != NULL) {
if (current->next == neighbour) {
current->next = neighbour->next;
if (current->next == NULL) {
neighbours->last = current;
}
free(neighbour);
neighbours->size -= 1;
break;
}
current = current->next;
}
}
void clear_neighbours(struct Neighbours *neighbours)
{
struct Neighbour *current = neighbours->first;
while (current != NULL) {
struct Neighbour *temp = current;
current = current->next;
if (temp->buffer_event != NULL) {
bufferevent_free(temp->buffer_event);
}
free(temp);
}
neighbours_init(neighbours);
}
#ifndef NEIGHBOURS_H
#define NEIGHBOURS_H
#include <event2/bufferevent.h>
#include <stddef.h>
/* data type for the linked list of neighbours */
struct Neighbour {
/* neighbours's IPv6 address */
char ip_addr[46];
/* bufferevent belonging to this neighbour */
struct bufferevent *buffer_event;
/* number of failed ping attempts - max 3, then disconnect */
size_t failed_pings;
/* next Neighbour in the linked list - at struct Neighbours */
struct Neighbour *next;
};
/* linked list of Neighbours */
struct Neighbours {
/* number of neighbours we are connected to */
size_t size;
struct Neighbour *first;
struct Neighbour *last;
};
/* set linked list variables to their default values */
void neighbours_init(struct Neighbours *neighbours);
/* find neighbour in neighbours based on their buffer_event,
returns NULL if not found */
struct Neighbour *find_neighbour(const struct Neighbours *neighbours,
const struct bufferevent *bev);
/* find neighbour in neighbours based on their ip_addr,
returns NULL if not found */
struct Neighbour *find_neighbour_by_ip(const struct Neighbours *neighbours,
const char *ip_addr);
/* add new neighbour into neighbours
returns NULL if neighbour already exists */
struct Neighbour* add_new_neighbour(struct Neighbours *neighbours,
const char *ip_addr,
struct bufferevent *bev);
/* delete neighbour from neighbours */
void delete_neighbour(struct Neighbours *neighbours,
struct bufferevent *bev);
/* delete all neighbours */
void clear_neighbours(struct Neighbours *neighbours);
#endif
......@@ -33,48 +33,102 @@
*
* @brief Callback for reading an input buffer
* @param bev buffer to read data from
* @param ctx optional programmer-defined data to be passed into this
* callback
* @param ctx struct Loop_Data *
*/
static void p2p_process(struct bufferevent *bev,
void *ctx __attribute__((unused)))
static void p2p_process(struct bufferevent *bev, void *ctx)
{
/* Using this we can know who's sent us the message and
reset their amount of failed pings */
struct Loop_Data *loop_data = (struct Loop_Data *) ctx;
/* Find the neighbour based on their buffer event */
struct Neighbour *neighbour =
find_neighbour(&loop_data->neighbours, bev);
/* Read from input buffer, write to output buffer */
struct evbuffer *input = bufferevent_get_input(bev);
struct evbuffer *output = bufferevent_get_output(bev);
size_t len = evbuffer_get_length(input);
char *data = (char *) malloc(len * (sizeof(char) + 1));
/* FOR TESTING PURPOSES */
/* Display the input data */
size_t len = evbuffer_get_length(input);
char *data = (char *) malloc(len * sizeof(char));
evbuffer_copyout(input, data, len);
/* Drain input buffer into data */
if (evbuffer_remove(input, data, len) == -1) {
free(data);
return;
} else {
data[len] = '\0';
}
if (neighbour == NULL) {
free(data);
return;
}
neighbour->failed_pings = 0;
printf("Sending back: %s", data);
/* Copy string data to the output buffer */
evbuffer_add_printf(output, "%s", data);
free(data);
}
static void timeout_process(struct Neighbours *neighbours,
struct Neighbour *neighbour)
{
if (neighbour->failed_pings < 3) {
/* bufferevent was disabled when timeout flag was set */
bufferevent_enable(neighbour->buffer_event,
EV_READ | EV_WRITE | EV_TIMEOUT);
/* Copy all the data from the input buffer to the output buffer */
evbuffer_add_buffer(output, input);
printf("Sending ping to %s. Failed pings: %lu\n",
neighbour->ip_addr, neighbour->failed_pings);
/* TODO/FIXME: this is wrong PoC: the input buffer should be discarded
* and the output one filled with our data
*/
/* Send ping to the inactive neighbour.
5 is the length of bytes to be transmitted */
evbuffer_add(bufferevent_get_output(neighbour->buffer_event),
"ping", 5);
neighbour->failed_pings++;
} else {
printf("3 failed pings. Removing %s from neighbours\n",
neighbour->ip_addr);
delete_neighbour(neighbours, neighbour->buffer_event);
}
}
/**
* @brief Callback for bufferevent event detection
* @param bev bufferevent on which the event occured
* @param events flags of the events occured
* @param ctx optional programmer-defined data to be passed into this
* callback
* @param ctx struct Loop_Data *
*/
static void event_cb(struct bufferevent *bev, short events,
void *ctx __attribute__((unused)))
static void event_cb(struct bufferevent *bev, short events, void *ctx)
{
struct Loop_Data *loop_data = (struct Loop_Data *) ctx;
struct Neighbour *neighbour =
find_neighbour(&loop_data->neighbours, bev);
if (neighbour == NULL) {
return;
}
if (events & BEV_EVENT_ERROR) {
fprintf(stderr, "Error from bufferevent");
perror("Error from bufferevent");
printf("Removing %s from neighbours.\n", neighbour->ip_addr);
delete_neighbour(&loop_data->neighbours, bev);
}
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
bufferevent_free(bev);
if (events & (BEV_EVENT_EOF)) {
printf("%s has disconnected.\n", neighbour->ip_addr);
delete_neighbour(&loop_data->neighbours, bev);
}
if (events & BEV_EVENT_TIMEOUT) {
/* Bufferevent bev received timout event */
timeout_process(&loop_data->neighbours, neighbour);
}
}
......@@ -98,61 +152,85 @@ static void ip_to_string(struct sockaddr *addr, char *ip)
* @param fd file descriptor for the new connection
* @param address routing information
* @param socklen size of address
* @param ctx optional programmer-defined data to be passed
* into this callback; is NULL
* @param ctx struct Loop_Data *
*/
static void accept_connection(struct evconnlistener *listener,
evutil_socket_t fd, struct sockaddr *addr,
int socklen __attribute__((unused)),
void *ctx __attribute__((unused)))
void *ctx)
{
/* to display IP address of the other peer */
char ip[INET6_ADDRSTRLEN + 1]; /* NOTE: not sure if +1 is needed... */
/* We need to add the new connection to the list
of our neighbours */
struct Loop_Data *loop_data = (struct Loop_Data *) ctx;
struct timeval write_timeout;
struct timeval read_timeout;
/* To display IP address of the other peer */
char ip[INET6_ADDRSTRLEN];
/* Set up a bufferevent for a new connection */
struct event_base *base = evconnlistener_get_base(listener);
struct bufferevent *bev = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE);
struct bufferevent *bev =
bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *) addr;
/* subscribe every received P2P message to be processed */
bufferevent_setcb(bev, p2p_process, NULL, event_cb, NULL);
bufferevent_setcb(bev, p2p_process, NULL, event_cb, ctx);
bufferevent_enable(bev, EV_READ | EV_WRITE | EV_TIMEOUT);
/* after READ_TIMEOUT or WRITE_TIMEOUT seconds invoke event_cb */
write_timeout.tv_sec = WRITE_TIMEOUT;
write_timeout.tv_usec = 0;
bufferevent_enable(bev, EV_READ | EV_WRITE);
read_timeout.tv_sec = READ_TIMEOUT;
read_timeout.tv_usec = 0;
bufferevent_set_timeouts(bev, &read_timeout, &write_timeout);
ip_to_string(addr, ip);
/* Add the new connection to the list of our neighbours */
if (!add_new_neighbour(&loop_data->neighbours, ip, bev)) {
bufferevent_free(bev);
return;
}
printf("New connection from [%s]:%d\n", ip, ntohs(addr_in6->sin6_port));
}
/**
* @brief Callback for listener error detection
* @param listener listener on which the error occured
* @param ctx optional programmer-defined data to be passed
* into this callback
* @param ctx struct Loop_Data *
*/
static void accept_error_cb(struct evconnlistener *listener,
void *ctx __attribute__((unused)))
void *ctx)
{
struct event_base *base = evconnlistener_get_base(listener);
struct Loop_Data *loop_data = (struct Loop_Data *) ctx;
int err = EVUTIL_SOCKET_ERROR();
fprintf(stderr, "Got an error %d (%s) on the listener. "
"Shutting down.\n", err, evutil_socket_error_to_string(err));
event_base_loopexit(base, NULL);
clear_neighbours(&loop_data->neighbours);
}
/**
* @brief Initialize listening and set up callbacks
*
* @param base The event loop
* @param listener The even loop listener
* @param loop_data Data for the event loop to work with
*
* @return 1 if an error occured
* @return 0 if successfully initialized
*/
int listen_init(struct event_base **base)
int listen_init(struct evconnlistener **listener,
struct Loop_Data *loop_data)
{
struct evconnlistener *listener;
struct event_base **base = &loop_data->event_loop;
struct sockaddr_in6 sock_addr;
int port = DEFAULT_PORT;
......@@ -169,16 +247,18 @@ int listen_init(struct event_base **base)
sock_addr.sin6_addr = in6addr_any;
sock_addr.sin6_port = htons(port);
listener = evconnlistener_new_bind(*base, accept_connection, NULL,
*listener = evconnlistener_new_bind(*base, accept_connection, loop_data,
LEV_OPT_CLOSE_ON_FREE |
LEV_OPT_REUSEABLE, -1,
(struct sockaddr *) &sock_addr,
sizeof(sock_addr));
if (!listener) {
if (!*listener) {
perror("Couldn't create listener: evconnlistener_new_bind");
return 1;
}
evconnlistener_set_error_cb(listener, accept_error_cb);
evconnlistener_set_error_cb(*listener, accept_error_cb);
return 0;
}
......@@ -21,8 +21,29 @@
#include <event2/event.h>
#include "neighbours.h"
#define DEFAULT_PORT 31070
/* after (READ/WRITE)_TIMEOUT seconds invoke timeout callback */
#define READ_TIMEOUT 30
#define WRITE_TIMEOUT 30
/* event loop will work with the data stored in an instance of this struct */
struct Loop_Data {
struct event_base *event_loop;
struct Neighbours neighbours;
};
int listen_init(struct event_base **base);
/**
* @brief Initialize listening and set up callbacks
*
* @param listener The even loop listener
* @param loop_data Data for the event loop to work with
*
* @return 1 if an error occured
* @return 0 if successfully initialized
*/
int listen_init(struct evconnlistener **listener,
struct Loop_Data *loop_data);
#endif /* P2P_H */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment