500行代码了解Mecached缓存客户端驱动原理

2022/2/14 6:12:23

本文主要是介绍500行代码了解Mecached缓存客户端驱动原理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

原创不易,求分享、求一键三连

缓存一般是用来加速数据访问的效率,在获取数据耗时高的场景下使用缓存可以有效的提高数据获取的效率。

比如,先从memcached中获取数据,如果没有则查询mysql中的数据得到结果写入到memcached中然后返回,下次请求就能够从memcached中获取数据直接返回。

在行业中使用比较多的缓存数据库有Redis和Memcached。

今天用go实现Memcached的驱动程序,深入了解Memcached和咱们平时所写的业务代码如何进行数据交互的协议和原理。

什么是memcached

Memcached是LiveJournal旗下Danga Interactive公司的Brad Fitzpatric为首开发的一款自由开源、高性能的key-value缓存数据库软件。

Mecached协议

Mecached服务和应用程序是不同机器不同的进程,双方进行数据交互通讯涉及到tcp和通讯协议,在memcache中协议有两种类型一种是文本行方式,另一种是非结构化数据方式。

我们挑选文本行协议来实现,大多数Mecached的客户端也是采用文本行协议来开发的因为比较简单,特定格式文本字符串来约定数据交互,如下以客户端发送命令举例:

<command name> <key> <flags> <exptime> <bytes>\r\n
<data block>\r\n

<command name> 是协议的命令,大致分为三类:

  1. 存储命令:set、 add、replace、append、prepend、cas。
  2. 获取命令:get、gets。
  3. 其他命令:version、stats

<key>

要求存储数据的关键字;由于memached底层实现的限制,key的长度限制在250个字符内,并且key中不能包含控制字符或空格。

<flags>

是一个16位无符号整数。

<exptime>

是存储超时时间。如果值为0表示该数据项永不超时;

过期时间实现限制,过期时间要么是Unix时间(从1970-1-1开始计算的秒数),要么是从当前时间开始计算的秒数。

该值不能超过30天,否则服务器将该参数当做真正的Unix时间而不是当前时间的一个偏移值。

<bytes>

是随后数据的字节数,不包括终结符\r\n。<bytes>有可能是0,它后面将是一个空的数据块。

<data block>

存储数据流。

客户端以字符串的方式向服务端发送文本行的内容服务器端会返回对应的执行结果数据,也有返回错误的情况,memcache也对错误的数据格式定义三种不同错误类型的三种格式让错误的返回简单:

  1. ERROR\r\n

说明客户端发送了一个不存在命令

  1. CLIENT_ERROR\r\n

说明在输入行中存在某种类型的客户端错误,例如输入的信息没有遵循memcached的协议

  1. SERVER_ERROR\r\n

说明服务端存在某种类型的错误导致致命命令无法执行。

<error>是具有可读性的错误字符串。

当服务端错误发生后,将会导致服务器将不会再提供服务,服务器在发送该错误信息行后将关闭链接。只有在此场景下,服务器才会关闭与客户端链接。

以下就具体罗列memcached常用包含客户端发送和响应的命令格式列表:

需要注意的是command是区分大小写的,客户端使用tcp连接服务端发送客户端文本行命令,发送成功后等待服务器返回数据,根据格式解析获取需要的返回值这就是一个简单的协议命令执行流程。

Golang实现客户端驱动

有了对memache协议的了解现在来实现通讯就比较简单,首先需要定义Client结构体,保存客户端一些基本配置信息及链接信息:

type Client struct {
   Timeout      time.Duration
   MaxIdleConns int
   lock         sync.Mutex
   addr         net.Addr
   conns        []*conn
}  
  • Timeout tcp链接读写超时
  • conns 是memcache链接池存放的数组
  • MaxIdleConns 是Idle链接的数量
  • lock 是操作conns时加锁
  • addr则是链接的memcache的地址

memcached的单独一个conn连接结构体定义

type conn struct {
   nc   net.Conn
   rw   *bufio.ReadWriter
   addr net.Addr
   c    *Client
}  
  • nc 是建立好的tcp网络链接
  • rw 为了方便数据发送和读取设置bufio的ReadWriter
  • addr 存储memcached地址
  • c 存储客户端的引用

下面是看如何获取链接和使用完之后如何将链接放回到链接池中

