From c55e3e32463d9821b7459c9c86108e6f364a1529 Mon Sep 17 00:00:00 2001 From: Logan oos Even <46396513+Logan007@users.noreply.github.com> Date: Fri, 4 Jun 2021 03:52:12 +0545 Subject: [PATCH] multithreaded supernode name resolution (#645) --- configure.seed | 7 ++ include/n2n_define.h | 3 + include/n2n_typedefs.h | 29 ++++++ src/edge.c | 5 + src/edge_utils.c | 11 +++ src/n2n.c | 127 +++++++++++++++++++++++-- src/sn.c | 6 ++ src/sn_utils.c | 5 + win32/n2n_win32.h | 204 ++++++++++++++++++++++++----------------- 9 files changed, 307 insertions(+), 90 deletions(-) diff --git a/configure.seed b/configure.seed index 6acc76b..60b5b45 100644 --- a/configure.seed +++ b/configure.seed @@ -81,6 +81,13 @@ if test x$cap != x; then AC_DEFINE([HAVE_LIBCAP],[1],[Support for linux capabilities]) fi +AC_CHECK_LIB([pthread], [pthread_mutex_trylock], pthread=true) +if test x$pthread != x; then + LDFLAGS="${LDFLAGS} -pthread" + AC_DEFINE([HAVE_PTHREAD],[],[pthread is present]) +fi + + MACHINE=`uname -m` SYSTEM=`uname -s` diff --git a/include/n2n_define.h b/include/n2n_define.h index 056a80f..8b1a7c9 100644 --- a/include/n2n_define.h +++ b/include/n2n_define.h @@ -62,6 +62,9 @@ #define SORT_COMMUNITIES_INTERVAL 90 /* sec. until supernode sorts communities' hash list again */ +#define N2N_RESOLVE_INTERVAL 300 /* seconds until edge and supernode try to resolve supernode names again */ +#define N2N_RESOLVE_CHECK_INTERVAL 30 /* seconds until main loop checking in on changes from resolver thread */ + #define ETH_FRAMESIZE 14 #define IP4_SRCOFFSET 12 #define IP4_DSTOFFSET 16 diff --git a/include/n2n_typedefs.h b/include/n2n_typedefs.h index adb6289..cb7c2b2 100644 --- a/include/n2n_typedefs.h +++ b/include/n2n_typedefs.h @@ -586,6 +586,32 @@ typedef struct n2n_trans_op { /* *************************************************** */ + +typedef struct n2n_resolve_ip_sock { + char *org_ip; /* pointer to original ip/named address string (used read only) */ + n2n_sock_t sock; /* resolved socket */ + n2n_sock_t *org_sock; /* pointer to original socket where 'sock' gets copied to from time to time */ + int error_code; /* result of last resolution attempt */ + + UT_hash_handle hh; /* makes this structure hashable */ +} n2n_resolve_ip_sock_t; + + +// structure to hold resolver thread's parameters +typedef struct n2n_resolve_parameter { + n2n_resolve_ip_sock_t *list; /* pointer to list of to be resolved nodes */ + uint8_t changed; /* indicates a change */ +#ifdef HAVE_PTHREAD + pthread_t id; /* thread id */ + pthread_mutex_t access; /* mutex for shared access */ +#endif + time_t last_checked; /* last time the resolver completed */ +} n2n_resolve_parameter_t; + + +/* *************************************************** */ + + typedef struct n2n_edge_conf { struct peer_info *supernodes; /**< List of supernodes */ n2n_route_t *routes; /**< Networks to route through n2n */ @@ -673,6 +699,8 @@ struct n2n_edge { struct n2n_edge_stats stats; /**< Statistics */ + n2n_resolve_parameter_t *resolve_parameter; /**< Pointer to name resolver's parameter block */ + n2n_tuntap_priv_config_t tuntap_priv_conf; /**< Tuntap config */ network_traffic_filter_t *network_traffic_filter; @@ -773,6 +801,7 @@ typedef struct n2n_sn { struct sn_community *federation; n2n_private_public_key_t private_key; /* private federation key derived from federation name */ n2n_auth_t auth; + n2n_resolve_parameter_t *resolve_parameter;/*Pointer to name resolver's parameter block */ } n2n_sn_t; diff --git a/src/edge.c b/src/edge.c index 1eb4255..776f976 100644 --- a/src/edge.c +++ b/src/edge.c @@ -50,6 +50,7 @@ int supernode_disconnect (n2n_edge_t *eee); int fetch_and_eventually_process_data (n2n_edge_t *eee, SOCKET sock, uint8_t *pktbuf, uint16_t *expected, uint16_t *position, time_t now); +int resolve_create_thread (n2n_resolve_parameter_t **param, struct peer_info *sn_list); /* ***************************************************** */ @@ -1226,6 +1227,10 @@ int main (int argc, char* argv[]) { traceEvent(TRACE_WARNING, "Running as root is discouraged, check out the -u/-g options"); #endif + if(resolve_create_thread(&(eee->resolve_parameter), eee->conf.supernodes) == 0) { + traceEvent(TRACE_NORMAL, "Successfully created resolver thread"); + } + #ifdef __linux__ signal(SIGPIPE, SIG_IGN); signal(SIGTERM, term_handler); diff --git a/src/edge_utils.c b/src/edge_utils.c index d9c5260..dd135c7 100644 --- a/src/edge_utils.c +++ b/src/edge_utils.c @@ -26,6 +26,9 @@ static HEAP_ALLOC (wrkmem, LZO1X_1_MEM_COMPRESS); /* ************************************** */ +int resolve_check (n2n_resolve_parameter_t *param, time_t now); +int resolve_cancel_thread (n2n_resolve_parameter_t *param); + static const char * supernode_ip (const n2n_edge_t * eee); static void send_register (n2n_edge_t *eee, const n2n_sock_t *remote_peer, const n2n_mac_t peer_mac); @@ -1458,12 +1461,16 @@ void update_supernode_reg (n2n_edge_t * eee, time_t now) { --(eee->sup_attempts); } +#ifndef HAVE_PTHREAD if(supernode2sock(&(eee->curr_sn->sock), eee->curr_sn->ip_addr) == 0) { +#endif traceEvent(TRACE_INFO, "Registering with supernode [%s][number of supernodes %d][attempts left %u]", supernode_ip(eee), HASH_COUNT(eee->conf.supernodes), (unsigned int)eee->sup_attempts); send_register_super(eee); +#ifndef HAVE_PTHREAD } +#endif register_with_local_peers(eee); @@ -2952,6 +2959,8 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) { sort_supernodes(eee, now); + resolve_check(eee->resolve_parameter, now); + if(eee->cb.main_loop_period) eee->cb.main_loop_period(eee, now); @@ -2973,6 +2982,8 @@ int run_edge_loop (n2n_edge_t *eee, int *keep_running) { /** Deinitialise the edge and deallocate any owned memory. */ void edge_term (n2n_edge_t * eee) { + resolve_cancel_thread(eee->resolve_parameter); + if(eee->sock >= 0) closesocket(eee->sock); diff --git a/src/n2n.c b/src/n2n.c index 2110d33..d29493a 100644 --- a/src/n2n.c +++ b/src/n2n.c @@ -256,10 +256,8 @@ char * macaddr_str (macstr_t buf, /** Resolve the supernode IP address. * - * REVISIT: This is a really bad idea. The edge will block completely while the - * hostname resolution is performed. This could take 15 seconds. */ -int supernode2sock (n2n_sock_t * sn, const n2n_sn_name_t addrIn) { +int supernode2sock (n2n_sock_t *sn, const n2n_sn_name_t addrIn) { n2n_sn_name_t addr; const char *supernode_host; @@ -278,7 +276,7 @@ int supernode2sock (n2n_sock_t * sn, const n2n_sn_name_t addrIn) { if(supernode_port) { sn->port = atoi(supernode_port); } else { - traceEvent(TRACE_WARNING, "Bad supernode parameter (-l ) %s %s:%s", + traceEvent(TRACE_WARNING, "supernode2sock sees malformed supernode parameter (-l ) %s %s:%s", addr, supernode_host, supernode_port); } @@ -296,27 +294,141 @@ int supernode2sock (n2n_sock_t * sn, const n2n_sn_name_t addrIn) { sn->family = AF_INET; } else { /* Should only return IPv4 addresses due to aihints. */ - traceEvent(TRACE_WARNING, "Failed to resolve supernode IPv4 address for %s", supernode_host); + traceEvent(TRACE_WARNING, "supernode2sock fails to resolve supernode IPv4 address for %s", supernode_host); rv = -1; } freeaddrinfo(ainfo); /* free everything allocated by getaddrinfo(). */ ainfo = NULL; } else { - traceEvent(TRACE_WARNING, "Failed to resolve supernode host %s, %d: %s", supernode_host, nameerr, gai_strerror(nameerr)); + traceEvent(TRACE_WARNING, "supernode2sock fails to resolve supernode host %s, %d: %s", supernode_host, nameerr, gai_strerror(nameerr)); rv = -2; } } else { - traceEvent(TRACE_WARNING, "Wrong supernode parameter (-l )"); + traceEvent(TRACE_WARNING, "supernode2sock sees wrong supernode parameter (-l )"); rv = -3; } return(rv); } + +void *resolve_thread (void *p) { + +#ifdef HAVE_PTHREAD + n2n_resolve_parameter_t *param = (n2n_resolve_parameter_t*)p; + n2n_resolve_ip_sock_t *entry, *tmp_entry; + + while(1) { + sleep(N2N_RESOLVE_INTERVAL); + + // lock access + pthread_mutex_lock(¶m->access); + + HASH_ITER(hh, param->list, entry, tmp_entry) { + // resolve + entry->error_code = supernode2sock(&entry->sock, entry->org_ip); + // if socket changed and no error + if(!sock_equal(&entry->sock, entry->org_sock) + && (!entry->error_code)) { + // flag the change + param->changed = 1; + } + } + + // unlock access + pthread_mutex_unlock(¶m->access); + } +#endif +} + + +int resolve_create_thread (n2n_resolve_parameter_t **param, struct peer_info *sn_list) { + +#ifdef HAVE_PTHREAD + struct peer_info *sn, *tmp_sn; + n2n_resolve_ip_sock_t *entry; + int ret; + + // create parameter structure + *param = (n2n_resolve_parameter_t*)calloc(1, sizeof(n2n_resolve_parameter_t)); + if(*param) { + HASH_ITER(hh, sn_list, sn, tmp_sn) { + // create entries for those peers that come with ip_addr string (from command-line) + if(sn->ip_addr) { + entry = (n2n_resolve_ip_sock_t*)calloc(1, sizeof(n2n_resolve_ip_sock_t)); + if(entry) { + entry->org_ip = sn->ip_addr; + entry->org_sock = &(sn->sock); + memcpy(&(entry->sock), &(sn->sock), sizeof(n2n_sock_t)); + HASH_ADD(hh, (*param)->list, org_ip, sizeof(char*), entry); + } else + traceEvent(TRACE_WARNING, "resolve_create_thread was unable to add list entry for supernode '%s'", sn->ip_addr); + } + } + } else { + traceEvent(TRACE_WARNING, "resolve_create_thread was unable to create list of supernodes"); + return -1; + } + + // create thread + ret = pthread_create(&((*param)->id), NULL, resolve_thread, (void *)*param); + if(ret) { + traceEvent(TRACE_WARNING, "resolve_create_thread failed to create resolver thread with error number %d", ret); + return -1; + } + + pthread_mutex_init(&((*param)->access), NULL); + + return 0; +#endif +} + + +void resolve_cancel_thread (n2n_resolve_parameter_t *param) { + +#ifdef HAVE_PTHREAD + pthread_cancel(param->id); + free(param); +#endif +} + + +void resolve_check (n2n_resolve_parameter_t *param, time_t now) { + +#ifdef HAVE_PTHREAD + n2n_resolve_ip_sock_t *entry, *tmp_entry; + n2n_sock_str_t sock_buf; + + if(now - param->last_checked > N2N_RESOLVE_CHECK_INTERVAL) { + // try to lock access + if(pthread_mutex_trylock(¶m->access) == 0) { + // any changes? + if(param->changed) { + // reset flag + param->changed = 0; + // unselectively copy all socks (even those with error code, that would be the old one because + // sockets do not get overwritten in case of error in resolve_thread) from list to supernode list + HASH_ITER(hh, param->list, entry, tmp_entry) { + memcpy(entry->org_sock, &entry->sock, sizeof(n2n_sock_t)); + traceEvent(TRACE_DEBUG, "resolve_check renews ip address of supernode '%s' to %s", + entry->org_ip, sock_to_cstr(sock_buf, &(entry->sock))); + } + } + param->last_checked = now; + + // unlock access + pthread_mutex_unlock(¶m->access); + } + } +#endif +} + + /* ************************************** */ + struct peer_info* add_sn_to_list_by_mac_or_sock (struct peer_info **sn_list, n2n_sock_t *sock, const n2n_mac_t mac, int *skip_add) { struct peer_info *scan, *tmp, *peer = NULL; @@ -608,6 +720,7 @@ int sock_equal (const n2n_sock_t * a, return(1); } + /* *********************************************** */ // fills a specified memory area with random numbers diff --git a/src/sn.c b/src/sn.c index 4884056..e904cb8 100644 --- a/src/sn.c +++ b/src/sn.c @@ -25,6 +25,8 @@ static n2n_sn_t sss_node; +int resolve_create_thread (n2n_resolve_parameter_t **param, struct peer_info *sn_list); + /** Load the list of allowed communities. Existing/previous ones will be removed * */ @@ -858,6 +860,10 @@ int main (int argc, char * const argv[]) { } #endif + if(resolve_create_thread(&(sss_node.resolve_parameter), sss_node.federation->edges) == 0) { + traceEvent(TRACE_NORMAL, "Successfully created resolver thread"); + } + traceEvent(TRACE_NORMAL, "supernode started"); #ifdef __linux__ diff --git a/src/sn_utils.c b/src/sn_utils.c index 3edbf3e..feadbf2 100644 --- a/src/sn_utils.c +++ b/src/sn_utils.c @@ -20,6 +20,8 @@ #define HASH_FIND_COMMUNITY(head, name, out) HASH_FIND_STR(head, name, out) +int resolve_check (n2n_resolve_parameter_t *param, time_t now); +int resolve_cancel_thread (n2n_resolve_parameter_t *param); static ssize_t sendto_peer (n2n_sn_t *sss, const struct peer_info *peer, @@ -430,6 +432,8 @@ void sn_term (n2n_sn_t *sss) { n2n_tcp_connection_t *conn, *tmp_conn; node_supernode_association_t *assoc, *tmp_assoc; + resolve_cancel_thread(sss->resolve_parameter); + if(sss->sock >= 0) { closesocket(sss->sock); } @@ -2358,6 +2362,7 @@ int run_sn_loop (n2n_sn_t *sss, int *keep_running) { re_register_and_purge_supernodes(sss, sss->federation, &last_re_reg_and_purge, now); purge_expired_communities(sss, &last_purge_edges, now); sort_communities(sss, &last_sort_communities, now); + resolve_check(sss->resolve_parameter, now); } /* while */ sn_term(sss); diff --git a/win32/n2n_win32.h b/win32/n2n_win32.h index 1630372..e47d690 100644 --- a/win32/n2n_win32.h +++ b/win32/n2n_win32.h @@ -1,83 +1,121 @@ -/* - - (C) 2007-09 - Luca Deri - -*/ - -#ifndef _N2N_WIN32_H_ -#define _N2N_WIN32_H_ - -#ifndef _CRT_SECURE_NO_WARNINGS -#define _CRT_SECURE_NO_WARNINGS -#endif - -#define WIN32_LEAN_AND_MEAN - -#if defined(__MINGW32__) -/* should be defined here and before winsock gets included */ -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x501 //Otherwise the linker doesnt find getaddrinfo -#endif /* #ifndef _WIN32_WINNT */ -#include -#endif /* #if defined(__MINGW32__) */ - -#include -#include -#include - - -#include "wintap.h" - -#undef EAFNOSUPPORT -#define EAFNOSUPPORT WSAEAFNOSUPPORT -#define MAX(a,b) (a > b ? a : b) -#define MIN(a,b) (a < b ? a : b) - -#define snprintf _snprintf -#define strdup _strdup - -#define socklen_t int - - -/* ************************************* */ - -struct ip { -#if BYTE_ORDER == LITTLE_ENDIAN - u_char ip_hl:4, /* header length */ - ip_v:4; /* version */ -#else - u_char ip_v:4, /* version */ - ip_hl:4; /* header length */ -#endif - u_char ip_tos; /* type of service */ - short ip_len; /* total length */ - u_short ip_id; /* identification */ - short ip_off; /* fragment offset field */ -#define IP_DF 0x4000 /* dont fragment flag */ -#define IP_MF 0x2000 /* more fragments flag */ -#define IP_OFFMASK 0x1fff /* mask for fragmenting bits */ - u_char ip_ttl; /* time to live */ - u_char ip_p; /* protocol */ - u_short ip_sum; /* checksum */ - struct in_addr ip_src,ip_dst; /* source and dest address */ -}; - - -/* ************************************* */ - -typedef struct tuntap_dev { - HANDLE device_handle; - char *device_name; - char *ifName; - OVERLAPPED overlap_read, overlap_write; - n2n_mac_t mac_addr; - uint32_t ip_addr; - uint32_t device_mask; - unsigned int mtu; - unsigned int metric; -} tuntap_dev; - -#define index(a, b) strchr(a, b) -#define sleep(x) Sleep(x * 1000) - -#endif +/* + + (C) 2007-09 - Luca Deri + +*/ + +#ifndef _N2N_WIN32_H_ +#define _N2N_WIN32_H_ + +#ifndef _CRT_SECURE_NO_WARNINGS +#define _CRT_SECURE_NO_WARNINGS +#endif + +#define WIN32_LEAN_AND_MEAN + +#if defined(__MINGW32__) +/* should be defined here and before winsock gets included */ +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x501 //Otherwise the linker doesnt find getaddrinfo +#endif /* #ifndef _WIN32_WINNT */ +#include +#endif /* #if defined(__MINGW32__) */ + +#include +#include +#include + + +#include "wintap.h" + +#undef EAFNOSUPPORT +#define EAFNOSUPPORT WSAEAFNOSUPPORT +#define MAX(a,b) (a > b ? a : b) +#define MIN(a,b) (a < b ? a : b) + +#define snprintf _snprintf +#define strdup _strdup + +#define socklen_t int + + +/* ************************************* */ + +struct ip { +#if BYTE_ORDER == LITTLE_ENDIAN + u_char ip_hl:4, /* header length */ + ip_v:4; /* version */ +#else + u_char ip_v:4, /* version */ + ip_hl:4; /* header length */ +#endif + u_char ip_tos; /* type of service */ + short ip_len; /* total length */ + u_short ip_id; /* identification */ + short ip_off; /* fragment offset field */ +#define IP_DF 0x4000 /* dont fragment flag */ +#define IP_MF 0x2000 /* more fragments flag */ +#define IP_OFFMASK 0x1fff /* mask for fragmenting bits */ + u_char ip_ttl; /* time to live */ + u_char ip_p; /* protocol */ + u_short ip_sum; /* checksum */ + struct in_addr ip_src,ip_dst; /* source and dest address */ +}; + + +/* ************************************* */ + + +typedef struct tuntap_dev { + HANDLE device_handle; + char *device_name; + char *ifName; + OVERLAPPED overlap_read, overlap_write; + n2n_mac_t mac_addr; + uint32_t ip_addr; + uint32_t device_mask; + unsigned int mtu; + unsigned int metric; +} tuntap_dev; + + +/* ************************************* */ + + +#define index(a, b) strchr(a, b) +#define sleep(x) Sleep(x * 1000) + + +/* ************************************* */ + + +#define HAVE_PTHREAD +#define pthread_t HANDLE +#define pthread_mutex_t HANDLE + +#define pthread_create(p_thread_handle, attr, thread_func, p_param) \ + (*p_thread_handle = CreateThread(0 /* default security flags */, 0 /*default stack*/, \ + thread_func, p_param, 0 /* default creation flags */, \ + NULL) == 0) + +#define pthread_cancel(p_thread_handle) \ + TerminateThread(p_thread_handle, 0) + +#define pthread_mutex_init(p_mutex_handle, attr) \ + *p_mutex_handle = CreateMutex(NULL /*default security flags */, \ + FALSE /* initially not owned */, NULL /* unnamed */) + +#define pthread_mutex_lock(mutex) \ + WaitForSingleObject(*mutex, INFINITE) + +#define pthread_mutex_trylock(mutex) \ + WaitForSingleObject(*mutex, NULL) + +#define pthread_mutex_unlock(mutex) \ + ReleaseMutex(*mutex) + + +/* ************************************* */ + + +#endif