123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package nsqd
- import (
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "time"
- )
const (
	// MsgIDLength is the size, in bytes, of a MessageID.
	MsgIDLength = 16
	// minValidMsgLength is the smallest buffer decodeMessage accepts: the
	// message ID plus the 8-byte timestamp and the 2-byte attempts counter
	// (i.e. a message with an empty body).
	minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
)
// MessageID is the fixed-size (MsgIDLength bytes) identifier stored in
// Message.ID and written verbatim to the wire by Message.WriteTo.
type MessageID [MsgIDLength]byte
// Message is a single queue message. The exported fields make up the
// serialized form produced by WriteTo and parsed by decodeMessage; the
// unexported fields are in-memory bookkeeping only.
type Message struct {
	// ID is the fixed-size identifier of the message.
	ID MessageID
	// Body is the opaque payload. getLatency and getPriority expect it to
	// hold a JSON-encoded Data object.
	Body []byte
	// Timestamp is set by NewMessage to time.Now().UnixNano() and is
	// serialized as an 8-byte big-endian integer.
	Timestamp int64
	// Attempts is serialized as a 2-byte big-endian integer.
	// NOTE(review): presumably the delivery-attempt count; confirm against
	// the code that increments it.
	Attempts uint16

	// Fields below are for in-flight handling. They are neither written by
	// WriteTo nor restored by decodeMessage.
	deliveryTS time.Time
	clientID   int64
	pri        int64
	index      int
	deferred   time.Duration
}
- func NewMessage(id MessageID, body []byte) *Message {
- return &Message{
- ID: id,
- Body: body,
- Timestamp: time.Now().UnixNano(),
- }
- }
// Data is the JSON document that getLatency and getPriority decode from
// Message.Body. Keys absent from the JSON leave the corresponding field at
// its zero value.
// NOTE(review): the unit of Latency and the ordering of Priority are not
// visible in this file; confirm with the producers of these messages.
type Data struct {
	Date     string `json:"date"`
	Msg      string `json:"msg"`
	Latency  int64  `json:"latency"`
	Priority int64  `json:"priority"`
}
- func (m *Message) getLatency() (int64, error) {
- var data Data
- if err := json.Unmarshal(m.Body, &data); err == nil {
- return data.Latency, err
- } else {
- return 0, err
- }
- }
- func (m *Message) getPriority() (int64, error) {
- var data Data
- if err := json.Unmarshal(m.Body, &data); err == nil {
- return data.Priority, err
- } else {
- return 0, err
- }
- }
- func (m *Message) WriteTo(w io.Writer) (int64, error) {
- var buf [10]byte
- var total int64
- binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
- binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
- n, err := w.Write(buf[:])
- total += int64(n)
- if err != nil {
- return total, err
- }
- n, err = w.Write(m.ID[:])
- total += int64(n)
- if err != nil {
- return total, err
- }
- n, err = w.Write(m.Body)
- total += int64(n)
- if err != nil {
- return total, err
- }
- return total, nil
- }
- // decodeMessage deserializes data (as []byte) and creates a new Message
- // message format:
- // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
- // | (int64) || || (hex string encoded in ASCII) || (binary)
- // | 8-byte || || 16-byte || N-byte
- // ------------------------------------------------------------------------------------------...
- // nanosecond timestamp ^^ message ID message body
- // (uint16)
- // 2-byte
- // attempts
- func decodeMessage(b []byte) (*Message, error) {
- var msg Message
- if len(b) < minValidMsgLength {
- return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
- }
- msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
- msg.Attempts = binary.BigEndian.Uint16(b[8:10])
- copy(msg.ID[:], b[10:10+MsgIDLength])
- msg.Body = b[10+MsgIDLength:]
- return &msg, nil
- }
- func writeMessageToBackend(msg *Message, bq BackendQueue) error {
- buf := bufferPoolGet()
- defer bufferPoolPut(buf)
- _, err := msg.WriteTo(buf)
- if err != nil {
- return err
- }
- return bq.Put(buf.Bytes())
- }
|