//获取memcached的链接
func (c *Client) getFreeConn() (cn *conn, ok bool) {
   c.lock.Lock()
   defer c.lock.Unlock()
   if c.conns == nil {
      return nil, false
   }
   freelist := c.conns
   if len(freelist) == 0 {
      return nil, false
   }
   cn = freelist[len(freelist)-1]
   c.conns = freelist[:len(freelist)-1]
   return cn, true
}
//将使用完的链接放回到conns中
func (c *Client) putFreeConn(cn *conn) {
   c.lock.Lock()
   defer c.lock.Unlock()
   if c.conns == nil {
      c.conns = make([]*conn, 0)
   }
   freelist := c.conns
   if len(freelist) >= c.maxIdleConns() {
      cn.nc.Close()
      return
   }
   c.conns = append(freelist, cn)
}

接下来以GET命令为例,来详细看如何进行网络传输和协议解析的实现

func (c *Client) Get(key string) (item *Item, err error) {
   //check key len 验证key是否长于250字符
   if !legalKey(key) {
      err = ErrMalformedKey
      return
   }
   keys := []string{key}
   cn, err := c.getConn() //获取memcached链接
   defer cn.condRelease(&err) // 方法执行完之后将链接release,返回到链接池中
   if err != nil {
      return
   }
   rw := cn.rw
   //将gets 命令用文本行协议写入到rw中
   if _, err = fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
      return
   }
   if err = rw.Flush(); err != nil {
      return
   }
   //获取GET命令发送之后等待和获取返回的响应数据
   if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
      return
   }
   if item == nil {
      err = ErrCacheMiss
   }
   return
}

func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
 for {
  line, err := r.ReadSlice('\n')
  if err != nil {
   return err
  }
  if bytes.Equal(line, resultEnd) { //如果获取是 END\r\n 则数据返回完,则返回
   return nil
  }
  it := new(Item)
  size, err := scanGetResponseLine(line, it)//先根据格式获取第一行数据和<data> 部分的大小
  if err != nil {
   return err
  }
    //根据bytes获取数据
  it.Value = make([]byte, size+2)
  _, err = io.ReadFull(r, it.Value)
  if err != nil {
   it.Value = nil
   return err
  }
  if !bytes.HasSuffix(it.Value, crlf) {
   it.Value = nil
   return fmt.Errorf("memcache: corrupt get result read")
  }
  it.Value = it.Value[:size]
  cb(it)
 }
}

//根据返回数据格式获取返回值设置到Item结构中。
func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
 // 返回的数据格式 VALUE <key> <falgs> <bytes> <casid>
  pattern := "VALUE %s %d %d %d\r\n"
 dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
 if bytes.Count(line, space) == 3 {
  pattern = "VALUE %s %d %d\r\n"
  dest = dest[:3]
 }
 n, err := fmt.Sscanf(string(line), pattern, dest...)
 if err != nil || n != len(dest) {
  return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
 }
 return size, nil
}

//判断key是否符合要求
func legalKey(key string) bool {
 if len(key) > 250 {
  return false
 }
 for i := 0; i < len(key); i++ {
  if key[i] <= ' ' || key[i] == 0x7f {
   return false
  }
 }
 return true
}  

