引言:总所周知,NoSQL,Memcached等作为Key—Value 存储的模型的数据路由都采用Hash表来达到目的。如何解决Hash冲突和Hash表大小的设计是一个很头疼的问题。
借助于Radix树,我们同样可以达到对于uint32_t 的数据类型的路由。这个灵感就来自于Linux内核的IP路由表的设计。
作为传统的Hash表,我们把接口简化一下,可以抽象为这么几个接口。
void Hash_create( size_t Max );int Hash_insert( uint32_t hash_value , value_type value ) ;value_type *Hash_get( uint32_t hashvalue ); int Hash_delete( uint32_t hash_value );
接口的含义如其名,创建一个Hash表,插入,取得,删除。
同样,把这个接口的功能抽象后,利用radix同样可以实现相同的接口方式。
1 int mc_radix_hash_ini(mc_radix_t *t ,int nodenum ); 2 3 int mc_radix_hash_insert( mc_radix_t *t , unsigned int hashvalue , void *data ,size_t size ); 4 5 int mc_radix_hash_del( mc_radix_t *t , unsigned int hashvalue ) ; 6 7 void *mc_radix_hash_get( mc_radix_t *t , unsigned int hashvalue ) ;
那我们简单介绍一下Radix树:
Radix Tree(基树) 其实就差不多是传统的二叉树,只是在寻找方式上,利用比如一个unsigned int 的类型的每一个比特位作为树节点的判断。
可以这样说,比如一个数 1000101010101010010101010010101010 (随便写的)那么按照Radix 树的插入就是在根节点,如果遇到 0 ,就指向左节点,如果遇到1就指向右节点,在插入过程中构造树节点,在删除过程中删除树节点。如果觉得太多的调用Malloc的话,可以采用池化技术,预先分配多个节点,本博文就采用这种方式。
1 typedef struct _node_t 2 { 3 char zo ; // zero or one 4 int used_num ; 5 struct _node_t *parent ; 6 struct _node_t *left ; 7 struct _node_t *right ; 8 void *data ;//for nodes array list finding next empty node 9 int index ;10 }mc_radix_node_t ;
节点的结构定义如上。
zo 可以忽略,父节点,坐指针,右指针顾名思义,data 用于保存数据的指针,index 是作为 node 池的数组的下标。
树的结构定义如下:
1 ypedef struct _radix_t 2 { 3 mc_radix_nodes_array_t * nodes ; 4 mc_radix_node_t * root ; 5 6 mc_slab_t * slab ; 7 8 9 /*10 pthread_mutex_t lock ;11 */12 int magic ;13 int totalnum ;14 size_t pool_nodenum ;15 16 mc_item_queue queue ;17 }mc_radix_t ;
暂且不用看 nodes 的结构,这里只是作为一个node池的指针
root 指针顾名思义是指向根结构,slab 是作为存放数据时候的内存分配器,如果要使用内存管理来减少开销的话(参见slab内存分配器一章)
magic用来判断是否初始化,totalnum 是叶节点个数,poll_nodenum 是节点池内节点的个数。
queue是作为数据项中数据的队列。
我们采用8421编码的宏来作为每一个二进制位的判断:
#define U01_MASK 0x80000000#define U02_MASK 0x40000000#define U03_MASK 0x20000000#define U04_MASK 0x10000000 . . . .
#define U31_MASK 0x00000002
#define U32_MASK 0x00000001类似这样的方式来对每一位二进制位做判断,还有其他更好的办法,这里只是作为简化和快速。
unsigned int MASKARRAY[32] = { U01_MASK,U02_MASK,U03_MASK,U04_MASK,U05_MASK,U06_MASK,U07_MASK,U08_MASK, U09_MASK,U10_MASK,U11_MASK,U12_MASK,U13_MASK,U14_MASK,U15_MASK,U16_MASK, U17_MASK,U18_MASK,U19_MASK,U20_MASK,U21_MASK,U22_MASK,U23_MASK,U24_MASK, U25_MASK,U26_MASK,U27_MASK,U28_MASK,U29_MASK,U30_MASK,U31_MASK,U32_MASK};
我们为Radix 提供了一些静态函数,不对外声明:
初始化节点池
static int mc_radix_nodes_ini(mc_radix_nodes_array_t *par_nodearray ,size_t par_maxnum )
取得一个节点:
static mc_radix_node_t *mc_get_radix_node(mc_radix_nodes_array_t *par_nodearray )
归还一个节点:
static void mc_free_radix_node( mc_radix_nodes_array_t *par_nodearray , mc_radix_node_t * par_free_node )
这里是初始化radix 树:
1 int mc_radix_hash_ini(mc_radix_t *t ,size_t nodenum ) 2 { 3 /* init the node pool */ 4 t->nodes = (mc_radix_nodes_array_t *)malloc( sizeof(mc_radix_nodes_array_t) ); //为节点池分配空间 5 t->slab = mc_slab_create(); //使用slab分配器 6 mc_radix_nodes_ini( t->nodes , nodenum ); //初始化节点 7 t->magic = MC_MAGIC ; 8 t->totalnum = 0 ; 9 t->pool_nodenum = nodenum ;10 t->root = NULL ;11 12 13 t->queue.head = NULL ;14 t->queue.pear = NULL ;15 t->queue.max_num = nodenum ;16 t->queue.cur_num = 0 ;17 }
1 int mc_radix_hash_insert( mc_radix_t *t , unsigned int hashvalue , void *data ,size_t size ) 2 { 3 unsigned int i = 0 ; 4 mc_radix_node_t * root = t->root ; 5 6 if( t->root == NULL ) 7 { 8 t->root = mc_get_radix_node( t->nodes ) ; 9 }10 11 /* LRU */12 /*其中涉及到LRU算法,原理是将所有的叶子节点链接为双向队列,然后更新和插入放入队列头,按照一定的比例从队列尾删除数据*/13 if( t->queue.cur_num >= (t->queue.max_num)*PERCENT )14 {15 for( i = 0 ; i < (t->queue.max_num)*(1-PERCENT) ; i++ )16 {17 mc_del_item( t , t->queue.pear );18 }19 }20 mc_radix_node_t * cur = t->root ;21 for(i = 0 ; i < 32 ; i++ )22 {23 /* 1 ---> right */24 /*按位来探测树节点*/25 if( hashvalue & MASKARRAY[i] )26 {27 28 if( cur -> right != NULL )29 {30 cur->used_num++ ;31 cur->right->parent = cur ;32 cur = cur->right ; 33 }34 else35 {36 cur->right = mc_get_radix_node( t->nodes ) ;37 if( cur->right == NULL )38 {39 fprintf(stderr,"mc_get_radix_node error\n");40 return -1;41 }42 cur->used_num++ ;43 cur->right->parent = cur ;44 cur = cur->right ;45 }46 }47 /* 0 ---> left */48 else49 {50 51 if( cur->left != NULL )52 {53 cur->used_num++;54 cur->left->parent = cur ;55 cur = cur->left ;56 }57 else58 {59 cur->left = mc_get_radix_node( t->nodes ) ;60 if( cur->left == NULL )61 {62 fprintf(stderr,"mc_get_radix_node error\n");63 return -1;64 }65 66 cur->used_num++;67 cur->left->parent = cur ;68 cur = cur->left ;69 }70 } 71 }72 73 t->totalnum ++ ;74 mc_slot_t * l_slot = mc_slot_alloc( t->slab, size ) ;75 cur->data = ( mc_slot_t *)(cur->data);76 memcpy( l_slot->star , data , size );77 cur->data = l_slot ;78 79 /*add to t->queue */80 if( t->queue.head == NULL )81 {82 t->queue.head = cur ;83 t->queue.pear = cur ;84 cur->left = NULL ;85 cur->right = NULL ;86 87 t->queue.cur_num++ ;88 }89 else90 {91 cur->left = NULL ;92 cur->right = t->queue.head ;93 t->queue.head->left = cur ;94 t->queue.head = cur ;95 96 t->queue.cur_num++ ;97 }98 return 1;99 }
删除一个节点,通过hashvalue作为其value,顾名思义
1 int mc_radix_hash_del( mc_radix_t *t , unsigned int hashvalue ) 2 { 3 if( t == NULL || t->root == NULL ) 4 { 5 return -1; 6 } 7 /* non initialized */ 8 if( t->magic != MC_MAGIC ) 9 { 10 return -1;11 }12 mc_radix_node_t * cur = t->root ; 13 mc_radix_node_t * cur_par ;14 int i = 0 ;15 for( ; i < 32 ; i++ )16 {17 if( hashvalue & MASKARRAY[i] )18 {19 20 if( cur->right != NULL )21 {22 cur->used_num-- ;23 cur = cur->right ;24 }25 else26 return -1;27 }28 else29 {30 31 if( cur->left != NULL )32 {33 cur->used_num-- ;34 cur = cur->left ;35 }36 else37 return -1;38 }39 }40 41 if( cur->used_num >= 0 )42 mc_slot_free(cur->data);43 44 /*remove from t->queue */45 if( cur == t->queue.pear && cur == t->queue.head )46 {47 t->queue.pear = NULL ;48 t->queue.head = NULL ;49 t->queue.cur_num -- ;50 }51 /* the last item */52 else if( cur == t->queue.pear && cur != t->queue.head)53 {54 cur->left->right = NULL ;55 cur->left = NULL ;56 t->queue.cur_num -- ;57 }58 else if( cur != t->queue.pear )59 {60 cur->left->right = cur->right ;61 cur->right->left = cur->left ;62 t->queue.cur_num -- ;63 }64 else65 {66 cur->left->right = cur->right ;67 cur->right->left = cur->left ;68 t->queue.cur_num -- ;69 }70 71 for(;;)72 {73 74 if( cur->used_num == 0 )75 {76 cur_par = cur->parent ;77 mc_free_radix_node( t->nodes , cur );78 cur = cur_par ;79 }80 if( cur == NULL )81 break ;82 if( cur->used_num > 0 )83 break ;84 85 }86 87 return 1;88 89 }
取得值:通过void * 指向
1 void *mc_radix_hash_get( mc_radix_t *t , unsigned int hashvalue ) 2 { 3 if( t == NULL || t->root == NULL ) 4 { 5 fprintf(stderr,"t == NULL || t->root == NULL\n"); 6 return (void *)(0); 7 } 8 /* non initialized */ 9 if( t->magic != MC_MAGIC )10 { 11 fprintf(stderr,"t->magic != MC_MAGIC\n");12 return (void *)(0);13 }14 mc_radix_node_t * cur = t->root ; 15 mc_slot_t *ret_slot ;16 int i = 0 ; 17 for( ; i < 32 ; i++ )18 {19 if( hashvalue & MASKARRAY[i] )20 {21 if( cur->right == NULL )22 break;23 else24 cur = cur->right ;25 }26 else27 {28 if( cur->left == NULL )29 break;30 else31 cur = cur->left ;32 }33 }34 if( i == 32 )35 {36 ret_slot = cur->data;37 38 /* update LRU queue*/39 if( cur->left != NULL )40 {41 if( cur->right != NULL )42 {43 cur->left->right = cur->right ;44 cur->right->left = cur->left ;45 cur->left = t->queue.head ;46 t->queue.head->left = cur ;47 t->queue.head = cur ;48 }49 else50 {51 /* cur->right == NULL last element of LRU queue */52 cur->left->right = NULL ;53 cur->left = t->queue.head ;54 t->queue.head->left = cur ;55 t->queue.head = cur ;56 57 }58 }59 return (void *)(ret_slot->star) ;60 }61 else62 {63 fprintf(stderr,"i = %d \n",i);64 return (void *)(0);65 }66 }
1 int mc_free_radix( mc_radix_t *t ) 2 { 3 mc_free_all_radix_node(t->nodes); 4 mc_slab_free(t->slab); 5 free(t->nodes); 6 } 7 8 static void mc_del_item( mc_radix_t *t , mc_radix_node_t * cur ) 9 {10 if( cur->left == NULL )11 {12 fprintf(stderr,"item number in LRU queue is too small \n");13 return ;14 }15 if( cur->right != NULL )16 {17 fprintf(stderr,"cur should be the last of LRU queue \n");18 }19 /* remove from LRU queue */20 mc_radix_node_t * pcur = cur->left ;21 cur->left = NULL ;22 pcur->right = NULL ;23 24 pcur = cur->parent ;25 /* remove from radix tree */26 while( pcur != NULL )27 {28 cur->used_num -- ;29 if( cur->used_num <=0 )30 {31 mc_free_radix_node( t->nodes , cur );32 }33 cur = pcur ;34 pcur = pcur->parent ;35 } 36 37 }
总结:radix 树作为key-value 路由最大的好处就是在于减少了hash表的动态和一部分碰撞问题等。还可以在此结构上方便的扩展 LRU算法,淘汰数据等。
如果担心node 的初始化和申请太过于浪费资源,可以采用节点池的方式设计。