Browse Source

Implement a load-based supernode selection strategy on edges (#493)

* Implement load-based selection strategy

* Update n2n_wire.h

* Update edge_utils.c

* Update n2n.c

* Update sn_utils.c

* Update edge_utils.c
pull/500/head
Francesco Carli 4 years ago
committed by GitHub
parent
commit
8915609f1a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      include/n2n.h
  2. 6
      include/n2n_define.h
  3. 4
      include/n2n_wire.h
  4. 51
      include/sn_selection.h
  5. 77
      src/edge_utils.c
  6. 5
      src/n2n.c
  7. 2
      src/sn.c
  8. 123
      src/sn_selection.c
  9. 48
      src/sn_utils.c
  10. 4
      src/wire.c

5
include/n2n.h

@ -254,7 +254,7 @@ struct n2n_udphdr
#include "cc20.h"
#include "speck.h"
#include "n2n_regex.h"
#include "sn_selection.h"
#ifdef WIN32
#define N2N_IFNAMSIZ 64
#else
@ -303,7 +303,7 @@ struct peer_info {
time_t last_seen;
time_t last_p2p;
time_t last_sent_query;
time_t ping_time;
SN_SELECTION_CRITERION_DATA_TYPE selection_criterion;
uint64_t last_valid_time_stamp;
char *ip_addr;
@ -490,6 +490,7 @@ struct n2n_edge {
n2n_edge_callbacks_t cb; /**< API callbacks */
void *user_data; /**< Can hold user data */
uint64_t sn_last_valid_time_stamp;/**< last valid time stamp from supernode */
SN_SELECTION_CRITERION_DATA_TYPE sn_selection_criterion_common_data;
/* Sockets */
n2n_sock_t supernode;

6
include/n2n_define.h

@ -44,9 +44,11 @@
#define SOCKET_TIMEOUT_INTERVAL_SECS 10
#define REGISTER_SUPER_INTERVAL_DFL 20 /* sec, usually UDP NAT entries in a firewall expire after 30 seconds */
#define MAX_PING_TIME 3000 /* millisec, indicates default value for ping_time field in peer_info structure */
#define SWEEP_TIME 30 /* sec, indicates the value after which we have to sort the hash list of supernodes in edges */
#define SWEEP_TIME 30 /* sec, indicates the value after which we have to sort the hash list of supernodes in edges
* and when we send out packets to query selection-relevant informations from supernodes. */
/* Timeouts used in re_register_and_purge_supernodes. LAST_SEEN_SN_ACTIVE and LAST_SEEN_SN_INACTIVE
* values should be at least 3*SOCKET_TIMEOUT_INTERVAL_SECS apart. */
#define LAST_SEEN_SN_ACTIVE 20 /* sec, indicates supernodes that are proven to be active */
#define LAST_SEEN_SN_INACTIVE 90 /* sec, indicates supernodes that are proven to be inactive: they will be purged */
#define LAST_SEEN_SN_NEW (LAST_SEEN_SN_INACTIVE - LAST_SEEN_SN_ACTIVE)/2 /* sec, indicates supernodes with unsure status, must be tested to check if they are active */

4
include/n2n_wire.h

@ -52,6 +52,8 @@ typedef char n2n_sock_str_t[N2N_SOCKBUF_SIZE]; /* tracing string buffer
#include <sys/socket.h> /* AF_INET and AF_INET6 */
#endif /* #if defined(WIN32) */
#include "sn_selection.h"
typedef enum n2n_pc
{
n2n_ping=0, /* Not used */
@ -189,6 +191,7 @@ typedef struct n2n_PEER_INFO {
n2n_mac_t srcMac;
n2n_mac_t mac;
n2n_sock_t sock;
SN_SELECTION_CRITERION_DATA_TYPE data;
} n2n_PEER_INFO_t;
@ -197,7 +200,6 @@ typedef struct n2n_QUERY_PEER
n2n_mac_t srcMac;
n2n_sock_t sock;
n2n_mac_t targetMac;
uint8_t req_data; /* data we want the supernode to send back in the answer's payload (e.g. 0 = no payload, 1 = number of connected nodes ...) */
} n2n_QUERY_PEER_t;
typedef struct n2n_buf n2n_buf_t;

51
include/sn_selection.h

@ -0,0 +1,51 @@
/**
* (C) 2007-20 - ntop.org and contributors
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not see see <http://www.gnu.org/licenses/>
*
*/
#ifndef _SN_SELECTION_
#define _SN_SELECTION_
#define SN_SELECTION_CRITERION_DATA_TYPE uint32_t
#define SN_SELECTION_CRITERION_BUF_SIZE 14
typedef char selection_criterion_str_t[SN_SELECTION_CRITERION_BUF_SIZE];
#include "n2n.h"
typedef struct n2n_edge n2n_edge_t;
typedef struct peer_info peer_info_t;
typedef struct n2n_sn n2n_sn_t;
/* selection criterion's functions */
int sn_selection_criterion_init(peer_info_t *peer);
int sn_selection_criterion_default(SN_SELECTION_CRITERION_DATA_TYPE *selection_criterion);
int sn_selection_criterion_calculate(n2n_edge_t *eee, peer_info_t *peer, SN_SELECTION_CRITERION_DATA_TYPE *data);
/* common data's functions */
int sn_selection_criterion_common_data_default(n2n_edge_t *eee);
/* sorting function */
int sn_selection_sort(peer_info_t **peer_list);
/* gathering data function */
SN_SELECTION_CRITERION_DATA_TYPE sn_selection_criterion_gather_data(n2n_sn_t *sss);
/* management port output function */
extern char * sn_selection_criterion_str(selection_criterion_str_t out, peer_info_t *peer);
#endif /* _SN_SELECTION_ */

77
src/edge_utils.c

@ -203,6 +203,7 @@ n2n_edge_t* edge_init(const n2n_edge_conf_t *conf, int *rv) {
eee->pending_peers = NULL;
eee->sup_attempts = N2N_EDGE_SUP_ATTEMPTS;
eee->sn_last_valid_time_stamp = initial_time_stamp ();
sn_selection_criterion_common_data_default(eee);
pearson_hash_init();
@ -716,7 +717,6 @@ static void send_query_peer( n2n_edge_t * eee,
idx=0;
encode_mac( query.targetMac, &idx, dstMac );
query.req_data = 0;
idx=0;
@ -737,16 +737,13 @@ static void send_query_peer( n2n_edge_t * eee,
} else {
traceEvent( TRACE_DEBUG, "send PING to supernodes" );
memcpy(tmp_pkt, pktbuf, idx);
HASH_ITER(hh, eee->conf.supernodes, peer, tmp){
if(eee->conf.header_encryption == HEADER_ENCRYPTION_ENABLED){
/* Re-encrypt the orginal message again for non-repeating IV. */
memcpy(pktbuf, tmp_pkt, idx);
packet_header_encrypt (pktbuf, idx, eee->conf.header_encryption_ctx,
eee->conf.header_iv_ctx,
time_stamp (), pearson_hash_16 (pktbuf, idx));
}
HASH_ITER(hh, eee->conf.supernodes, peer, tmp){
sendto_sock( eee->udp_sock, pktbuf, idx, &(peer->sock));
}
}
@ -754,13 +751,6 @@ static void send_query_peer( n2n_edge_t * eee,
/* ******************************************************** */
static int ping_time_sort(struct peer_info *a, struct peer_info *b){
// comparison function for sorting supernodes in ascending order of their
// ping_time-fields
return (a->ping_time - b->ping_time);
}
/** Send a REGISTER_SUPER packet to the current supernode. */
static void send_register_super(n2n_edge_t *eee) {
uint8_t pktbuf[N2N_PKT_BUF_SIZE] = {0};
@ -823,13 +813,15 @@ static int sort_supernodes(n2n_edge_t *eee, time_t now){
if(now - eee->last_sweep > SWEEP_TIME){
if(eee->sn_wait == 0){
// this routine gets periodically called
// it sorts supernodes in ascending order of their ping_time-fields
HASH_SORT(eee->conf.supernodes, ping_time_sort);
// it sorts supernodes in ascending order of their selection_criterion fields
sn_selection_sort(&(eee->conf.supernodes));
}
HASH_ITER(hh, eee->conf.supernodes, scan, tmp){
scan->ping_time = MAX_PING_TIME;
sn_selection_criterion_default(&(scan->selection_criterion));
}
sn_selection_criterion_common_data_default(eee);
send_query_peer(eee, null_mac);
eee->last_sweep = now;
@ -996,8 +988,8 @@ void update_supernode_reg(n2n_edge_t * eee, time_t nowTime) {
if(0 == eee->sup_attempts) {
/* Give up on that supernode and try the next one. */
eee->curr_sn->ping_time = MAX_PING_TIME;
HASH_SORT(eee->conf.supernodes, ping_time_sort);
sn_selection_criterion_default(&(eee->curr_sn->selection_criterion));
sn_selection_sort(&(eee->conf.supernodes));
eee->curr_sn = eee->conf.supernodes;
memcpy(&eee->supernode, &(eee->curr_sn->sock), sizeof(n2n_sock_t));
@ -1277,6 +1269,7 @@ static void readFromMgmtSocket(n2n_edge_t *eee, int *keep_running) {
uint32_t num_pending_peers = 0;
uint32_t num_known_peers = 0;
uint32_t num = 0;
selection_criterion_str_t sel_buf;
now = time(NULL);
@ -1396,6 +1389,30 @@ static void readFromMgmtSocket(n2n_edge_t *eee, int *keep_running) {
msg_len = 0;
}
// dump supernodes
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"-----------------------------------------------------------------------------------------------\n");
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"supernodes:\n");
HASH_ITER(hh, eee->conf.supernodes, peer, tmpPeer) {
net = htonl(peer->dev_addr.net_addr);
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
" %-4u %-15s %-17s %-21s %-14s %lu\n",
++num,
(peer->purgeable == SN_UNPURGEABLE)?"-l ":" ",
macaddr_str(mac_buf, peer->mac_addr),
sock_to_cstr(sockbuf, &(peer->sock)),
sn_selection_criterion_str(sel_buf, peer),
now - peer->last_seen);
sendto(eee->udp_mgmt_sock, udp_buf, msg_len, 0,
(struct sockaddr *) &sender_sock, sizeof(struct sockaddr_in));
msg_len = 0;
}
// end dump supernodes
msg_len += snprintf((char *) (udp_buf + msg_len), (N2N_PKT_BUF_SIZE - msg_len),
"-----------------------------------------------------------------------------------------------\n");
@ -1785,11 +1802,11 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
time_t now=0;
uint64_t stamp = 0;
size_t length;
size_t i;
length = sizeof(sender_sock);
i = sizeof(sender_sock);
recvlen = recvfrom(in_sock, udp_buf, N2N_PKT_BUF_SIZE, 0/*flags*/,
(struct sockaddr *)&sender_sock, (socklen_t*)&length);
(struct sockaddr *)&sender_sock, (socklen_t*)&i);
if(recvlen < 0) {
#ifdef WIN32
@ -1808,8 +1825,7 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
/* REVISIT: when UDP/IPv6 is supported we will need a flag to indicate which
* IP transport version the packet arrived on. May need to UDP sockets. */
/* REVISIT: do not endprse use with several supernodes
memset(&sender, 0, sizeof(n2n_sock_t)); */
memset(&sender, 0, sizeof(n2n_sock_t));
sender.family = AF_INET; /* UDP socket was opened PF_INET v4 */
sender.port = ntohs(sender_sock.sin_port);
@ -1841,8 +1857,6 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
}
}
/* hexdump(udp_buf, recvlen); */
rem = recvlen; /* Counts down bytes of packet to protect against buffer overruns. */
idx = 0; /* marches through packet header as parts are decoded. */
if(decode_common(&cmn, udp_buf, &rem, &idx) < 0)
@ -2039,12 +2053,14 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
(tmp_sock->family == AF_INET)?(void*)&tmp_sock->addr.v4:(void*)&tmp_sock->addr.v6,
sn->ip_addr, N2N_EDGE_SN_HOST_SIZE-1);
sprintf (sn->ip_addr, "%s:%u", sn->ip_addr, (uint16_t)tmp_sock->port);
//sock_to_cstr(sn->ip_addr, tmp_sock);
}
sn_selection_criterion_default(&(sn->selection_criterion));
sn->last_seen = now - LAST_SEEN_SN_NEW;
sn->last_valid_time_stamp = initial_time_stamp();
traceEvent(TRACE_NORMAL, "Supernode '%s' added to the list of supernodes.", sn->ip_addr);
}
/* REVISIT: find a more elegant expression to increase following pointers. */
tmp_sock = (void*)tmp_sock + REG_SUPER_ACK_PAYLOAD_ENTRY_SIZE;
tmp_mac = (void*)tmp_sock + sizeof(n2n_sock_t);
}
@ -2074,7 +2090,6 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
* based on its NAT configuration. */
//eee->conf.register_interval = ra.lifetime;
eee->curr_sn->ping_time = (now - eee->last_register_req)*1000;
}
else
{
@ -2091,6 +2106,7 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
n2n_PEER_INFO_t pi;
struct peer_info * scan;
int skip_add;
SN_SELECTION_CRITERION_DATA_TYPE data;
decode_PEER_INFO( &pi, &cmn, udp_buf, &rem, &idx );
@ -2112,7 +2128,9 @@ void readFromIPSocket(n2n_edge_t * eee, int in_sock) {
skip_add = SN_ADD_SKIP;
scan = add_sn_to_list_by_mac_or_sock(&(eee->conf.supernodes), &sender, &pi.srcMac, &skip_add);
if(scan != NULL){
scan->ping_time = (now - eee->last_sweep)*1000;
scan->last_seen = now;
/* The data type depends on the actual selection strategy that has been chosen. */
sn_selection_criterion_calculate(eee, scan, &pi.data);
break;
}
} else {
@ -2205,7 +2223,8 @@ int run_edge_loop(n2n_edge_t * eee, int *keep_running) {
max_sock = max(max_sock, eee->device.fd);
#endif
wait_time.tv_sec = SOCKET_TIMEOUT_INTERVAL_SECS; wait_time.tv_usec = 0;
wait_time.tv_sec = (eee->sn_wait)?(SOCKET_TIMEOUT_INTERVAL_SECS / 10 + 1):(SOCKET_TIMEOUT_INTERVAL_SECS);
wait_time.tv_usec = 0;
rc = select(max_sock+1, &socket_mask, NULL, NULL, &wait_time);
nowTime = time(NULL);

5
src/n2n.c

@ -18,6 +18,8 @@
#include "n2n.h"
#include "sn_selection.h"
#include "minilzo.h"
#include <assert.h>
@ -294,8 +296,6 @@ struct peer_info* add_sn_to_list_by_mac_or_sock(struct peer_info **sn_list, n2n_
if(memcmp(mac,null_mac,sizeof(n2n_mac_t)) != 0) { /* not zero MAC */
HASH_FIND_PEER(*sn_list, mac, peer);
//REVISIT: make this dependent from last_seen and update socket
}
if(peer == NULL) { /* zero MAC, search by socket */
@ -312,6 +312,7 @@ struct peer_info* add_sn_to_list_by_mac_or_sock(struct peer_info **sn_list, n2n_
if((peer == NULL) && (*skip_add == SN_ADD)) {
peer = (struct peer_info*)calloc(1,sizeof(struct peer_info));
if(peer) {
sn_selection_criterion_default(&(peer->selection_criterion));
memcpy(&(peer->sock),sock,sizeof(n2n_sock_t));
memcpy(&(peer->mac_addr),mac, sizeof(n2n_mac_t));
HASH_ADD_PEER(*sn_list, peer);

2
src/sn.c

@ -186,7 +186,7 @@ static void help() {
printf("[-F <federation_name>] ");
#if 0
printf("[-m <mac_address>] ");
#endif /* #if 0 */
#endif
#ifndef WIN32
printf("[-u <uid> -g <gid>] ");
#endif /* ifndef WIN32 */

123
src/sn_selection.c

@ -0,0 +1,123 @@
/**
* (C) 2007-20 - ntop.org and contributors
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not see see <http://www.gnu.org/licenses/>
*
*/
#include "sn_selection.h"
#include <stdint.h>
static SN_SELECTION_CRITERION_DATA_TYPE sn_selection_criterion_common_read(n2n_edge_t *eee);
static int sn_selection_criterion_sort(peer_info_t *a, peer_info_t *b);
/* ****************************************************************************** */
/* Initialize selection_criterion field in peer_info structure*/
int sn_selection_criterion_init(peer_info_t *peer){
if(peer != NULL){
sn_selection_criterion_default(&(peer->selection_criterion));
}
return 0; /* OK */
}
/* Set selection_criterion field to default value according to selected strategy. */
int sn_selection_criterion_default(SN_SELECTION_CRITERION_DATA_TYPE *selection_criterion){
*selection_criterion = (SN_SELECTION_CRITERION_DATA_TYPE) UINT32_MAX >> 1;
return 0; /* OK */
}
/* Take data from PEER_INFO payload and transform them into a selection_criterion.
* This function is highly dependant of the chosen selection criterion.
*/
int sn_selection_criterion_calculate(n2n_edge_t *eee, peer_info_t *peer, SN_SELECTION_CRITERION_DATA_TYPE *data){
SN_SELECTION_CRITERION_DATA_TYPE common_data;
int sum = 0;
common_data = sn_selection_criterion_common_read(eee);
peer->selection_criterion = (SN_SELECTION_CRITERION_DATA_TYPE)(be32toh(*data) + common_data);
/* Mitigation of the real supernode load in order to see less oscillations.
* Edges jump from a supernode to another back and forth due to purging.
* Because this behavior has a cost of switching, the real load is mitigated with a stickyness factor.
* This factor is dynamically calculated basing on network size and prevent that unnecessary switching */
if(peer == eee->curr_sn){
sum = HASH_COUNT(eee->known_peers) + HASH_COUNT(eee->pending_peers);
peer->selection_criterion = peer->selection_criterion * sum / (sum + 1);
}
return 0; /* OK */
}
/* Set sn_selection_criterion_common_data field to default value. */
int sn_selection_criterion_common_data_default(n2n_edge_t *eee){
SN_SELECTION_CRITERION_DATA_TYPE tmp = 0;
tmp = HASH_COUNT(eee->pending_peers);
if(eee->conf.header_encryption == HEADER_ENCRYPTION_ENABLED){
tmp *= 2;
}
eee->sn_selection_criterion_common_data = tmp / HASH_COUNT(eee->conf.supernodes);
return 0; /* OK */
}
/* Return the value of sn_selection_criterion_common_data field. */
static SN_SELECTION_CRITERION_DATA_TYPE sn_selection_criterion_common_read(n2n_edge_t *eee){
return eee->sn_selection_criterion_common_data;
}
/* Function that compare two selection_criterion fields and sorts them in ascending order. */
static int sn_selection_criterion_sort(peer_info_t *a, peer_info_t *b){
// comparison function for sorting supernodes in ascending order of their selection_criterion.
return (a->selection_criterion - b->selection_criterion);
}
/* Function that sorts peer_list using sn_selection_criterion_sort. */
int sn_selection_sort(peer_info_t **peer_list){
HASH_SORT(*peer_list, sn_selection_criterion_sort);
return 0; /* OK */
}
/* Function that gathers requested data on a supernode. */
SN_SELECTION_CRITERION_DATA_TYPE sn_selection_criterion_gather_data(n2n_sn_t *sss){
SN_SELECTION_CRITERION_DATA_TYPE data = 0, tmp = 0;
struct sn_community *comm, *tmp_comm;
HASH_ITER(hh, sss->communities, comm, tmp_comm){
tmp = HASH_COUNT(comm->edges) + 1; /* number of nodes in the community + the community itself. */
if(comm->header_encryption == HEADER_ENCRYPTION_ENABLED){ /*double-count encrypted communities (and their nodes): they exert more load on supernode. */
tmp *= 2;
}
data += tmp;
}
return htobe32(data);
}
/* Convert selection_criterion field in a string for management port output. */
extern char * sn_selection_criterion_str(selection_criterion_str_t out, peer_info_t *peer){
if(NULL == out) { return NULL; }
memset(out, 0, SN_SELECTION_CRITERION_BUF_SIZE);
snprintf(out, SN_SELECTION_CRITERION_BUF_SIZE -1, "ld = %d", (short int)(peer->selection_criterion));
return out;
}

48
src/sn_utils.c

@ -24,6 +24,7 @@ static int try_forward(n2n_sn_t * sss,
const struct sn_community *comm,
const n2n_common_t * cmn,
const n2n_mac_t dstMac,
uint8_t from_supernode,
const uint8_t * pktbuf,
size_t pktsize,
uint8_t from_supernode);
@ -42,6 +43,7 @@ static int try_broadcast(n2n_sn_t * sss,
const struct sn_community *comm,
const n2n_common_t * cmn,
const n2n_mac_t srcMac,
uint8_t from_supernode,
const uint8_t * pktbuf,
size_t pktsize,
uint8_t from_supernode);
@ -74,7 +76,7 @@ static int process_udp(n2n_sn_t *sss,
size_t udp_size,
time_t now);
static const n2n_mac_t null_mac = {0, 0, 0, 0, 0, 0}; /* 00:00:00:00:00:00 */
static const n2n_mac_t null_mac = {0, 0, 0, 0, 0, 0};
/* ************************************** */
@ -82,6 +84,7 @@ static int try_forward(n2n_sn_t * sss,
const struct sn_community *comm,
const n2n_common_t * cmn,
const n2n_mac_t dstMac,
uint8_t from_supernode,
const uint8_t * pktbuf,
size_t pktsize,
uint8_t from_supernode)
@ -120,7 +123,9 @@ static int try_forward(n2n_sn_t * sss,
if(!from_supernode){
/* Forwarding packet to all federated supernodes. */
traceEvent(TRACE_DEBUG, "Unknown MAC. Broadcasting packet to all federated supernodes.");
try_broadcast(sss, NULL, cmn, sss->mac_addr, pktbuf, pktsize, from_supernode);
try_broadcast(sss, NULL, cmn, sss->mac_addr, from_supernode, pktbuf, pktsize);
} else {
traceEvent(TRACE_DEBUG, "try_forward unknown MAC. Dropping the packet.");
/* Not a known MAC so drop. */
@ -174,6 +179,7 @@ static int try_broadcast(n2n_sn_t * sss,
const struct sn_community *comm,
const n2n_common_t * cmn,
const n2n_mac_t srcMac,
uint8_t from_supernode,
const uint8_t * pktbuf,
size_t pktsize,
uint8_t from_supernode)
@ -709,7 +715,6 @@ static int process_mgmt(n2n_sn_t *sss,
" id tun_tap MAC edge hint last_seen\n");
ressize += snprintf(resbuf + ressize, N2N_SN_PKTBUF_SIZE - ressize,
"-------------------------------------------------------------------------------------------------\n");
HASH_ITER(hh, sss->communities, community, tmp) {
num_edges += HASH_COUNT(community->edges);
ressize += snprintf(resbuf + ressize, N2N_SN_PKTBUF_SIZE - ressize,
@ -814,6 +819,7 @@ static int process_udp(n2n_sn_t * sss,
char buf[32];
struct sn_community *comm, *tmp;
uint64_t stamp;
const n2n_mac_t null_mac = {0, 0, 0, 0, 0, 0}; /* 00:00:00:00:00:00 */
traceEvent(TRACE_DEBUG, "Processing incoming UDP packet [len: %lu][sender: %s:%u]",
udp_size, intoa(ntohl(sender_sock->sin_addr.s_addr), buf, sizeof(buf)),
@ -995,9 +1001,9 @@ static int process_udp(n2n_sn_t * sss,
/* Common section to forward the final product. */
if(unicast)
try_forward(sss, comm, &cmn, pkt.dstMac, rec_buf, encx, from_supernode);
try_forward(sss, comm, &cmn, pkt.dstMac, from_supernode, rec_buf, encx);
else
try_broadcast(sss, comm, &cmn, pkt.srcMac, rec_buf, encx, from_supernode);
try_broadcast(sss, comm, &cmn, pkt.srcMac, from_supernode, rec_buf, encx);
break;
}
case MSG_TYPE_REGISTER:
@ -1061,8 +1067,7 @@ static int process_udp(n2n_sn_t * sss,
packet_header_encrypt (rec_buf, encx, comm->header_encryption_ctx,
comm->header_iv_ctx,
time_stamp (), pearson_hash_16 (rec_buf, encx));
try_forward(sss, comm, &cmn, reg.dstMac, rec_buf, encx, from_supernode); /* unicast only */
try_forward(sss, comm, &cmn, reg.dstMac, from_supernode, rec_buf, encx); /* unicast only */
} else
traceEvent(TRACE_ERROR, "Rx REGISTER with multicast destination");
break;
@ -1088,6 +1093,7 @@ static int process_udp(n2n_sn_t * sss,
n2n_ip_subnet_t ipaddr;
int num = 0;
int skip_add;
int skip;
memset(&ack, 0, sizeof(n2n_REGISTER_SUPER_ACK_t));
@ -1188,11 +1194,24 @@ static int process_udp(n2n_sn_t * sss,
// REVISIT: consider adding last_seen
/* Skip random numbers of supernodes before payload assembling, calculating an appropriate random_number.
* That way, all supernodes have a chance to be propagated with REGISTER_SUPER_ACK. */
skip = HASH_COUNT(sss->federation->edges) - (int)(REG_SUPER_ACK_PAYLOAD_ENTRY_SIZE / REG_SUPER_ACK_PAYLOAD_ENTRY_SIZE);
skip = (skip < 0) ? 0 : n2n_rand() % (skip +1);
/* Assembling supernode list for REGISTER_SUPER_ACK payload */
tmp_dst = tmpbuf;
HASH_ITER(hh, sss->federation->edges, peer, tmp_peer) {
if(skip){
skip--;
continue;
}
if(memcmp(&(peer->sock), &(ack.sock), sizeof(n2n_sock_t)) == 0) continue; /* a supernode doesn't add itself to the payload */
if((now - peer->last_seen) >= LAST_SEEN_SN_ACTIVE) continue; /* skip long-time-not-seen supernodes */
if((now - peer->last_seen) >= (2*LAST_SEEN_SN_ACTIVE)) continue; /* skip long-time-not-seen supernodes.
* We need to allow for a little extra time because supernodes sometimes exceed
* their SN_ACTIVE time before they get re-registred to. */
if(((++num)*REG_SUPER_ACK_PAYLOAD_ENTRY_SIZE) > REG_SUPER_ACK_PAYLOAD_SPACE) break; /* no more space available in REGISTER_SUPER_ACK payload */
memcpy((void*)tmp_dst, (void*)&(peer->sock), sizeof(n2n_sock_t));
tmp_dst += sizeof(n2n_sock_t);
@ -1359,9 +1378,8 @@ static int process_udp(n2n_sn_t * sss,
}
if(memcmp(query.targetMac, null_mac, sizeof(n2n_mac_t)) == 0){
traceEvent( TRACE_DEBUG, "Rx PING from %s. Requested data: %d",
macaddr_str( mac_buf, query.srcMac ),
query.req_data );
traceEvent( TRACE_DEBUG, "Rx PING from %s.",
macaddr_str( mac_buf, query.srcMac ));
cmn2.ttl = N2N_DEFAULT_TTL;
cmn2.pc = n2n_peer_info;
@ -1371,6 +1389,10 @@ static int process_udp(n2n_sn_t * sss,
pi.aflags = 0;
memcpy( pi.mac, query.targetMac, sizeof(n2n_mac_t) );
memcpy( pi.srcMac, sss->mac_addr, sizeof(n2n_mac_t) );
pi.sock.family = AF_INET;
pi.sock.port = ntohs(sender_sock->sin_port);
memcpy(pi.sock.addr.v4, &(sender_sock->sin_addr.s_addr), IPV4_SIZE);
pi.data = sn_selection_criterion_gather_data(sss);
encode_PEER_INFO( encbuf, &encx, &cmn2, &pi);
@ -1384,7 +1406,7 @@ static int process_udp(n2n_sn_t * sss,
sendto( sss->sock, encbuf, encx, 0,
(struct sockaddr *)sender_sock, sizeof(struct sockaddr_in) );
traceEvent( TRACE_DEBUG, "Tx PING to %s",
traceEvent( TRACE_DEBUG, "Tx PONG to %s",
macaddr_str( mac_buf, query.srcMac ) );
} else {
@ -1444,7 +1466,7 @@ static int process_udp(n2n_sn_t * sss,
comm->header_iv_ctx,
time_stamp (), pearson_hash_16 (encbuf, encx));
try_broadcast(sss, NULL, &cmn, query.srcMac, encbuf, encx, from_supernode);
try_broadcast(sss, NULL, &cmn, query.srcMac, from_supernode, encbuf, encx);
}
}
}

4
src/wire.c

@ -517,6 +517,7 @@ int encode_PEER_INFO(uint8_t *base,
retval += encode_mac(base, idx, pkt->srcMac);
retval += encode_mac(base, idx, pkt->mac);
retval += encode_sock(base, idx, &pkt->sock);
retval += encode_buf(base, idx, &pkt->data, sizeof(SN_SELECTION_CRITERION_DATA_TYPE));
return retval;
}
@ -533,6 +534,7 @@ int decode_PEER_INFO(n2n_PEER_INFO_t *pkt,
retval += decode_mac(pkt->srcMac, base, rem, idx);
retval += decode_mac(pkt->mac, base, rem, idx);
retval += decode_sock(&pkt->sock, base, rem, idx);
retval += decode_buf((uint8_t*)&pkt->data, sizeof(SN_SELECTION_CRITERION_DATA_TYPE), base, rem, idx);
return retval;
}
@ -547,7 +549,6 @@ int encode_QUERY_PEER( uint8_t * base,
retval += encode_common( base, idx, common );
retval += encode_mac( base, idx, pkt->srcMac );
retval += encode_mac( base, idx, pkt->targetMac );
retval += encode_uint8( base, idx, pkt->req_data);
return retval;
}
@ -562,7 +563,6 @@ int decode_QUERY_PEER( n2n_QUERY_PEER_t * pkt,
memset( pkt, 0, sizeof(n2n_QUERY_PEER_t) );
retval += decode_mac( pkt->srcMac, base, rem, idx );
retval += decode_mac( pkt->targetMac, base, rem, idx );
retval += decode_uint8( &pkt->req_data, base, rem, idx);
return retval;
}

Loading…
Cancel
Save