Commit 0b811dce authored by xpetrak2's avatar xpetrak2
Browse files

The peers interconnecting and signal callback

* SIGINT is now being handled
* the peers can connect to each other now
parent 978d3a36
......@@ -18,9 +18,18 @@
#include <event2/event.h>
#include <event2/listener.h>
#include <signal.h>
#include <stdlib.h>
#include <time.h>
#include "filing.h"
#include "log.h"
#include "neighbours.h"
#include "p2p.h"
#include "peers.h"
static void signal_cb(evutil_socket_t fd, short events, void *ctx);
static void conns_cb(evutil_socket_t fd, short events, void *ctx);
int main(void)
{
......@@ -42,23 +51,120 @@ int main(void)
* - terminate on SIGTERM
*/
struct evconnlistener *listener;
struct s_global_state global_state;
struct event *conns_event;
struct timeval conns_interval;
global_state_t global_state;
struct evconnlistener *listener;
struct event *sigint_event;
time_t t;
global_state.event_loop = NULL;
neighbours_init(&global_state.neighbours);
srand((unsigned) time(&t));
if (listen_init(&listener, &global_state) != 0) {
/* initialize all global_state variables */
global_state.event_loop = NULL;
if (setup_paths(&global_state.filepaths)) {
return 1;
} else if (joining_init(&global_state) != 0) {
}
linkedlist_init(&global_state.pending_neighbours);
linkedlist_init(&global_state.neighbours);
linkedlist_init(&global_state.peers);
if (fetch_peers(global_state.filepaths.peers, &global_state.peers)) {
return 2;
}
/* setup everything needed for TCP listening */
if (listen_init(&listener, &global_state) != 0) {
return 3;
}
/* register SIGINT event to its callback */
sigint_event = evsignal_new(global_state.event_loop,
SIGINT,
signal_cb,
&global_state);
if (!sigint_event || event_add(sigint_event, NULL) < 0) {
log_error("main - couldn't create or add SIGINT event");
return 4;
}
/* setup a function that actively checks the number of neighbours */
conns_interval.tv_sec = 10;
conns_interval.tv_usec = 0;
conns_event = event_new(global_state.event_loop,
-1,
EV_PERSIST,
conns_cb,
&global_state);
if (!conns_event || event_add(conns_event, &conns_interval) < 0) {
log_error("main - couldn't create or add conns_event");
return 4;
}
/* initiate joining the coincer network */
add_more_connections(&global_state, MIN_NEIGHBOURS);
/* start the event loop */
event_base_dispatch(global_state.event_loop);
event_base_free(global_state.event_loop);
evconnlistener_free(listener);
/* SIGINT received, loop terminated; the clean-up part */
clear_neighbours(&global_state.pending_neighbours);
clear_neighbours(&global_state.neighbours);
/* store peers into 'peers' file before cleaning them */
store_peers(global_state.filepaths.peers, &global_state.peers);
clear_peers(&global_state.peers);
clear_paths(&global_state.filepaths);
evconnlistener_free(listener);
event_free(sigint_event);
event_free(conns_event);
event_base_free(global_state.event_loop);
printf("\nCoincer says: Bye!\n");
return 0;
}
/**
* Callback function for a received signal.
*
* @param signal What signal was invoked.
* @param events Flags of the event occured.
* @param ctx Global state.
*/
static void signal_cb(evutil_socket_t signal __attribute__((unused)),
short events __attribute__((unused)),
void *ctx)
{
global_state_t *global_state = (global_state_t *) ctx;
event_base_loopexit(global_state->event_loop, NULL);
}
/**
* Actively check the number of neighbours and add more if needed.
*
* @param fd File descriptor.
* @param events Event flags.
* @param ctx Global state.
*/
static void conns_cb(int fd __attribute__((unused)),
short events __attribute__((unused)),
void *ctx)
{
int needed_conns;
global_state_t *global_state = (global_state_t *) ctx;
needed_conns = MIN_NEIGHBOURS -
linkedlist_size(&global_state->neighbours);
if (needed_conns > 0) {
/* ask twice more peers than we need; it's preferable
* to have more neighbours than minimum
*/
log_debug("conns_cb - we need %d more neighbours");
add_more_connections(global_state, 2 * needed_conns);
}
}
......@@ -16,7 +16,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _POSIX_SOURCE /* strtok_r */
#include <arpa/inet.h> /* inet_ntop */
#include <assert.h>
#include <errno.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
......@@ -25,10 +29,13 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> /* sleep */
#include "linkedlist.h"
#include "log.h"
#include "neighbours.h"
#include "p2p.h"
#include "peers.h"
/**
* Simple helper for conversion of binary IP to readable IP address.
......@@ -36,11 +43,54 @@
* @param binary_ip Binary represented IP address.
* @param ip Readable IP address.
*/
static void ip_to_string(const unsigned char *binary_ip, char *ip)
static void ip_to_string(const struct in6_addr *binary_ip, char *ip)
{
inet_ntop(AF_INET6, binary_ip, ip, INET6_ADDRSTRLEN);
}
/**
* Ask the neighbour for a list of peers.
*
* @param global_state Data for the event loop to work with.
* @param neighbour The neighbour to be asked.
*/
void ask_for_peers(neighbour_t *neighbour)
{
struct bufferevent *bev;
if (neighbour == NULL || (bev = neighbour->buffer_event) == NULL) {
return;
}
/* send message "peers" to the neighbour, as a request
* for the list of peers; 6 is the number of bytes to be transmitted
*/
evbuffer_add(bufferevent_get_output(bev), "peers", 6);
}
/**
* Process received list of peer addresses.
*
* @param global_state Data for the event loop to work with.
* @param peers '\n'-separated list of peer addresses.
*/
static void process_peers(global_state_t *global_state, char *peers)
{
struct in6_addr addr;
const char delim[2] = "\n";
char *line;
char *save_ptr;
line = strtok_r(peers, delim, &save_ptr);
while (line != NULL) {
if (inet_pton(AF_INET6, line, &addr) == 1) {
add_peer(&global_state->peers, &addr);
}
line = strtok_r(NULL, delim, &save_ptr);
}
}
/**
* Processing a P2P message.
*
......@@ -55,7 +105,7 @@ static void p2p_process(struct bufferevent *bev, void *ctx)
char *message;
neighbour_t *neighbour;
struct evbuffer *output;
char response[256]; /* TODO: adjust size */
char response[2048]; /* TODO: adjust size */
char text_ip[INET6_ADDRSTRLEN];
global_state = (global_state_t *) ctx;
......@@ -63,12 +113,9 @@ static void p2p_process(struct bufferevent *bev, void *ctx)
/* find the neighbour based on their bufferevent */
neighbour = find_neighbour(&global_state->neighbours, bev);
/* message from unknown neighbour; quit p2p_process */
if (neighbour == NULL) {
return;
}
assert(neighbour != NULL);
/* refresh neighbour's failed pings */
/* reset neighbour's failed pings */
neighbour->failed_pings = 0;
/* read from the input buffer, write to output buffer */
......@@ -89,18 +136,26 @@ static void p2p_process(struct bufferevent *bev, void *ctx)
message[len] = '\0';
}
ip_to_string(neighbour->ip_addr, text_ip);
printf("Received: %s from %s\n", message, text_ip);
ip_to_string(&neighbour->addr, text_ip);
log_debug("p2p_process - received: %s from %s", message, text_ip);
response[0] = '\0';
/* TODO: Replace with JSON messages */
if (strcmp(message, "ping") == 0) {
strcpy(response, "pong");
/* ignore "pong" */
} else if (strcmp(message, "pong") == 0) {
/* "peers" is a request for list of addresses */
} else if (strcmp(message, "peers") == 0) {
peers_to_str(&global_state->peers, response);
/* list of peers */
} else {
strcpy(response, "");
process_peers(global_state, message);
}
if (strcmp(response, "") != 0) {
printf("Responding with: %s\n", response);
if (response[0] != '\0') {
log_debug("p2p_process - responding with: %s", response);
/* copy response to the output buffer */
evbuffer_add_printf(output, "%s", response);
}
......@@ -121,7 +176,7 @@ static void timeout_process(linkedlist_t *neighbours,
char text_ip[INET6_ADDRSTRLEN];
/* initialize text_ip */
ip_to_string(neighbour->ip_addr, text_ip);
ip_to_string(&neighbour->addr, text_ip);
/* the neighbour hasn't failed enough pings to be deleted */
if (neighbour->failed_pings < 3) {
......@@ -130,9 +185,9 @@ static void timeout_process(linkedlist_t *neighbours,
bufferevent_enable(neighbour->buffer_event,
EV_READ | EV_WRITE | EV_TIMEOUT);
printf("Sending ping to %s. Failed pings: %lu\n",
text_ip, neighbour->failed_pings);
log_debug("timeout_process - sending ping to %s."
" Failed pings: %lu", text_ip,
neighbour->failed_pings);
/* send ping to the inactive neighbour;
* 5 is the length of bytes to be transmitted
*/
......@@ -141,57 +196,155 @@ static void timeout_process(linkedlist_t *neighbours,
neighbour->failed_pings++;
} else {
printf("3 failed pings. Removing %s from neighbours\n",
text_ip);
log_info("timeout_process - 3 failed pings."
" Removing %s from neighbours",
text_ip);
delete_neighbour(neighbours, neighbour->buffer_event);
}
}
/**
* Callback for bufferevent event detection.
* Delete 'neighbour' from 'pending_neighbours' and add it into 'neighbours'.
*
* @param bev bufferevent on which the event occured.
* @param events Flags of the events occured.
* @param ctx Pointer to global_state_t to determine the neighbour.
* @param global_state Global state.
* @param neighbour Neighbour to be moved.
*
* @param neighbour_t The new neighbour in the 'neighbours'.
* @param NULL If adding failed.
*/
static void event_cb(struct bufferevent *bev, short events, void *ctx)
static neighbour_t *move_neighbour_from_pending(global_state_t *global_state,
neighbour_t *neighbour)
{
neighbour_t *neighbour;
char text_ip[INET6_ADDRSTRLEN];
linkedlist_node_t *neighbour_node;
neighbour_t *new_neighbour;
new_neighbour = add_new_neighbour(&global_state->neighbours,
&neighbour->addr,
neighbour->buffer_event);
/* if the add was unsuccessful, perform just the full delete */
if (new_neighbour == NULL) {
bufferevent_free(neighbour->buffer_event);
}
neighbour_node = linkedlist_find(&global_state->pending_neighbours,
neighbour);
linkedlist_delete(neighbour_node);
global_state_t *global_state = (global_state_t *) ctx;
return new_neighbour;
}
/* find neighbour with 'bev' */
neighbour = find_neighbour(&global_state->neighbours, bev);
/**
* Process the event that occured on our pending neighbour 'neighbour'.
*
* @param global_state Global state.
* @param neighbour The event occured on this pending neighbour.
* @param events What event occured.
*/
static void process_pending_neighbour(global_state_t *global_state,
neighbour_t *neighbour,
short events)
{
size_t available_peers_size;
int needed_conns;
neighbour_t *new_neighbour;
char text_ip[INET6_ADDRSTRLEN];
/* unknown neighbour; release 'bev' and stop processing the event */
if (neighbour == NULL) {
bufferevent_free(bev);
return;
available_peers_size = fetch_available_peers(&global_state->peers,
NULL);
/* initialize text_ip */
ip_to_string(&neighbour->addr, text_ip);
/* we've successfully connected to the neighbour */
if (events & (BEV_EVENT_CONNECTED)) {
log_info("process_pending_neighbour - connecting to "
"%s was SUCCESSFUL", text_ip);
/* we've got a new neighbour;
* we can't just delete the neighbour from pending
* and add it into 'neighbours' as the delete would
* free'd the bufferevent
*/
new_neighbour = move_neighbour_from_pending(global_state,
neighbour);
if (new_neighbour == NULL) {
return;
}
needed_conns = MIN_NEIGHBOURS -
linkedlist_size(&global_state->neighbours);
/* if we need more neighbours */
if (needed_conns > 0) {
/* and we don't have enough available */
if ((int)available_peers_size < needed_conns) {
ask_for_peers(new_neighbour);
}
}
/* connecting to the neighbour was unsuccessful */
} else {
log_info("process_pending_neighbour - connecting to "
"%s was NOT SUCCESSFUL", text_ip);
/* the peer is no longer a pending neighbour */
delete_neighbour(&global_state->pending_neighbours,
neighbour->buffer_event);
}
}
/**
* Process the event that occured on our neighbour 'neighbour'.
*
* @param global_state Global state.
* @param neighbour The event occured on this neighbour.
* @param events What event occured.
*/
static void process_neighbour(global_state_t *global_state,
neighbour_t *neighbour,
short events)
{
char text_ip[INET6_ADDRSTRLEN];
/* initialize text_ip */
ip_to_string(neighbour->ip_addr, text_ip);
ip_to_string(&neighbour->addr, text_ip);
if (events & BEV_EVENT_ERROR) {
printf("Connection error: removing %s\n", text_ip);
delete_neighbour(&global_state->neighbours, bev);
return;
}
if (events & (BEV_EVENT_CONNECTED)) {
printf("Successfully connected to %s\n", text_ip);
}
if (events & (BEV_EVENT_EOF)) {
printf("%s disconnected\n", text_ip);
delete_neighbour(&global_state->neighbours, bev);
return;
}
log_info("process_neighbour - error on bev: removing %s",
text_ip);
delete_neighbour(&global_state->neighbours,
neighbour->buffer_event);
} else if (events & (BEV_EVENT_EOF)) {
log_info("process_neighbour - %s disconnected", text_ip);
delete_neighbour(&global_state->neighbours,
neighbour->buffer_event);
/* timeout flag on 'bev' */
if (events & BEV_EVENT_TIMEOUT) {
} else if (events & BEV_EVENT_TIMEOUT) {
timeout_process(&global_state->neighbours, neighbour);
}
}
/**
* Callback for bufferevent event detection.
*
* @param bev bufferevent on which the event occured.
* @param events Flags of the events occured.
* @param ctx Pointer to global_state_t to determine the neighbour.
*/
static void event_cb(struct bufferevent *bev, short events, void *ctx)
{
neighbour_t *neighbour;
global_state_t *global_state = (global_state_t *) ctx;
/* find neighbour with 'bev' */
neighbour = find_neighbour(&global_state->neighbours, bev);
if (neighbour != NULL) {
process_neighbour(global_state, neighbour, events);
/* no such neighbour found; try finding it at pending_neighbours */
} else {
neighbour = find_neighbour(&global_state->pending_neighbours,
bev);
/* 'bev' must belong either to 'neighbours' or
* 'pending_neighbours'
*/
assert(neighbour != NULL);
process_pending_neighbour(global_state, neighbour, events);
}
}
/**
* Callback function for accepting new connections.
*
......@@ -206,16 +359,27 @@ static void accept_connection(struct evconnlistener *listener,
int socklen __attribute__((unused)),
void *ctx)
{
struct event_base *base;
struct bufferevent *bev;
unsigned char ip_addr[16];
struct in6_addr *new_addr;
char text_ip[INET6_ADDRSTRLEN];
struct timeval timeout;
global_state_t *global_state = (struct s_global_state *) ctx;
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *) addr;
/* put binary representation of IP to 'new_addr' */
new_addr = &addr_in6->sin6_addr;
ip_to_string(new_addr, text_ip);
if (find_neighbour_by_addr(&global_state->pending_neighbours,
new_addr)) {
log_debug("accept_connection - peer %s already at "
"pending neighbours", text_ip);
return;
}
/* get the event_base */
base = evconnlistener_get_base(listener);
......@@ -235,23 +399,20 @@ static void accept_connection(struct evconnlistener *listener,
/* after TIMEOUT_TIME seconds invoke event_cb */
bufferevent_set_timeouts(bev, &timeout, NULL);
/* put binary representation of IP to inet6_ip */
memcpy(ip_addr, addr_in6->sin6_addr.s6_addr, 16);
/* add the new connection to the list of our neighbours */
if (!add_new_neighbour(&global_state->neighbours,
ip_addr,
new_addr,
bev)) {
/* free the bufferevent if adding failed */
log_debug("accept_connection - adding failed");
bufferevent_free(bev);
return;
}
ip_to_string(ip_addr, text_ip);
printf("New connection from [%s]:%d\n", text_ip,
log_info("accept_connection - new connection from [%s]:%d", text_ip,
ntohs(addr_in6->sin6_port));
add_peer(&global_state->peers, new_addr);
}
/**
......@@ -267,10 +428,8 @@ static void accept_error_cb(struct evconnlistener *listener,
global_state_t *global_state = (struct s_global_state *) ctx;
int err = EVUTIL_SOCKET_ERROR();
fprintf(stderr, "Got an error %d (%s) on the listener. "
"Shutting down.\n", err, evutil_socket_error_to_string(err));
/* WIP */
log_error("accept_error_cb - got an error %d (%s) on the listener. "
"Shutting down.", err, evutil_socket_error_to_string(err));
/* stop the event loop */
event_base_loopexit(base, NULL);
......@@ -282,7 +441,7 @@ static void accept_error_cb(struct evconnlistener *listener,
/**
* Initialize listening and set up callbacks.
*
* @param listener The even loop listener.
* @param listener The event loop listener.
* @param global_state Data for the event loop to work with.
*
* @return 0 if successfully initialized.
......@@ -298,7 +457,7 @@ int listen_init(struct evconnlistener **listener,
*base = event_base_new();
if (!*base) {
puts("Couldn't open event base");
log_error("listen_init - event_base creation failure");
return 1;
}
......@@ -316,7 +475,7 @@ int listen_init(struct evconnlistener **listener,
sizeof(sock_addr));
if (!*listener) {
perror("Couldn't create listener: evconnlistener_new_bind");
log_error("listen_init - evconnlistener_new_bind");
return 1;
}
......@@ -326,18 +485,21 @@ int listen_init(struct evconnlistener **listener,
}
/**
* Attempt to connect to the particular peer.
* Attempt to connect to the particular addr.
*
* @param global_state Data for the event loop to work with.
* @param ip_addr Binary IP of a peer that we want to connect to.
* @param addr Binary IP of a peer that we want to connect to.
*
* @return neighbour_t * Pointer to newly connected neighbour.
* @return NULL Connecting was unsuccessful.
* @return 0 The connection attempt was succesful.
* @return 1 The peer is already our neighbour.
* @return 2 The peer is pending to become our neighbour.
* @return 3 Adding new pending neighbour unsuccessful.
*/
neighbour_t *connect_to_peer(global_state_t *global_state,
const unsigned char *ip_addr)
int connect_to_addr(global_state_t *global_state,
const struct in6_addr *addr)
{
struct bufferevent *bev;
peer_t *peer;
struct sockaddr *sock_addr;
struct sockaddr_in6 sock_addr6;
int sock_len;
......@@ -345,21 +507,27 @@ neighbour_t *connect_to_peer(global_state_t *global_state,
struct timeval timeout;
/* get textual representation of the input ip address */
ip_to_string(ip_addr, text_ip);
ip_to_string(addr, text_ip);
/* don't connect to already connected peer */
if (find_neighbour_by_ip(&global_state->neighbours, ip_addr) != NULL) {
printf("connect_to_peer: skipping already connected peer\n");
return NULL;
if (find_neighbour_by_addr(&global_state->neighbours, addr) != NULL) {
log_debug("connect_to_addr - peer already connected");
return 1;
}
/* don't attempt to connect to already pending connection */
if (linkedlist_find(&global_state->pending_neighbours, addr) != NULL) {
log_debug("connect_to_addr - peer is in the pending conns");
return 2;
}
memset(&sock_addr6, 0x0, sizeof(sock_addr6));
sock_addr6.sin6_family = AF_INET6;
sock_addr6.sin6_port = htons(DEFAULT_PORT);
memcpy(&sock_addr6.sin6_addr, ip_addr, 16);
sock_addr6.sin6_port = htons(DEFAULT_PORT);
memcpy(&sock_addr6.sin6_addr, addr, 16);