package nsqd import ( "encoding/binary" "encoding/json" "fmt" "io" "time" ) const ( MsgIDLength = 16 minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts ) type MessageID [MsgIDLength]byte type Message struct { ID MessageID Body []byte Timestamp int64 Attempts uint16 // for in-flight handling deliveryTS time.Time clientID int64 pri int64 index int deferred time.Duration } func NewMessage(id MessageID, body []byte) *Message { return &Message{ ID: id, Body: body, Timestamp: time.Now().UnixNano(), } } type Data struct { Date string `json:"date"` Msg string `json:"msg"` Latency int64 `json:"latency"` Priority int64 `json:"priority"` } func (m *Message) getLatency() (int64, error) { var data Data if err := json.Unmarshal(m.Body, &data); err == nil { return data.Latency, err } else { return 0, err } } func (m *Message) getPriority() (int64, error) { var data Data if err := json.Unmarshal(m.Body, &data); err == nil { return data.Priority, err } else { return 0, err } } func (m *Message) WriteTo(w io.Writer) (int64, error) { var buf [10]byte var total int64 binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) n, err := w.Write(buf[:]) total += int64(n) if err != nil { return total, err } n, err = w.Write(m.ID[:]) total += int64(n) if err != nil { return total, err } n, err = w.Write(m.Body) total += int64(n) if err != nil { return total, err } return total, nil } // decodeMessage deserializes data (as []byte) and creates a new Message // message format: // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... // | (int64) || || (hex string encoded in ASCII) || (binary) // | 8-byte || || 16-byte || N-byte // ------------------------------------------------------------------------------------------... // nanosecond timestamp ^^ message ID message body // (uint16) // 2-byte // attempts func decodeMessage(b []byte) (*Message, error) { var msg Message if len(b) < minValidMsgLength { return nil, fmt.Errorf("invalid message buffer size (%d)", len(b)) } msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) copy(msg.ID[:], b[10:10+MsgIDLength]) msg.Body = b[10+MsgIDLength:] return &msg, nil } func writeMessageToBackend(msg *Message, bq BackendQueue) error { buf := bufferPoolGet() defer bufferPoolPut(buf) _, err := msg.WriteTo(buf) if err != nil { return err } return bq.Put(buf.Bytes()) }