其它命令不在详细描述,完整代码如下:

  1 package memcache
  2 
  3 import (
  4     "bufio"
  5     "bytes"
  6     "errors"
  7     "fmt"
  8     "io"
  9     "net"
 10     "strconv"
 11     "strings"
 12     "sync"
 13     "time"
 14 )
 15 
 16 //memcached -m 1024  -u root -l 127.0.0.1 -p 12001 -c 55535
 17 //# memcached -d -m 10  -u root -l 127.0.0.1 -p 12001 -c 256 -P /tmp/memcached.pid
 18 //-d选项是启动一个守护进程
 19 //-m是分配给Memcache使用的内存数量,单位是MB,我这里是10MB
 20 //-u是运行Memcache的用户
 21 //-l是监听的服务器IP地址,如果有多个地址的话,我这里指定了服务器的IP地址127.0.0.1
 22 //-p是设置 Memcache监听的端口,我这里设置了12001,最好是1024以上的端口
 23 //-c选项是最大运行的并发连接数,默认是1024,我这里设置了 256,按照你服务器的负载量来设定
 24 //-P是设置保存Memcache的pid文件,我这里是保存在 /tmp/memcached.pid
 25 //停止进程:# kill `cat /tmp/memcached.pid`
 26 
 27 const (
 28     DefaultTimeout      = 100 * time.Millisecond
 29     DefaultMaxIdleConns = 40
 30 )
 31 
 32 var (
 33     resultClientErrorPrefix = []byte("CLIENT_ERROR")
 34     resultErrPrefix         = []byte("ERROR")
 35     resultServerErrPrefix   = []byte("SERVER_ERROR")
 36 
 37     crlf            = []byte("\r\n")
 38     space           = []byte(" ")
 39     resultOK        = []byte("OK\r\n")
 40     resultStored    = []byte("STORED\r\n")
 41     resultNotStored = []byte("NOT_STORED\r\n")
 42     resultExists    = []byte("EXISTS\r\n")
 43     resultNotFound  = []byte("NOT_FOUND\r\n")
 44     resultDeleted   = []byte("DELETED\r\n")
 45     resultEnd       = []byte("END\r\n")
 46     resultTouched   = []byte("TOUCHED\r\n")
 47     versionPrefix   = []byte("VERSION")
 48 
 49     ErrMalformedKey = errors.New("malformed: key is too long or contains invalid characters")
 50     ErrCacheMiss    = errors.New("memcache: cache miss")
 51     ErrCASConflict  = errors.New("memcache: compare-and-swap conflict")
 52     ErrNotStored    = errors.New("memcache: item not stored")
 53 )
 54 
 55 type Client struct {
 56     Timeout      time.Duration
 57     MaxIdleConns int
 58     lock         sync.Mutex
 59     addr         net.Addr
 60     conns        []*conn
 61 }
 62 
 63 func NewClient(timeout time.Duration, maxIdleConns int, addr net.Addr) *Client {
 64     return &Client{
 65         Timeout:      timeout,
 66         MaxIdleConns: maxIdleConns,
 67         lock:         sync.Mutex{},
 68         addr:         addr,
 69         conns:        nil,
 70     }
 71 }
 72 
 73 type Item struct {
 74     // Key is the Item's key (250 bytes maximum).
 75     Key string
 76     // Value is the Item's value.
 77     Value []byte
 78     // Flags are server-opaque flags whose semantics are entirely
 79     // up to the app.
 80     Flags uint32
 81     // Expiration is the cache expiration time, in seconds: either a relative
 82     // time from now (up to 1 month), or an absolute Unix epoch time.
 83     // Zero means the Item has no expiration time.
 84     Expiration int32
 85     // Compare and swap ID.
 86     casid uint64
 87 }
 88 
 89 func (c *Client) maxIdleConns() int {
 90     if c.MaxIdleConns > 0 {
 91         return c.MaxIdleConns
 92     }
 93     return DefaultMaxIdleConns
 94 }
 95 
 96 func (c *Client) netTimeout() time.Duration {
 97     if c.Timeout != 0 {
 98         return c.Timeout
 99     }
100     return DefaultTimeout
101 }
102 
103 type conn struct {
104     nc   net.Conn
105     rw   *bufio.ReadWriter
106     addr net.Addr
107     c    *Client
108 }
109 
110 // 设置超时时间
111 func (cn *conn) extendDeadline() {
112     cn.nc.SetDeadline(time.Now().Add(cn.c.netTimeout()))
113 }
114 
115 // Release 如果是正常的err 则放回到conns中,如果不是这直接close掉conn
116 func (cn *conn) condRelease(err *error) {
117     if *err == nil || resumableError(*err) {
118         cn.release()
119     } else {
120         fmt.Println("xxx", fmt.Sprintf("%s", (*err).Error()))
121         cn.nc.Close()
122     }
123 }
124 
125 // release returns this connection back to the client's free pool
126 func (cn *conn) release() {
127     cn.c.putFreeConn(cn)
128 }
129 
130 func (c *Client) putFreeConn(cn *conn) {
131     c.lock.Lock()
132     defer c.lock.Unlock()
133     if c.conns == nil {
134         c.conns = make([]*conn, 0)
135     }
136     freelist := c.conns
137     if len(freelist) >= c.maxIdleConns() {
138         cn.nc.Close()
139         return
140     }
141     c.conns = append(freelist, cn)
142 }
143 
144 func (c *Client) getFreeConn() (cn *conn, ok bool) {
145     c.lock.Lock()
146     defer c.lock.Unlock()
147     if c.conns == nil {
148         return nil, false
149     }
150     freelist := c.conns
151     if len(freelist) == 0 {
152         return nil, false
153     }
154     cn = freelist[len(freelist)-1]
155     c.conns = freelist[:len(freelist)-1]
156     return cn, true
157 }
158 
159 type ConnectTimeoutError struct {
160     Addr net.Addr
161 }
162 
163 func (cte *ConnectTimeoutError) Error() string {
164     return "memcache: connect timeout to " + cte.Addr.String()
165 }
166 
167 //获取memcached连接
168 func (c *Client) getConn() (*conn, error) {
169     cn, ok := c.getFreeConn()
170     if ok {
171         cn.extendDeadline()
172         return cn, nil
173     }
174     nc, err := c.dial(c.addr)
175     if err != nil {
176         return nil, err
177     }
178     cn = &conn{
179         nc:   nc,
180         addr: c.addr,
181         rw:   bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
182         c:    c,
183     }
184     cn.extendDeadline()
185     return cn, nil
186 }
187 
188 func (c *Client) dial(addr net.Addr) (net.Conn, error) {
189     nc, err := net.DialTimeout(addr.Network(), addr.String(), c.netTimeout())
190     if err == nil {
191         return nc, nil
192     }
193     if ne, ok := err.(net.Error); ok && ne.Timeout() {
194         return nil, &ConnectTimeoutError{Addr: addr}
195     }
196     return nil, err
197 }
198 
199 func (c *Client) Get(key string) (item *Item, err error) {
200     //check key len
201     if !legalKey(key) {
202         err = ErrMalformedKey
203         return
204     }
205     keys := []string{key}
206     cn, err := c.getConn()
207     defer cn.condRelease(&err)
208     if err != nil {
209         return
210     }
211     rw := cn.rw
212     if _, err = fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
213         return
214     }
215     if err = rw.Flush(); err != nil {
216         return
217     }
218     if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
219         return
220     }
221     if item == nil {
222         err = ErrCacheMiss
223     }
224     return
225 }
226 
227 func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
228     var lk sync.Mutex
229     m := make(map[string]*Item)
230     addItemToMap := func(it *Item) {
231         lk.Lock()
232         defer lk.Unlock()
233         m[it.Key] = it
234     }
235     for _, key := range keys {
236         if !legalKey(key) {
237             return nil, ErrMalformedKey
238         }
239     }
240     cn, err := c.getConn()
241     defer cn.condRelease(&err)
242     if err != nil {
243         return nil, err
244     }
245     if _, err = fmt.Fprintf(cn.rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
246         return nil, err
247     }
248     if err = cn.rw.Flush(); err != nil {
249         return nil, err
250     }
251     if err = parseGetResponse(cn.rw.Reader, addItemToMap); err != nil {
252         return nil, err
253     }
254     return m, err
255 
256 }
257 
258 func (c *Client) Touch(key string, seconds int32) (err error) {
259 
260     cn, err := c.getConn()
261     if err != nil {
262         return
263     }
264     defer cn.condRelease(&err)
265 
266     if _, err = fmt.Fprintf(cn.rw, "touch %s %d\r\n", key, seconds); err != nil {
267         return
268     }
269     if err = cn.rw.Flush(); err != nil {
270         return
271     }
272     line, err := cn.rw.ReadSlice('\n')
273     if err != nil {
274         return
275     }
276     switch {
277     case bytes.Equal(line, resultTouched):
278         break
279     case bytes.Equal(line, resultNotFound):
280         return ErrCacheMiss
281     default:
282         return fmt.Errorf("memcache: unexpected response line from touch: %q", string(line))
283     }
284     return nil
285 }
286 
287 func (c *Client) Add(item *Item) error {
288     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
289         return client.populateOne(rw, "add", item)
290     })
291 }
292 
293 func (c *Client) Set(item *Item) error {
294     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
295         return client.populateOne(rw, "set", item)
296     })
297 }
298 
299 func (c *Client) CompareAndSwap(item *Item) error {
300     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
301         return client.populateOne(rw, "cas", item)
302     })
303 }
304 
305 func (c *Client) Replace(item *Item) error {
306     return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
307         return client.populateOne(rw, "replace", item)
308     })
309 }
310 
311 func (c *Client) Delete(key string) error {
312     if !legalKey(key) {
313         return ErrMalformedKey
314     }
315     cn, err := c.getConn()
316     if err != nil {
317         return err
318     }
319     defer cn.condRelease(&err)
320     return writeExpectf(cn.rw, resultDeleted, "delete %s\r\n", key)
321 }
322 
323 func (c *Client) FlushAll() error {
324     cn, err := c.getConn()
325     if err != nil {
326         return err
327     }
328     defer cn.condRelease(&err)
329     return writeExpectf(cn.rw, resultDeleted, "flush_all\r\n")
330 }
331 
332 func (c *Client) Version() error {
333     cn, err := c.getConn()
334     defer cn.condRelease(&err)
335     if err != nil {
336         return err
337     }
338     return func(rw *bufio.ReadWriter) error {
339         if _, e := fmt.Fprintf(rw, "version\r\n"); e != nil {
340             return err
341         }
342         if e := rw.Flush(); e != nil {
343             return e
344         }
345         line, e := rw.ReadSlice('\n')
346         if e != nil {
347             return e
348         }
349         switch {
350         case bytes.HasPrefix(line, versionPrefix):
351             break
352         default:
353             return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
354         }
355         return nil
356     }(cn.rw)
357 
358 }
359 
360 func (c *Client) Increment(key string, delta uint64) (newValue uint64, err error) {
361     return c.incrDecr("incr", key, delta)
362 
363 }
364 
365 func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error) {
366     return c.incrDecr("decr", key, delta)
367 }
368 
369 func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error {
370     cn, err := c.getConn()
371     defer cn.condRelease(&err)
372     if err != nil {
373         return err
374     }
375     if err = fn(c, cn.rw, item); err != nil {
376         return err
377     }
378     return nil
379 }
380 
381 func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
382     for {
383         line, err := r.ReadSlice('\n')
384         if err != nil {
385             return err
386         }
387         if bytes.Equal(line, resultEnd) {
388             return nil
389         }
390         it := new(Item)
391         size, err := scanGetResponseLine(line, it)
392         if err != nil {
393             return err
394         }
395         it.Value = make([]byte, size+2)
396         _, err = io.ReadFull(r, it.Value)
397         if err != nil {
398             it.Value = nil
399             return err
400         }
401         if !bytes.HasSuffix(it.Value, crlf) {
402             it.Value = nil
403             return fmt.Errorf("memcache: corrupt get result read")
404         }
405         it.Value = it.Value[:size]
406         cb(it)
407     }
408 }
409 
410 func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
411     pattern := "VALUE %s %d %d %d\r\n"
412     dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
413     if bytes.Count(line, space) == 3 {
414         pattern = "VALUE %s %d %d\r\n"
415         dest = dest[:3]
416     }
417     n, err := fmt.Sscanf(string(line), pattern, dest...)
418     if err != nil || n != len(dest) {
419         return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
420     }
421     return size, nil
422 }
423 
424 func legalKey(key string) bool {
425     if len(key) > 250 {
426         return false
427     }
428     for i := 0; i < len(key); i++ {
429         if key[i] <= ' ' || key[i] == 0x7f {
430             return false
431         }
432     }
433     return true
434 }
435 
436 func resumableError(err error) bool {
437     switch err {
438     case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
439         return true
440     }
441     return false
442 }
443 
444 func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error {
445     if !legalKey(item.Key) {
446         return ErrMalformedKey
447     }
448     var err error
449     if verb == "cas" {
450         _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n",
451             verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
452     } else {
453         _, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n",
454             verb, item.Key, item.Flags, item.Expiration, len(item.Value))
455     }
456     if err != nil {
457         return err
458     }
459     if _, err = rw.Write(item.Value); err != nil {
460         return err
461     }
462     if _, err = rw.Write(crlf); err != nil {
463         return err
464     }
465     if err = rw.Flush(); err != nil {
466         return err
467     }
468     line, err := rw.ReadSlice('\n')
469     if err != nil {
470         return err
471     }
472     switch {
473     case bytes.Equal(line, resultStored):
474         return nil
475     case bytes.Equal(line, resultNotStored):
476         return ErrNotStored
477     case bytes.Equal(line, resultExists):
478         return ErrCASConflict
479     case bytes.Equal(line, resultNotFound):
480         return ErrCacheMiss
481     }
482     return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
483 }
484 
485 func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...interface{}) error {
486     line, err := writeReadLine(rw, format, args...)
487     if err != nil {
488         return err
489     }
490     switch {
491     case bytes.Equal(line, resultOK):
492         return nil
493     case bytes.Equal(line, expect):
494         return nil
495     case bytes.Equal(line, resultNotStored):
496         return ErrNotStored
497     case bytes.Equal(line, resultExists):
498         return ErrCASConflict
499     case bytes.Equal(line, resultNotFound):
500         return ErrCacheMiss
501     }
502     return fmt.Errorf("memcache: unexpected response line: %q", string(line))
503 }
504 
505 func writeReadLine(rw *bufio.ReadWriter, format string, args ...interface{}) ([]byte, error) {
506     _, err := fmt.Fprintf(rw, format, args...)
507     if err != nil {
508         return nil, err
509     }
510     if e := rw.Flush(); e != nil {
511         return nil, e
512     }
513     line, err := rw.ReadSlice('\n')
514     return line, err
515 }
516 
517 func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
518     var val uint64
519     cn, err := c.getConn()
520     defer cn.condRelease(&err)
521     if err != nil {
522         return 0, err
523     }
524     func(rw *bufio.ReadWriter) error {
525         line, e := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta)
526         if e != nil {
527             return e
528         }
529         switch {
530         case bytes.Equal(line, resultNotFound):
531             return ErrCacheMiss
532         case bytes.HasPrefix(line, resultClientErrorPrefix):
533             errMsg := line[len(resultClientErrorPrefix) : len(line)-2]
534             return errors.New("memcache: client error: " + string(errMsg))
535         }
536         val, e = strconv.ParseUint(string(line[:len(line)-2]), 10, 64)
537         if e != nil {
538             return e
539         }
540         return nil
541     }(cn.rw)
542     return val, err
543 }
View Code

