client_v2.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. package nsqd
  2. import (
  3. "bufio"
  4. "compress/flate"
  5. "crypto/tls"
  6. "fmt"
  7. "net"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/golang/snappy"
  13. "github.com/nsqio/nsq/internal/auth"
  14. )
// defaultBufferSize is the size, in bytes, that newClientV2 uses for a
// client's buffered reader and writer and for its initial OutputBufferSize.
const defaultBufferSize = 16 * 1024

// Client connection states. newClientV2 starts a client in stateInit and
// StartClose moves it to stateClosing; the remaining transitions are not
// performed in this file.
const (
	stateInit = iota
	stateDisconnected
	stateConnected
	stateSubscribed
	stateClosing
)
// identifyDataV2 is the JSON payload handed to clientV2.Identify.
//
// HeartbeatInterval, OutputBufferTimeout and MsgTimeout are expressed in
// milliseconds. For HeartbeatInterval and OutputBufferTimeout -1 disables the
// feature; for all three 0 keeps the current default. OutputBufferSize is in
// bytes (-1 means effectively unbuffered, 0 keeps the default) and SampleRate
// must lie in [0, 99].
//
// FeatureNegotiation, TLSv1, Deflate, DeflateLevel and Snappy are not read by
// Identify itself; presumably they are consumed by the protocol handler that
// calls the Upgrade* methods (verify against the caller).
type identifyDataV2 struct {
	ClientID            string `json:"client_id"`
	Hostname            string `json:"hostname"`
	HeartbeatInterval   int    `json:"heartbeat_interval"`
	OutputBufferSize    int    `json:"output_buffer_size"`
	OutputBufferTimeout int    `json:"output_buffer_timeout"`
	FeatureNegotiation  bool   `json:"feature_negotiation"`
	TLSv1               bool   `json:"tls_v1"`
	Deflate             bool   `json:"deflate"`
	DeflateLevel        int    `json:"deflate_level"`
	Snappy              bool   `json:"snappy"`
	SampleRate          int32  `json:"sample_rate"`
	UserAgent           string `json:"user_agent"`
	MsgTimeout          int    `json:"msg_timeout"`
}
// identifyEvent is the snapshot of effective settings that Identify publishes
// on IdentifyEventChan once all requested values have been applied.
type identifyEvent struct {
	OutputBufferTimeout time.Duration
	HeartbeatInterval   time.Duration
	SampleRate          int32
	MsgTimeout          time.Duration
}
// PubCount is the number of messages a client has published to one topic,
// as reported in ClientV2Stats.
type PubCount struct {
	Topic string `json:"topic"`
	Count uint64 `json:"count"`
}
// ClientV2Stats is the JSON-serializable snapshot of a V2 client produced by
// clientV2.Stats.
type ClientV2Stats struct {
	// identity, counters and negotiated features; ConnectTime is a Unix
	// timestamp in seconds
	ClientID        string `json:"client_id"`
	Hostname        string `json:"hostname"`
	Version         string `json:"version"`
	RemoteAddress   string `json:"remote_address"`
	State           int32  `json:"state"`
	ReadyCount      int64  `json:"ready_count"`
	InFlightCount   int64  `json:"in_flight_count"`
	MessageCount    uint64 `json:"message_count"`
	FinishCount     uint64 `json:"finish_count"`
	RequeueCount    uint64 `json:"requeue_count"`
	ConnectTime     int64  `json:"connect_ts"`
	SampleRate      int32  `json:"sample_rate"`
	Deflate         bool   `json:"deflate"`
	Snappy          bool   `json:"snappy"`
	UserAgent       string `json:"user_agent"`
	Authed          bool   `json:"authed,omitempty"`
	AuthIdentity    string `json:"auth_identity,omitempty"`
	AuthIdentityURL string `json:"auth_identity_url,omitempty"`

	// per-topic publish counts; String treats a non-empty list as a producer
	PubCounts []PubCount `json:"pub_counts,omitempty"`

	// TLS details; the string fields are only filled in when TLS is true
	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}
  74. func (s ClientV2Stats) String() string {
  75. connectTime := time.Unix(s.ConnectTime, 0)
  76. duration := time.Since(connectTime).Truncate(time.Second)
  77. _, port, _ := net.SplitHostPort(s.RemoteAddress)
  78. id := fmt.Sprintf("%s:%s %s", s.Hostname, port, s.UserAgent)
  79. // producer
  80. if len(s.PubCounts) > 0 {
  81. var total uint64
  82. var topicOut []string
  83. for _, v := range s.PubCounts {
  84. total += v.Count
  85. topicOut = append(topicOut, fmt.Sprintf("%s=%d", v.Topic, v.Count))
  86. }
  87. return fmt.Sprintf("[%s %-21s] msgs: %-8d topics: %s connected: %s",
  88. s.Version,
  89. id,
  90. total,
  91. strings.Join(topicOut, ","),
  92. duration,
  93. )
  94. }
  95. // consumer
  96. return fmt.Sprintf("[%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s",
  97. s.Version,
  98. id,
  99. s.State,
  100. s.InFlightCount,
  101. s.ReadyCount,
  102. s.FinishCount,
  103. s.RequeueCount,
  104. s.MessageCount,
  105. duration,
  106. )
  107. }
