message.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package nsqd
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "time"
  8. )
  9. const (
  10. MsgIDLength = 16
  11. minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
  12. )
  13. type MessageID [MsgIDLength]byte
  14. type Message struct {
  15. ID MessageID
  16. Body []byte
  17. Timestamp int64
  18. Attempts uint16
  19. // for in-flight handling
  20. deliveryTS time.Time
  21. clientID int64
  22. pri int64
  23. index int
  24. deferred time.Duration
  25. }
  26. func NewMessage(id MessageID, body []byte) *Message {
  27. return &Message{
  28. ID: id,
  29. Body: body,
  30. Timestamp: time.Now().UnixNano(),
  31. }
  32. }
  33. type Data struct {
  34. Date string `json:"date"`
  35. Msg string `json:"msg"`
  36. Latency int64 `json:"latency"`
  37. Priority int64 `json:"priority"`
  38. }
  39. func (m *Message) getLatency() (int64, error) {
  40. var data Data
  41. if err := json.Unmarshal(m.Body, &data); err == nil {
  42. return data.Latency, err
  43. } else {
  44. return 0, err
  45. }
  46. }
  47. func (m *Message) getPriority() (int64, error) {
  48. var data Data
  49. if err := json.Unmarshal(m.Body, &data); err == nil {
  50. return data.Priority, err
  51. } else {
  52. return 0, err
  53. }
  54. }
  55. func (m *Message) WriteTo(w io.Writer) (int64, error) {
  56. var buf [10]byte
  57. var total int64
  58. binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
  59. binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
  60. n, err := w.Write(buf[:])
  61. total += int64(n)
  62. if err != nil {
  63. return total, err
  64. }
  65. n, err = w.Write(m.ID[:])
  66. total += int64(n)
  67. if err != nil {
  68. return total, err
  69. }
  70. n, err = w.Write(m.Body)
  71. total += int64(n)
  72. if err != nil {
  73. return total, err
  74. }
  75. return total, nil
  76. }
  77. // decodeMessage deserializes data (as []byte) and creates a new Message
  78. // message format:
  79. // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
  80. // | (int64) || || (hex string encoded in ASCII) || (binary)
  81. // | 8-byte || || 16-byte || N-byte
  82. // ------------------------------------------------------------------------------------------...
  83. // nanosecond timestamp ^^ message ID message body
  84. // (uint16)
  85. // 2-byte
  86. // attempts
  87. func decodeMessage(b []byte) (*Message, error) {
  88. var msg Message
  89. if len(b) < minValidMsgLength {
  90. return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
  91. }
  92. msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
  93. msg.Attempts = binary.BigEndian.Uint16(b[8:10])
  94. copy(msg.ID[:], b[10:10+MsgIDLength])
  95. msg.Body = b[10+MsgIDLength:]
  96. return &msg, nil
  97. }
  98. func writeMessageToBackend(msg *Message, bq BackendQueue) error {
  99. buf := bufferPoolGet()
  100. defer bufferPoolPut(buf)
  101. _, err := msg.WriteTo(buf)
  102. if err != nil {
  103. return err
  104. }
  105. return bq.Put(buf.Bytes())
  106. }