测试代码,启动一个http服务器先设置到memcached中,通过/hello 接口从memcached中获取对应的值

package main

import (
 "fmt"
 "memcache_go/memcache"
 "net"
 "net/http"
 "time"
)

var client *memcache.Client

func IndexHandler(w http.ResponseWriter, r *http.Request) {
 ret, err := client.Get("tcp_key")
 if err != nil {
  fmt.Fprintln(w, "err ")
 }
 str := ""
 if ret != nil {
  str = string(ret.Value)
  fmt.Fprintln(w, "hello world", str)
 } else {
  fmt.Fprintln(w, "nil")
 }
}

func touchHandler(w http.ResponseWriter, r *http.Request) {
 err := client.Touch("tcp_key", 1000)
 if err != nil {
  fmt.Fprintln(w, "err ")
  return
 }
 fmt.Fprintln(w, "succ")
}

func main() {
 addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12001")
 if err != nil {
  fmt.Println(fmt.Sprintf("get err %v", err))
  return
 }
 client = memcache.NewClient(30*time.Second, 30, addr)

 err = client.Add(&memcache.Item{
  Key:        "tcp_key",
  Value:      []byte(fmt.Sprintf("tcp_key_value_%d", time.Now().UnixNano())),
  Flags:      0,
  Expiration: 0,
 })
 if err != nil {
  fmt.Println("执行失败")
  //return
 }
 http.HandleFunc("/get", IndexHandler)
 http.HandleFunc("/touch", touchHandler)
 http.ListenAndServe("127.0.0.1:8000", nil)
 //fmt.Println("memcache_test...")
 for {
  time.Sleep(1000 * 30)
 }
}  

启动memcached服务进行测试

memcached -m 1024  -u root -l 127.0.0.1 -p 12001 -c 55535
//-d选项是启动一个守护进程
//-m是分配给Memcache使用的内存数量,单位是MB,我这里是10MB
//-u是运行Memcache的用户
//-l是监听的服务器IP地址,如果有多个地址的话,我这里指定了服务器的IP地址127.0.0.1
//-p是设置 Memcache监听的端口,我这里设置了12001,最好是1024以上的端口
//-c选项是最大运行的并发连接数,默认是1024,我这里设置了 256,按照你服务器的负载量来设定
//-P是设置保存Memcache的pid文件,我这里是保存在 /tmp/memcached.pid
//停止进程:# kill `cat /tmp/memcached.pid`  

到这里我们大概用了几百行代码实现了一个简单的memcached链接驱动的子集,对应用程序和memcached如何通讯有了大致了解。

好了,今天的分享就到这,希望对各位有用。 「原创不易,多多分享」

想要更多交流可以加群:

也可以加入知识星球免费提问:



这篇关于500行代码了解Mecached缓存客户端驱动原理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程