// clientV2 holds the per-connection state of a V2 protocol client: its
// counters, negotiated options, buffered I/O wrappers and auth state.
type clientV2 struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	ReadyCount    int64
	InFlightCount int64
	MessageCount  uint64
	FinishCount   uint64
	RequeueCount  uint64

	// messages published per topic; accessed under metaLock
	pubCounts map[string]uint64

	// writeLock is taken by the Set* and Upgrade* methods while they change
	// timing settings or replace Reader/Writer
	writeLock sync.RWMutex
	// metaLock guards ClientID, Hostname, UserAgent and pubCounts
	metaLock sync.RWMutex

	ID        int64
	nsqd      *NSQD
	UserAgent string

	// original connection
	net.Conn

	// connections based on negotiated features
	tlsConn     *tls.Conn     // set by UpgradeTLS
	flateWriter *flate.Writer // set by UpgradeDeflate; flushed by Flush

	// reading/writing interfaces
	Reader *bufio.Reader
	Writer *bufio.Writer

	OutputBufferSize    int
	OutputBufferTimeout time.Duration

	// zero disables the write deadline applied in Flush
	HeartbeatInterval time.Duration

	MsgTimeout time.Duration

	// one of the state* constants; read and written atomically in this file
	State int32

	ConnectTime time.Time
	// consulted by IsReadyForMessages; not assigned in this file
	// (presumably set when the client subscribes - verify against caller)
	Channel *Channel
	// buffered (1) wake-up channel written by tryUpdateReadyState
	ReadyStateChan chan int
	ExitChan       chan int

	ClientID string
	Hostname string

	// accessed atomically; validated to [0, 99] by SetSampleRate
	SampleRate int32

	IdentifyEventChan chan identifyEvent
	SubEventChan      chan *Channel

	// set atomically to 1 by the matching Upgrade* method
	TLS     int32
	Snappy  int32
	Deflate int32

	// re-usable buffer for reading the 4-byte lengths off the wire
	lenBuf   [4]byte
	lenSlice []byte // slice over lenBuf, set up in newClientV2

	AuthSecret string
	AuthState  *auth.State
}
  152. func newClientV2(id int64, conn net.Conn, nsqd *NSQD) *clientV2 {
  153. var identifier string
  154. if conn != nil {
  155. identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
  156. }
  157. c := &clientV2{
  158. ID: id,
  159. nsqd: nsqd,
  160. Conn: conn,
  161. Reader: bufio.NewReaderSize(conn, defaultBufferSize),
  162. Writer: bufio.NewWriterSize(conn, defaultBufferSize),
  163. OutputBufferSize: defaultBufferSize,
  164. OutputBufferTimeout: nsqd.getOpts().OutputBufferTimeout,
  165. MsgTimeout: nsqd.getOpts().MsgTimeout,
  166. // ReadyStateChan has a buffer of 1 to guarantee that in the event
  167. // there is a race the state update is not lost
  168. ReadyStateChan: make(chan int, 1),
  169. ExitChan: make(chan int),
  170. ConnectTime: time.Now(),
  171. State: stateInit,
  172. ClientID: identifier,
  173. Hostname: identifier,
  174. SubEventChan: make(chan *Channel, 1),
  175. IdentifyEventChan: make(chan identifyEvent, 1),
  176. // heartbeats are client configurable but default to 30s
  177. HeartbeatInterval: nsqd.getOpts().ClientTimeout / 2,
  178. pubCounts: make(map[string]uint64),
  179. }
  180. c.lenSlice = c.lenBuf[:]
  181. return c
  182. }
  183. func (c *clientV2) String() string {
  184. return c.RemoteAddr().String()
  185. }
  186. func (c *clientV2) Type() int {
  187. c.metaLock.RLock()
  188. hasPublished := len(c.pubCounts) > 0
  189. c.metaLock.RUnlock()
  190. if hasPublished {
  191. return typeProducer
  192. }
  193. return typeConsumer
  194. }
  195. func (c *clientV2) Identify(data identifyDataV2) error {
  196. c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data)
  197. c.metaLock.Lock()
  198. c.ClientID = data.ClientID
  199. c.Hostname = data.Hostname
  200. c.UserAgent = data.UserAgent
  201. c.metaLock.Unlock()
  202. err := c.SetHeartbeatInterval(data.HeartbeatInterval)
  203. if err != nil {
  204. return err
  205. }
  206. err = c.SetOutputBuffer(data.OutputBufferSize, data.OutputBufferTimeout)
  207. if err != nil {
  208. return err
  209. }
  210. err = c.SetSampleRate(data.SampleRate)
  211. if err != nil {
  212. return err
  213. }
  214. err = c.SetMsgTimeout(data.MsgTimeout)
  215. if err != nil {
  216. return err
  217. }
  218. ie := identifyEvent{
  219. OutputBufferTimeout: c.OutputBufferTimeout,
  220. HeartbeatInterval: c.HeartbeatInterval,
  221. SampleRate: c.SampleRate,
  222. MsgTimeout: c.MsgTimeout,
  223. }
  224. // update the client's message pump
  225. select {
  226. case c.IdentifyEventChan <- ie:
  227. default:
  228. }
  229. return nil
  230. }
  231. func (c *clientV2) Stats(topicName string) ClientStats {
  232. c.metaLock.RLock()
  233. clientID := c.ClientID
  234. hostname := c.Hostname
  235. userAgent := c.UserAgent
  236. var identity string
  237. var identityURL string
  238. if c.AuthState != nil {
  239. identity = c.AuthState.Identity
  240. identityURL = c.AuthState.IdentityURL
  241. }
  242. pubCounts := make([]PubCount, 0, len(c.pubCounts))
  243. for topic, count := range c.pubCounts {
  244. if len(topicName) > 0 && topic != topicName {
  245. continue
  246. }
  247. pubCounts = append(pubCounts, PubCount{
  248. Topic: topic,
  249. Count: count,
  250. })
  251. break
  252. }
  253. c.metaLock.RUnlock()
  254. stats := ClientV2Stats{
  255. Version: "V2",
  256. RemoteAddress: c.RemoteAddr().String(),
  257. ClientID: clientID,
  258. Hostname: hostname,
  259. UserAgent: userAgent,
  260. State: atomic.LoadInt32(&c.State),
  261. ReadyCount: atomic.LoadInt64(&c.ReadyCount),
  262. InFlightCount: atomic.LoadInt64(&c.InFlightCount),
  263. MessageCount: atomic.LoadUint64(&c.MessageCount),
  264. FinishCount: atomic.LoadUint64(&c.FinishCount),
  265. RequeueCount: atomic.LoadUint64(&c.RequeueCount),
  266. ConnectTime: c.ConnectTime.Unix(),
  267. SampleRate: atomic.LoadInt32(&c.SampleRate),
  268. TLS: atomic.LoadInt32(&c.TLS) == 1,
  269. Deflate: atomic.LoadInt32(&c.Deflate) == 1,
  270. Snappy: atomic.LoadInt32(&c.Snappy) == 1,
  271. Authed: c.HasAuthorizations(),
  272. AuthIdentity: identity,
  273. AuthIdentityURL: identityURL,
  274. PubCounts: pubCounts,
  275. }
  276. if stats.TLS {
  277. p := prettyConnectionState{c.tlsConn.ConnectionState()}
  278. stats.CipherSuite = p.GetCipherSuite()
  279. stats.TLSVersion = p.GetVersion()
  280. stats.TLSNegotiatedProtocol = p.NegotiatedProtocol
  281. stats.TLSNegotiatedProtocolIsMutual = p.NegotiatedProtocolIsMutual
  282. }
  283. return stats
  284. }
  285. // struct to convert from integers to the human readable strings
  286. type prettyConnectionState struct {
  287. tls.ConnectionState
  288. }
  289. func (p *prettyConnectionState) GetCipherSuite() string {
  290. switch p.CipherSuite {
  291. case tls.TLS_RSA_WITH_RC4_128_SHA:
  292. return "TLS_RSA_WITH_RC4_128_SHA"
  293. case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA:
  294. return "TLS_RSA_WITH_3DES_EDE_CBC_SHA"
  295. case tls.TLS_RSA_WITH_AES_128_CBC_SHA:
  296. return "TLS_RSA_WITH_AES_128_CBC_SHA"
  297. case tls.TLS_RSA_WITH_AES_256_CBC_SHA:
  298. return "TLS_RSA_WITH_AES_256_CBC_SHA"
  299. case tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA:
  300. return "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA"
  301. case tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA:
  302. return "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA"
  303. case tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA:
  304. return "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA"
  305. case tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA:
  306. return "TLS_ECDHE_RSA_WITH_RC4_128_SHA"
  307. case tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA:
  308. return "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA"
  309. case tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA:
  310. return "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"
  311. case tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:
  312. return "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA"
  313. case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256:
  314. return "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
  315. case tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
  316. return "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"
  317. }
  318. return fmt.Sprintf("Unknown %d", p.CipherSuite)
  319. }
  320. func (p *prettyConnectionState) GetVersion() string {
  321. switch p.Version {
  322. case tls.VersionSSL30:
  323. return "SSL30"
  324. case tls.VersionTLS10:
  325. return "TLS1.0"
  326. case tls.VersionTLS11:
  327. return "TLS1.1"
  328. case tls.VersionTLS12:
  329. return "TLS1.2"
  330. default:
  331. return fmt.Sprintf("Unknown %d", p.Version)
  332. }
  333. }
  334. func (c *clientV2) IsReadyForMessages() bool {
  335. if c.Channel.IsPaused() {
  336. return false
  337. }
  338. readyCount := atomic.LoadInt64(&c.ReadyCount)
  339. inFlightCount := atomic.LoadInt64(&c.InFlightCount)
  340. c.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)
  341. if inFlightCount >= readyCount || readyCount <= 0 {
  342. return false
  343. }
  344. return true
  345. }
  346. func (c *clientV2) SetReadyCount(count int64) {
  347. oldCount := atomic.SwapInt64(&c.ReadyCount, count)
  348. if oldCount != count {
  349. c.tryUpdateReadyState()
  350. }
  351. }
