Commit 4fa80e03 authored by xpetrak2's avatar xpetrak2 Committed by xHire
Browse files

p2p: Implemented disconnecting and a few minor adjustments

parent d0ca0fdd
......@@ -26,155 +26,190 @@
#include <stdlib.h>
#include <string.h>
#include "linkedlist.h"
#include "neighbours.h"
#include "p2p.h"
/**
* Simple helper for conversion of binary IP to readable IP address.
*
* @param binary_ip Binary represented IP address.
* @param ip Readable IP address.
*/
static void ip_to_string(unsigned char *binary_ip, char *ip)
{
inet_ntop(AF_INET6, binary_ip, ip, INET6_ADDRSTRLEN);
}
/**
* Processing a P2P message.
*
* @brief Callback for reading an input buffer
* @param bev buffer to read data from
* @param ctx struct s_global_state *
* @param bev Buffer to read data from.
* @param ctx Pointer to global_state_t to process message sender.
*/
static void p2p_process(struct bufferevent *bev, void *ctx)
{
/* Using this we can know who's sent us the message and
reset their amount of failed pings */
struct s_global_state *global_state = (struct s_global_state *) ctx;
char *data;
global_state_t *global_state;
struct evbuffer *input;
size_t len;
neighbour_t *neighbour;
struct evbuffer *output;
/* Find the neighbour based on their buffer event */
struct Neighbour *neighbour =
find_neighbour(&global_state->neighbours, bev);
global_state = (global_state_t *) ctx;
/* Read from input buffer, write to output buffer */
struct evbuffer *input = bufferevent_get_input(bev);
struct evbuffer *output = bufferevent_get_output(bev);
/* find the neighbour based on their buffer event */
neighbour = find_neighbour(&global_state->neighbours, bev);
size_t len = evbuffer_get_length(input);
char *data = (char *) malloc(len * (sizeof(char) + 1));
/* message from unknown neighbour; quit p2p_process */
if (neighbour == NULL) {
return;
}
/* refresh neighbour's failed pings */
neighbour->failed_pings = 0;
/* read from the input buffer, write to output buffer */
input = bufferevent_get_input(bev);
output = bufferevent_get_output(bev);
/* get length of the input messaage */
len = evbuffer_get_length(input);
/* allocate memory for the input message including '\0' */
data = (char *) malloc((len + 1) * sizeof(char));
/* FOR TESTING PURPOSES */
/* Drain input buffer into data */
/* drain input buffer into data; -1 if evbuffer_remove failed */
if (evbuffer_remove(input, data, len) == -1) {
free(data);
return;
} else {
data[len] = '\0';
}
if (neighbour == NULL) {
free(data);
return;
}
neighbour->failed_pings = 0;
}
printf("Sending back: %s", data);
/* Copy string data to the output buffer */
/* copy string data to the output buffer */
evbuffer_add_printf(output, "%s", data);
/* deallocate input message */
free(data);
}
static void timeout_process(struct Neighbours *neighbours,
struct Neighbour *neighbour)
/**
* Process the timeout invoked by event_cb.
*
* @param neighbours Linked list of neighbours.
* @param neighbour Timeout invoked on this neighbour.
*/
static void timeout_process(linkedlist_t *neighbours,
neighbour_t *neighbour)
{
char text_ip[INET6_ADDRSTRLEN];
/* initialize text_ip */
ip_to_string(neighbour->ip_addr, text_ip);
/* the neighbour hasn't failed enough pings to be deleted */
if (neighbour->failed_pings < 3) {
/* bufferevent was disabled when timeout flag was set */
bufferevent_enable(neighbour->buffer_event,
EV_READ | EV_WRITE | EV_TIMEOUT);
printf("Sending ping to %s. Failed pings: %lu\n",
neighbour->ip_addr, neighbour->failed_pings);
text_ip, neighbour->failed_pings);
/* Send ping to the inactive neighbour.
5 is the length of bytes to be transmitted */
/* send ping to the inactive neighbour;
* 5 is the length of bytes to be transmitted
*/
evbuffer_add(bufferevent_get_output(neighbour->buffer_event),
"ping", 5);
neighbour->failed_pings++;
} else {
printf("3 failed pings. Removing %s from neighbours\n",
neighbour->ip_addr);
text_ip);
delete_neighbour(neighbours, neighbour->buffer_event);
}
}
/**
* @brief Callback for bufferevent event detection
* @param bev bufferevent on which the event occured
* @param events flags of the events occured
* @param ctx struct s_global_state *
* Callback for bufferevent event detection.
*
* @param bev bufferevent on which the event occured.
* @param events Flags of the events occured.
* @param ctx Pointer to global_state_t to determine the neighbour.
*/
static void event_cb(struct bufferevent *bev, short events, void *ctx)
{
struct s_global_state *global_state = (struct s_global_state *) ctx;
struct Neighbour *neighbour =
find_neighbour(&global_state->neighbours, bev);
neighbour_t *neighbour;
char text_ip[INET6_ADDRSTRLEN];
global_state_t *global_state = (global_state_t *) ctx;
/* find neighbour with 'bev' */
neighbour = find_neighbour(&global_state->neighbours, bev);
/* unknown neighbour; release 'bev' and stop processing the event */
if (neighbour == NULL) {
bufferevent_free(bev);
return;
}
/* initialize text_ip */
ip_to_string(neighbour->ip_addr, text_ip);
if (events & BEV_EVENT_ERROR) {
perror("Error from bufferevent");
printf("Removing %s from neighbours.\n", neighbour->ip_addr);
printf("%s was removed\n", text_ip);
delete_neighbour(&global_state->neighbours, bev);
return;
}
if (events & (BEV_EVENT_EOF)) {
printf("%s has disconnected.\n", neighbour->ip_addr);
printf("%s disconnected\n", text_ip);
delete_neighbour(&global_state->neighbours, bev);
return;
}
/* timeout flag on 'bev' */
if (events & BEV_EVENT_TIMEOUT) {
/* Bufferevent bev received timout event */
timeout_process(&global_state->neighbours, neighbour);
}
}
/**
* Simple helper for conversion of sockaddr to a textual IP address.
*
* @param addr sockaddr data
* @param ip Destination string buffer
*/
static void ip_to_string(struct sockaddr *addr, char *ip)
{
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *) addr;
inet_ntop(AF_INET6, &addr_in6->sin6_addr, ip, INET6_ADDRSTRLEN);
}
/**
* Callback function. It handles accepting new connections.
* Callback function for accepting new connections.
*
* @brief Callback for accepting a new connection
* @param listener incoming connection
* @param fd file descriptor for the new connection
* @param address routing information
* @param socklen size of address
* @param ctx struct s_global_state *
* @param listener Incoming connection.
* @param fd File descriptor for the new connection.
* @param address Routing information.
* @param socklen The size of the address.
* @param ctx Pointer to global_state_t to add new neighbour.
*/
static void accept_connection(struct evconnlistener *listener,
evutil_socket_t fd, struct sockaddr *addr,
int socklen __attribute__((unused)),
void *ctx)
{
/* We need to add the new connection to the list of our neighbours */
struct s_global_state *global_state = (struct s_global_state *) ctx;
struct timeval write_timeout;
struct timeval read_timeout;
/* To display IP address of the other peer */
char ip[INET6_ADDRSTRLEN];
/* Bufferevent for a new connection */
struct event_base *base;
struct bufferevent *bev;
struct event_base *base;
struct bufferevent *bev;
unsigned char inet6_ip[sizeof(struct in6_addr)];
struct timeval read_timeout;
char text_ip[INET6_ADDRSTRLEN];
struct timeval write_timeout;
global_state_t *global_state = (struct s_global_state *) ctx;
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *) addr;
/* Set up the bufferevent for a new connection */
/* get the event_base */
base = evconnlistener_get_base(listener);
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *) addr;
/* setup a bufferevent for the new connection */
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
/* subscribe every received P2P message to be processed */
bufferevent_setcb(bev, p2p_process, NULL, event_cb, ctx);
......@@ -190,46 +225,60 @@ static void accept_connection(struct evconnlistener *listener,
bufferevent_set_timeouts(bev, &read_timeout, &write_timeout);
ip_to_string(addr, ip);
/* put binary representation of IP to inet6_ip */
memcpy(inet6_ip, addr_in6->sin6_addr.s6_addr, sizeof(struct in6_addr));
/* Add the new connection to the list of our neighbours */
if (!add_new_neighbour(&global_state->neighbours, ip, bev)) {
/* add the new connection to the list of our neighbours */
if (!add_new_neighbour(&global_state->neighbours,
inet6_ip,
bev)) {
/* free the bufferevent if adding failed */
bufferevent_free(bev);
return;
}
printf("New connection from [%s]:%d\n", ip, ntohs(addr_in6->sin6_port));
ip_to_string(inet6_ip, text_ip);
printf("New connection from [%s]:%d\n", text_ip,
ntohs(addr_in6->sin6_port));
}
/**
* @brief Callback for listener error detection
* @param listener listener on which the error occured
* @param ctx struct s_global_state *
* Callback for listener error detection.
*
* @param listener Listener on which the error occured.
* @param ctx Pointer to global_state_t.
*/
static void accept_error_cb(struct evconnlistener *listener,
void *ctx)
{
struct event_base *base = evconnlistener_get_base(listener);
struct s_global_state *global_state = (struct s_global_state *) ctx;
global_state_t *global_state = (struct s_global_state *) ctx;
int err = EVUTIL_SOCKET_ERROR();
fprintf(stderr, "Got an error %d (%s) on the listener. "
"Shutting down.\n", err, evutil_socket_error_to_string(err));
/* WIP */
/* stop the event loop */
event_base_loopexit(base, NULL);
/* delete neighbours */
clear_neighbours(&global_state->neighbours);
}
/**
* @brief Initialize listening and set up callbacks
* Initialize listening and set up callbacks.
*
* @param listener The even loop listener
* @param global_state Data for the event loop to work with
* @param listener The even loop listener.
* @param global_state Data for the event loop to work with.
*
* @return 1 if an error occured
* @return 0 if successfully initialized
* @return 0 if successfully initialized.
* @return 1 if an error occured.
*/
int listen_init(struct evconnlistener **listener,
struct s_global_state *global_state)
global_state_t *global_state)
{
struct event_base **base = &global_state->event_loop;
struct sockaddr_in6 sock_addr;
......@@ -248,7 +297,8 @@ int listen_init(struct evconnlistener **listener,
sock_addr.sin6_addr = in6addr_any;
sock_addr.sin6_port = htons(port);
*listener = evconnlistener_new_bind(*base, accept_connection, global_state,
*listener = evconnlistener_new_bind(*base, accept_connection,
global_state,
LEV_OPT_CLOSE_ON_FREE |
LEV_OPT_REUSEABLE, -1,
(struct sockaddr *) &sock_addr,
......
......@@ -21,28 +21,21 @@
#include <event2/event.h>
#include "neighbours.h"
#include "linkedlist.h"
#define DEFAULT_PORT 31070
/* after (READ/WRITE)_TIMEOUT seconds invoke timeout callback */
#define READ_TIMEOUT 30
#define WRITE_TIMEOUT 30
/* event loop will work with the data stored in an instance of this struct */
struct s_global_state {
struct event_base *event_loop;
struct s_neighbours neighbours;
};
/**
* @brief Initialize listening and set up callbacks
*
* @param listener The even loop listener
* @param global_state Data for the event loop to work with
*
* @return 0 if successfully initialized
* @return 1 if an error occured
* Event loop works with the data stored in an instance of this struct.
*/
typedef struct s_global_state {
struct event_base *event_loop; /**< For holding and polling events. */
linkedlist_t neighbours; /**< Linked list of our neighbours. */
} global_state_t;
int listen_init(struct evconnlistener **listener,
struct s_global_state *global_state);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment