conn.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765
  1. package nsq
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/flate"
  6. "crypto/tls"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/golang/snappy"
  17. )
  18. // IdentifyResponse represents the metadata
  19. // returned from an IDENTIFY command to nsqd
  20. type IdentifyResponse struct {
  21. MaxRdyCount int64 `json:"max_rdy_count"`
  22. TLSv1 bool `json:"tls_v1"`
  23. Deflate bool `json:"deflate"`
  24. Snappy bool `json:"snappy"`
  25. AuthRequired bool `json:"auth_required"`
  26. }
  27. // AuthResponse represents the metadata
  28. // returned from an AUTH command to nsqd
  29. type AuthResponse struct {
  30. Identity string `json:"identity"`
  31. IdentityUrl string `json:"identity_url"`
  32. PermissionCount int64 `json:"permission_count"`
  33. }
  34. type msgResponse struct {
  35. msg *Message
  36. cmd *Command
  37. success bool
  38. backoff bool
  39. }
  40. // Conn represents a connection to nsqd
  41. //
  42. // Conn exposes a set of callbacks for the
  43. // various events that occur on a connection
  44. type Conn struct {
  45. // 64bit atomic vars need to be first for proper alignment on 32bit platforms
  46. messagesInFlight int64
  47. maxRdyCount int64
  48. rdyCount int64
  49. lastRdyTimestamp int64
  50. lastMsgTimestamp int64
  51. mtx sync.Mutex
  52. config *Config
  53. conn *net.TCPConn
  54. tlsConn *tls.Conn
  55. addr string
  56. delegate ConnDelegate
  57. logger []logger
  58. logLvl LogLevel
  59. logFmt []string
  60. logGuard sync.RWMutex
  61. r io.Reader
  62. w io.Writer
  63. cmdChan chan *Command
  64. msgResponseChan chan *msgResponse
  65. exitChan chan int
  66. drainReady chan int
  67. closeFlag int32
  68. stopper sync.Once
  69. wg sync.WaitGroup
  70. readLoopRunning int32
  71. }
  72. // NewConn returns a new Conn instance
  73. func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
  74. if !config.initialized {
  75. panic("Config must be created with NewConfig()")
  76. }
  77. return &Conn{
  78. addr: addr,
  79. config: config,
  80. delegate: delegate,
  81. maxRdyCount: 2500,
  82. lastMsgTimestamp: time.Now().UnixNano(),
  83. cmdChan: make(chan *Command),
  84. msgResponseChan: make(chan *msgResponse),
  85. exitChan: make(chan int),
  86. drainReady: make(chan int),
  87. logger: make([]logger, LogLevelMax+1),
  88. logFmt: make([]string, LogLevelMax+1),
  89. }
  90. }
  91. // SetLogger assigns the logger to use as well as a level.
  92. //
  93. // The format parameter is expected to be a printf compatible string with
  94. // a single %s argument. This is useful if you want to provide additional
  95. // context to the log messages that the connection will print, the default
  96. // is '(%s)'.
  97. //
  98. // The logger parameter is an interface that requires the following
  99. // method to be implemented (such as the the stdlib log.Logger):
  100. //
  101. // Output(calldepth int, s string)
  102. //
  103. func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
  104. c.logGuard.Lock()
  105. defer c.logGuard.Unlock()
  106. if format == "" {
  107. format = "(%s)"
  108. }
  109. for level := range c.logger {
  110. c.logger[level] = l
  111. c.logFmt[level] = format
  112. }
  113. c.logLvl = lvl
  114. }
  115. func (c *Conn) SetLoggerForLevel(l logger, lvl LogLevel, format string) {
  116. c.logGuard.Lock()
  117. defer c.logGuard.Unlock()
  118. if format == "" {
  119. format = "(%s)"
  120. }
  121. c.logger[lvl] = l
  122. c.logFmt[lvl] = format
  123. }
  124. // SetLoggerLevel sets the package logging level.
  125. func (c *Conn) SetLoggerLevel(lvl LogLevel) {
  126. c.logGuard.Lock()
  127. defer c.logGuard.Unlock()
  128. c.logLvl = lvl
  129. }
  130. func (c *Conn) getLogger(lvl LogLevel) (logger, LogLevel, string) {
  131. c.logGuard.RLock()
  132. defer c.logGuard.RUnlock()
  133. return c.logger[lvl], c.logLvl, c.logFmt[lvl]
  134. }
  135. func (c *Conn) getLogLevel() LogLevel {
  136. c.logGuard.RLock()
  137. defer c.logGuard.RUnlock()
  138. return c.logLvl
  139. }
  140. // Connect dials and bootstraps the nsqd connection
  141. // (including IDENTIFY) and returns the IdentifyResponse
  142. func (c *Conn) Connect() (*IdentifyResponse, error) {
  143. dialer := &net.Dialer{
  144. LocalAddr: c.config.LocalAddr,
  145. Timeout: c.config.DialTimeout,
  146. }
  147. conn, err := dialer.Dial("tcp", c.addr)
  148. if err != nil {
  149. return nil, err
  150. }
  151. c.conn = conn.(*net.TCPConn)
  152. c.r = conn
  153. c.w = conn
  154. _, err = c.Write(MagicV2)
  155. if err != nil {
  156. c.Close()
  157. return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
  158. }
  159. resp, err := c.identify()
  160. if err != nil {
  161. return nil, err
  162. }
  163. if resp != nil && resp.AuthRequired {
  164. if c.config.AuthSecret == "" {
  165. c.log(LogLevelError, "Auth Required")
  166. return nil, errors.New("Auth Required")
  167. }
  168. err := c.auth(c.config.AuthSecret)
  169. if err != nil {
  170. c.log(LogLevelError, "Auth Failed %s", err)
  171. return nil, err
  172. }
  173. }
  174. c.wg.Add(2)
  175. atomic.StoreInt32(&c.readLoopRunning, 1)
  176. go c.readLoop()
  177. go c.writeLoop()
  178. return resp, nil
  179. }
  180. // Close idempotently initiates connection close
  181. func (c *Conn) Close() error {
  182. atomic.StoreInt32(&c.closeFlag, 1)
  183. if c.conn != nil && atomic.LoadInt64(&c.messagesInFlight) == 0 {
  184. return c.conn.CloseRead()
  185. }
  186. return nil
  187. }
  188. // IsClosing indicates whether or not the
  189. // connection is currently in the processing of
  190. // gracefully closing
  191. func (c *Conn) IsClosing() bool {
  192. return atomic.LoadInt32(&c.closeFlag) == 1
  193. }
  194. // RDY returns the current RDY count
  195. func (c *Conn) RDY() int64 {
  196. return atomic.LoadInt64(&c.rdyCount)
  197. }
  198. // LastRDY returns the previously set RDY count
  199. func (c *Conn) LastRDY() int64 {
  200. return atomic.LoadInt64(&c.rdyCount)
  201. }
  202. // SetRDY stores the specified RDY count
  203. func (c *Conn) SetRDY(rdy int64) {
  204. atomic.StoreInt64(&c.rdyCount, rdy)
  205. if rdy > 0 {
  206. atomic.StoreInt64(&c.lastRdyTimestamp, time.Now().UnixNano())
  207. }
  208. }
  209. // MaxRDY returns the nsqd negotiated maximum
  210. // RDY count that it will accept for this connection
  211. func (c *Conn) MaxRDY() int64 {
  212. return c.maxRdyCount
  213. }
  214. // LastRdyTime returns the time of the last non-zero RDY
  215. // update for this connection
  216. func (c *Conn) LastRdyTime() time.Time {
  217. return time.Unix(0, atomic.LoadInt64(&c.lastRdyTimestamp))
  218. }
  219. // LastMessageTime returns a time.Time representing
  220. // the time at which the last message was received
  221. func (c *Conn) LastMessageTime() time.Time {
  222. return time.Unix(0, atomic.LoadInt64(&c.lastMsgTimestamp))
  223. }
  224. // RemoteAddr returns the configured destination nsqd address
  225. func (c *Conn) RemoteAddr() net.Addr {
  226. return c.conn.RemoteAddr()
  227. }
  228. // String returns the fully-qualified address
  229. func (c *Conn) String() string {
  230. return c.addr
  231. }
  232. // Read performs a deadlined read on the underlying TCP connection
  233. func (c *Conn) Read(p []byte) (int, error) {
  234. c.conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout))
  235. return c.r.Read(p)
  236. }
  237. // Write performs a deadlined write on the underlying TCP connection
  238. func (c *Conn) Write(p []byte) (int, error) {
  239. c.conn.SetWriteDeadline(time.Now().Add(c.config.WriteTimeout))
  240. return c.w.Write(p)
  241. }
  242. // WriteCommand is a goroutine safe method to write a Command
  243. // to this connection, and flush.
  244. func (c *Conn) WriteCommand(cmd *Command) error {
  245. c.mtx.Lock()
  246. _, err := cmd.WriteTo(c)
  247. if err != nil {
  248. goto exit
  249. }
  250. err = c.Flush()
  251. exit:
  252. c.mtx.Unlock()
  253. if err != nil {
  254. c.log(LogLevelError, "IO error - %s", err)
  255. c.delegate.OnIOError(c, err)
  256. }
  257. return err
  258. }
  259. type flusher interface {
  260. Flush() error
  261. }
  262. // Flush writes all buffered data to the underlying TCP connection
  263. func (c *Conn) Flush() error {
  264. if f, ok := c.w.(flusher); ok {
  265. return f.Flush()
  266. }
  267. return nil
  268. }
  269. func (c *Conn) identify() (*IdentifyResponse, error) {
  270. ci := make(map[string]interface{})
  271. ci["client_id"] = c.config.ClientID
  272. ci["hostname"] = c.config.Hostname
  273. ci["user_agent"] = c.config.UserAgent
  274. ci["short_id"] = c.config.ClientID // deprecated
  275. ci["long_id"] = c.config.Hostname // deprecated
  276. ci["tls_v1"] = c.config.TlsV1
  277. ci["deflate"] = c.config.Deflate
  278. ci["deflate_level"] = c.config.DeflateLevel
  279. ci["snappy"] = c.config.Snappy
  280. ci["feature_negotiation"] = true
  281. if c.config.HeartbeatInterval == -1 {
  282. ci["heartbeat_interval"] = -1
  283. } else {
  284. ci["heartbeat_interval"] = int64(c.config.HeartbeatInterval / time.Millisecond)
  285. }
  286. ci["sample_rate"] = c.config.SampleRate
  287. ci["output_buffer_size"] = c.config.OutputBufferSize
  288. if c.config.OutputBufferTimeout == -1 {
  289. ci["output_buffer_timeout"] = -1
  290. } else {
  291. ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond)
  292. }
  293. ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
  294. cmd, err := Identify(ci)
  295. if err != nil {
  296. return nil, ErrIdentify{err.Error()}
  297. }
  298. err = c.WriteCommand(cmd)
  299. if err != nil {
  300. return nil, ErrIdentify{err.Error()}
  301. }
  302. frameType, data, err := ReadUnpackedResponse(c)
  303. if err != nil {
  304. return nil, ErrIdentify{err.Error()}
  305. }
  306. if frameType == FrameTypeError {
  307. return nil, ErrIdentify{string(data)}
  308. }
  309. // check to see if the server was able to respond w/ capabilities
  310. // i.e. it was a JSON response
  311. if data[0] != '{' {
  312. return nil, nil
  313. }
  314. resp := &IdentifyResponse{}
  315. err = json.Unmarshal(data, resp)
  316. if err != nil {
  317. return nil, ErrIdentify{err.Error()}
  318. }
  319. c.log(LogLevelDebug, "IDENTIFY response: %+v", resp)
  320. c.maxRdyCount = resp.MaxRdyCount
  321. if resp.TLSv1 {
  322. c.log(LogLevelInfo, "upgrading to TLS")
  323. err := c.upgradeTLS(c.config.TlsConfig)
  324. if err != nil {
  325. return nil, ErrIdentify{err.Error()}
  326. }
  327. }
  328. if resp.Deflate {
  329. c.log(LogLevelInfo, "upgrading to Deflate")
  330. err := c.upgradeDeflate(c.config.DeflateLevel)
  331. if err != nil {
  332. return nil, ErrIdentify{err.Error()}
  333. }
  334. }
  335. if resp.Snappy {
  336. c.log(LogLevelInfo, "upgrading to Snappy")
  337. err := c.upgradeSnappy()
  338. if err != nil {
  339. return nil, ErrIdentify{err.Error()}
  340. }
  341. }
  342. // now that connection is bootstrapped, enable read buffering
  343. // (and write buffering if it's not already capable of Flush())
  344. c.r = bufio.NewReader(c.r)
  345. if _, ok := c.w.(flusher); !ok {
  346. c.w = bufio.NewWriter(c.w)
  347. }
  348. return resp, nil
  349. }
  350. func (c *Conn) upgradeTLS(tlsConf *tls.Config) error {
  351. host, _, err := net.SplitHostPort(c.addr)
  352. if err != nil {
  353. return err
  354. }
  355. // create a local copy of the config to set ServerName for this connection
  356. conf := &tls.Config{}
  357. if tlsConf != nil {
  358. conf = tlsConf.Clone()
  359. }
  360. conf.ServerName = host
  361. c.tlsConn = tls.Client(c.conn, conf)
  362. err = c.tlsConn.Handshake()
  363. if err != nil {
  364. return err
  365. }
  366. c.r = c.tlsConn
  367. c.w = c.tlsConn
  368. frameType, data, err := ReadUnpackedResponse(c)
  369. if err != nil {
  370. return err
  371. }
  372. if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
  373. return errors.New("invalid response from TLS upgrade")
  374. }
  375. return nil
  376. }
  377. func (c *Conn) upgradeDeflate(level int) error {
  378. conn := net.Conn(c.conn)
  379. if c.tlsConn != nil {
  380. conn = c.tlsConn
  381. }
  382. fw, _ := flate.NewWriter(conn, level)
  383. c.r = flate.NewReader(conn)
  384. c.w = fw
  385. frameType, data, err := ReadUnpackedResponse(c)
  386. if err != nil {
  387. return err
  388. }
  389. if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
  390. return errors.New("invalid response from Deflate upgrade")
  391. }
  392. return nil
  393. }
  394. func (c *Conn) upgradeSnappy() error {
  395. conn := net.Conn(c.conn)
  396. if c.tlsConn != nil {
  397. conn = c.tlsConn
  398. }
  399. c.r = snappy.NewReader(conn)
  400. c.w = snappy.NewWriter(conn)
  401. frameType, data, err := ReadUnpackedResponse(c)
  402. if err != nil {
  403. return err
  404. }
  405. if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
  406. return errors.New("invalid response from Snappy upgrade")
  407. }
  408. return nil
  409. }
  410. func (c *Conn) auth(secret string) error {
  411. cmd, err := Auth(secret)
  412. if err != nil {
  413. return err
  414. }
  415. err = c.WriteCommand(cmd)
  416. if err != nil {
  417. return err
  418. }
  419. frameType, data, err := ReadUnpackedResponse(c)
  420. if err != nil {
  421. return err
  422. }
  423. if frameType == FrameTypeError {
  424. return errors.New("Error authenticating " + string(data))
  425. }
  426. resp := &AuthResponse{}
  427. err = json.Unmarshal(data, resp)
  428. if err != nil {
  429. return err
  430. }
  431. c.log(LogLevelInfo, "Auth accepted. Identity: %q %s Permissions: %d",
  432. resp.Identity, resp.IdentityUrl, resp.PermissionCount)
  433. return nil
  434. }
  435. func (c *Conn) readLoop() {
  436. delegate := &connMessageDelegate{c}
  437. for {
  438. if atomic.LoadInt32(&c.closeFlag) == 1 {
  439. goto exit
  440. }
  441. frameType, data, err := ReadUnpackedResponse(c)
  442. if err != nil {
  443. if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
  444. goto exit
  445. }
  446. if !strings.Contains(err.Error(), "use of closed network connection") {
  447. c.log(LogLevelError, "IO error - %s", err)
  448. c.delegate.OnIOError(c, err)
  449. }
  450. goto exit
  451. }
  452. if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
  453. c.log(LogLevelDebug, "heartbeat received")
  454. c.delegate.OnHeartbeat(c)
  455. err := c.WriteCommand(Nop())
  456. if err != nil {
  457. c.log(LogLevelError, "IO error - %s", err)
  458. c.delegate.OnIOError(c, err)
  459. goto exit
  460. }
  461. continue
  462. }
  463. switch frameType {
  464. case FrameTypeResponse:
  465. c.delegate.OnResponse(c, data)
  466. case FrameTypeMessage:
  467. msg, err := DecodeMessage(data)
  468. if err != nil {
  469. c.log(LogLevelError, "IO error - %s", err)
  470. c.delegate.OnIOError(c, err)
  471. goto exit
  472. }
  473. msg.Delegate = delegate
  474. msg.NSQDAddress = c.String()
  475. atomic.AddInt64(&c.messagesInFlight, 1)
  476. atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
  477. c.delegate.OnMessage(c, msg)
  478. case FrameTypeError:
  479. c.log(LogLevelError, "protocol error - %s", data)
  480. c.delegate.OnError(c, data)
  481. default:
  482. c.log(LogLevelError, "IO error - %s", err)
  483. c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
  484. }
  485. }
  486. exit:
  487. atomic.StoreInt32(&c.readLoopRunning, 0)
  488. // start the connection close
  489. messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
  490. if messagesInFlight == 0 {
  491. // if we exited readLoop with no messages in flight
  492. // we need to explicitly trigger the close because
  493. // writeLoop won't
  494. c.close()
  495. } else {
  496. c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
  497. }
  498. c.wg.Done()
  499. c.log(LogLevelInfo, "readLoop exiting")
  500. }
  501. func (c *Conn) writeLoop() {
  502. for {
  503. select {
  504. case <-c.exitChan:
  505. c.log(LogLevelInfo, "breaking out of writeLoop")
  506. // Indicate drainReady because we will not pull any more off msgResponseChan
  507. close(c.drainReady)
  508. goto exit
  509. case cmd := <-c.cmdChan:
  510. err := c.WriteCommand(cmd)
  511. if err != nil {
  512. c.log(LogLevelError, "error sending command %s - %s", cmd, err)
  513. c.close()
  514. continue
  515. }
  516. case resp := <-c.msgResponseChan:
  517. // Decrement this here so it is correct even if we can't respond to nsqd
  518. msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
  519. if resp.success {
  520. c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
  521. c.delegate.OnMessageFinished(c, resp.msg)
  522. c.delegate.OnResume(c)
  523. } else {
  524. c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
  525. c.delegate.OnMessageRequeued(c, resp.msg)
  526. if resp.backoff {
  527. c.delegate.OnBackoff(c)
  528. } else {
  529. c.delegate.OnContinue(c)
  530. }
  531. }
  532. err := c.WriteCommand(resp.cmd)
  533. if err != nil {
  534. c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
  535. c.close()
  536. continue
  537. }
  538. if msgsInFlight == 0 &&
  539. atomic.LoadInt32(&c.closeFlag) == 1 {
  540. c.close()
  541. continue
  542. }
  543. }
  544. }
  545. exit:
  546. c.wg.Done()
  547. c.log(LogLevelInfo, "writeLoop exiting")
  548. }
  549. func (c *Conn) close() {
  550. // a "clean" connection close is orchestrated as follows:
  551. //
  552. // 1. CLOSE cmd sent to nsqd
  553. // 2. CLOSE_WAIT response received from nsqd
  554. // 3. set c.closeFlag
  555. // 4. readLoop() exits
  556. // a. if messages-in-flight > 0 delay close()
  557. // i. writeLoop() continues receiving on c.msgResponseChan chan
  558. // x. when messages-in-flight == 0 call close()
  559. // b. else call close() immediately
  560. // 5. c.exitChan close
  561. // a. writeLoop() exits
  562. // i. c.drainReady close
  563. // 6a. launch cleanup() goroutine (we're racing with intraprocess
  564. // routed messages, see comments below)
  565. // a. wait on c.drainReady
  566. // b. loop and receive on c.msgResponseChan chan
  567. // until messages-in-flight == 0
  568. // i. ensure that readLoop has exited
  569. // 6b. launch waitForCleanup() goroutine
  570. // b. wait on waitgroup (covers readLoop() and writeLoop()
  571. // and cleanup goroutine)
  572. // c. underlying TCP connection close
  573. // d. trigger Delegate OnClose()
  574. //
  575. c.stopper.Do(func() {
  576. c.log(LogLevelInfo, "beginning close")
  577. close(c.exitChan)
  578. c.conn.CloseRead()
  579. c.wg.Add(1)
  580. go c.cleanup()
  581. go c.waitForCleanup()
  582. })
  583. }
  584. func (c *Conn) cleanup() {
  585. <-c.drainReady
  586. ticker := time.NewTicker(100 * time.Millisecond)
  587. lastWarning := time.Now()
  588. // writeLoop has exited, drain any remaining in flight messages
  589. for {
  590. // we're racing with readLoop which potentially has a message
  591. // for handling so infinitely loop until messagesInFlight == 0
  592. // and readLoop has exited
  593. var msgsInFlight int64
  594. select {
  595. case <-c.msgResponseChan:
  596. msgsInFlight = atomic.AddInt64(&c.messagesInFlight, -1)
  597. case <-ticker.C:
  598. msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
  599. }
  600. if msgsInFlight > 0 {
  601. if time.Now().Sub(lastWarning) > time.Second {
  602. c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
  603. lastWarning = time.Now()
  604. }
  605. continue
  606. }
  607. // until the readLoop has exited we cannot be sure that there
  608. // still won't be a race
  609. if atomic.LoadInt32(&c.readLoopRunning) == 1 {
  610. if time.Now().Sub(lastWarning) > time.Second {
  611. c.log(LogLevelWarning, "draining... readLoop still running")
  612. lastWarning = time.Now()
  613. }
  614. continue
  615. }
  616. goto exit
  617. }
  618. exit:
  619. ticker.Stop()
  620. c.wg.Done()
  621. c.log(LogLevelInfo, "finished draining, cleanup exiting")
  622. }
  623. func (c *Conn) waitForCleanup() {
  624. // this blocks until readLoop and writeLoop
  625. // (and cleanup goroutine above) have exited
  626. c.wg.Wait()
  627. c.conn.CloseWrite()
  628. c.log(LogLevelInfo, "clean close complete")
  629. c.delegate.OnClose(c)
  630. }
  631. func (c *Conn) onMessageFinish(m *Message) {
  632. c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
  633. }
  634. func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
  635. if delay == -1 {
  636. // linear delay
  637. delay = c.config.DefaultRequeueDelay * time.Duration(m.Attempts)
  638. // bound the requeueDelay to configured max
  639. if delay > c.config.MaxRequeueDelay {
  640. delay = c.config.MaxRequeueDelay
  641. }
  642. }
  643. c.msgResponseChan <- &msgResponse{msg: m, cmd: Requeue(m.ID, delay), success: false, backoff: backoff}
  644. }
  645. func (c *Conn) onMessageTouch(m *Message) {
  646. select {
  647. case c.cmdChan <- Touch(m.ID):
  648. case <-c.exitChan:
  649. }
  650. }
  651. func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) {
  652. logger, logLvl, logFmt := c.getLogger(lvl)
  653. if logger == nil {
  654. return
  655. }
  656. if logLvl > lvl {
  657. return
  658. }
  659. logger.Output(2, fmt.Sprintf("%-4s %s %s", lvl,
  660. fmt.Sprintf(logFmt, c.String()),
  661. fmt.Sprintf(line, args...)))
  662. }