// tryUpdateReadyState performs a non-blocking send on ReadyStateChan; if the
// channel cannot accept the value the signal is dropped.
func (c *clientV2) tryUpdateReadyState() {
	// you can always *try* to write to ReadyStateChan because in the cases
	// where you cannot the message pump loop would have iterated anyway.
	// the atomic integer operations guarantee correctness of the value.
	select {
	case c.ReadyStateChan <- 1:
	default:
	}
}
// FinishedMessage records one finished message: it increments FinishCount,
// decrements InFlightCount and then signals the ready-state channel.
func (c *clientV2) FinishedMessage() {
	atomic.AddUint64(&c.FinishCount, 1)
	atomic.AddInt64(&c.InFlightCount, -1)
	c.tryUpdateReadyState()
}
// Empty resets InFlightCount to zero and signals the ready-state channel.
func (c *clientV2) Empty() {
	atomic.StoreInt64(&c.InFlightCount, 0)
	c.tryUpdateReadyState()
}
// SendingMessage records one message being sent to the client: it increments
// both InFlightCount and MessageCount. Unlike the other counter updates it
// does not signal the ready-state channel.
func (c *clientV2) SendingMessage() {
	atomic.AddInt64(&c.InFlightCount, 1)
	atomic.AddUint64(&c.MessageCount, 1)
}
  374. func (c *clientV2) PublishedMessage(topic string, count uint64) {
  375. c.metaLock.Lock()
  376. c.pubCounts[topic] += count
  377. c.metaLock.Unlock()
  378. }
// TimedOutMessage records one in-flight message timing out: it decrements
// InFlightCount and signals the ready-state channel.
func (c *clientV2) TimedOutMessage() {
	atomic.AddInt64(&c.InFlightCount, -1)
	c.tryUpdateReadyState()
}
// RequeuedMessage records one requeued message: it increments RequeueCount,
// decrements InFlightCount and then signals the ready-state channel.
func (c *clientV2) RequeuedMessage() {
	atomic.AddUint64(&c.RequeueCount, 1)
	atomic.AddInt64(&c.InFlightCount, -1)
	c.tryUpdateReadyState()
}
// StartClose begins shutting the client down: the ready count is forced to
// zero first, then the state is switched to stateClosing.
func (c *clientV2) StartClose() {
	// Force the client into ready 0
	c.SetReadyCount(0)
	// mark this client as closing
	atomic.StoreInt32(&c.State, stateClosing)
}
// Pause signals the ready-state channel. It stores no pause flag of its own;
// IsReadyForMessages reads the paused state from the client's Channel.
func (c *clientV2) Pause() {
	c.tryUpdateReadyState()
}
  397. func (c *clientV2) UnPause() {
  398. c.tryUpdateReadyState()
  399. }
  400. func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error {
  401. c.writeLock.Lock()
  402. defer c.writeLock.Unlock()
  403. switch {
  404. case desiredInterval == -1:
  405. c.HeartbeatInterval = 0
  406. case desiredInterval == 0:
  407. // do nothing (use default)
  408. case desiredInterval >= 1000 &&
  409. desiredInterval <= int(c.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond):
  410. c.HeartbeatInterval = time.Duration(desiredInterval) * time.Millisecond
  411. default:
  412. return fmt.Errorf("heartbeat interval (%d) is invalid", desiredInterval)
  413. }
  414. return nil
  415. }
  416. func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error {
  417. c.writeLock.Lock()
  418. defer c.writeLock.Unlock()
  419. switch {
  420. case desiredTimeout == -1:
  421. c.OutputBufferTimeout = 0
  422. case desiredTimeout == 0:
  423. // do nothing (use default)
  424. case true &&
  425. desiredTimeout >= int(c.nsqd.getOpts().MinOutputBufferTimeout/time.Millisecond) &&
  426. desiredTimeout <= int(c.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond):
  427. c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Millisecond
  428. default:
  429. return fmt.Errorf("output buffer timeout (%d) is invalid", desiredTimeout)
  430. }
  431. switch {
  432. case desiredSize == -1:
  433. // effectively no buffer (every write will go directly to the wrapped net.Conn)
  434. c.OutputBufferSize = 1
  435. c.OutputBufferTimeout = 0
  436. case desiredSize == 0:
  437. // do nothing (use default)
  438. case desiredSize >= 64 && desiredSize <= int(c.nsqd.getOpts().MaxOutputBufferSize):
  439. c.OutputBufferSize = desiredSize
  440. default:
  441. return fmt.Errorf("output buffer size (%d) is invalid", desiredSize)
  442. }
  443. if desiredSize != 0 {
  444. err := c.Writer.Flush()
  445. if err != nil {
  446. return err
  447. }
  448. c.Writer = bufio.NewWriterSize(c.Conn, c.OutputBufferSize)
  449. }
  450. return nil
  451. }
  452. func (c *clientV2) SetSampleRate(sampleRate int32) error {
  453. if sampleRate < 0 || sampleRate > 99 {
  454. return fmt.Errorf("sample rate (%d) is invalid", sampleRate)
  455. }
  456. atomic.StoreInt32(&c.SampleRate, sampleRate)
  457. return nil
  458. }
  459. func (c *clientV2) SetMsgTimeout(msgTimeout int) error {
  460. c.writeLock.Lock()
  461. defer c.writeLock.Unlock()
  462. switch {
  463. case msgTimeout == 0:
  464. // do nothing (use default)
  465. case msgTimeout >= 1000 &&
  466. msgTimeout <= int(c.nsqd.getOpts().MaxMsgTimeout/time.Millisecond):
  467. c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond
  468. default:
  469. return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout)
  470. }
  471. return nil
  472. }
  473. func (c *clientV2) UpgradeTLS() error {
  474. c.writeLock.Lock()
  475. defer c.writeLock.Unlock()
  476. tlsConn := tls.Server(c.Conn, c.nsqd.tlsConfig)
  477. tlsConn.SetDeadline(time.Now().Add(5 * time.Second))
  478. err := tlsConn.Handshake()
  479. if err != nil {
  480. return err
  481. }
  482. c.tlsConn = tlsConn
  483. c.Reader = bufio.NewReaderSize(c.tlsConn, defaultBufferSize)
  484. c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize)
  485. atomic.StoreInt32(&c.TLS, 1)
  486. return nil
  487. }
  488. func (c *clientV2) UpgradeDeflate(level int) error {
  489. c.writeLock.Lock()
  490. defer c.writeLock.Unlock()
  491. conn := c.Conn
  492. if c.tlsConn != nil {
  493. conn = c.tlsConn
  494. }
  495. c.Reader = bufio.NewReaderSize(flate.NewReader(conn), defaultBufferSize)
  496. fw, _ := flate.NewWriter(conn, level)
  497. c.flateWriter = fw
  498. c.Writer = bufio.NewWriterSize(fw, c.OutputBufferSize)
  499. atomic.StoreInt32(&c.Deflate, 1)
  500. return nil
  501. }
  502. func (c *clientV2) UpgradeSnappy() error {
  503. c.writeLock.Lock()
  504. defer c.writeLock.Unlock()
  505. conn := c.Conn
  506. if c.tlsConn != nil {
  507. conn = c.tlsConn
  508. }
  509. c.Reader = bufio.NewReaderSize(snappy.NewReader(conn), defaultBufferSize)
  510. c.Writer = bufio.NewWriterSize(snappy.NewWriter(conn), c.OutputBufferSize)
  511. atomic.StoreInt32(&c.Snappy, 1)
  512. return nil
  513. }
  514. func (c *clientV2) Flush() error {
  515. var zeroTime time.Time
  516. if c.HeartbeatInterval > 0 {
  517. c.SetWriteDeadline(time.Now().Add(c.HeartbeatInterval))
  518. } else {
  519. c.SetWriteDeadline(zeroTime)
  520. }
  521. err := c.Writer.Flush()
  522. if err != nil {
  523. return err
  524. }
  525. if c.flateWriter != nil {
  526. return c.flateWriter.Flush()
  527. }
  528. return nil
  529. }
  530. func (c *clientV2) QueryAuthd() error {
  531. remoteIP, _, err := net.SplitHostPort(c.String())
  532. if err != nil {
  533. return err
  534. }
  535. tlsEnabled := atomic.LoadInt32(&c.TLS) == 1
  536. commonName := ""
  537. if tlsEnabled {
  538. tlsConnState := c.tlsConn.ConnectionState()
  539. if len(tlsConnState.PeerCertificates) > 0 {
  540. commonName = tlsConnState.PeerCertificates[0].Subject.CommonName
  541. }
  542. }
  543. authState, err := auth.QueryAnyAuthd(c.nsqd.getOpts().AuthHTTPAddresses,
  544. remoteIP, tlsEnabled, commonName, c.AuthSecret,
  545. c.nsqd.getOpts().HTTPClientConnectTimeout,
  546. c.nsqd.getOpts().HTTPClientRequestTimeout)
  547. if err != nil {
  548. return err
  549. }
  550. c.AuthState = authState
  551. return nil
  552. }
// Auth stores the secret supplied by the client and immediately queries the
// auth servers with it, returning QueryAuthd's error. The secret stays stored
// even when the query fails.
func (c *clientV2) Auth(secret string) error {
	c.AuthSecret = secret
	return c.QueryAuthd()
}
  557. func (c *clientV2) IsAuthorized(topic, channel string) (bool, error) {
  558. if c.AuthState == nil {
  559. return false, nil
  560. }
  561. if c.AuthState.IsExpired() {
  562. err := c.QueryAuthd()
  563. if err != nil {
  564. return false, err
  565. }
  566. }
  567. if c.AuthState.IsAllowed(topic, channel) {
  568. return true, nil
  569. }
  570. return false, nil
  571. }
  572. func (c *clientV2) HasAuthorizations() bool {
  573. if c.AuthState != nil {
  574. return len(c.AuthState.Authorizations) != 0
  575. }
  576. return false
  577. }