apkipa před 2 týdny
revize
2f5cd6bbb0
4 změnil soubory, kde provedl 458 přidání a 0 odebrání
  1. 10 0
      go.mod
  2. 6 0
      go.sum
  3. 330 0
      msg/msg.go
  4. 112 0
      stck-nsq-msg.go

+ 10 - 0
go.mod

@@ -0,0 +1,10 @@
+module stck/stck-nsq-msg
+
+go 1.22.2
+
+require (
+	github.com/denisbrodbeck/machineid v1.0.1
+	github.com/go-viper/mapstructure/v2 v2.2.1
+)
+
+require golang.org/x/sys v0.28.0 // indirect

+ 6 - 0
go.sum

@@ -0,0 +1,6 @@
+github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMSRhl4D7AQ=
+github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI=
+github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
+github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
+golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
+golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

+ 330 - 0
msg/msg.go

@@ -0,0 +1,330 @@
+package msg
+
+/** Design notes:
+ *
+ * For server -> client: topic is `client_do_action`
+ * For client -> server: topic is `server_do_notify`
+ *
+ * If you want to upgrade a service, you can send a `RequestDeviceUpdateMsg` to the client,
+ * and the client will respond with a `DeviceActionDoneMsg`. When all clients have responded
+ * with success, you can send a `ForceDeviceRebootMsg` to the client to reboot the service
+ * for the upgrade to take effect.
+ *
+ */
+
+// ----- Client -> Server -----
+
+// * Entity: Device
+type DeviceHeartbeatMsg struct {
+	DeviceID        string `mapstructure:"device_id"`
+	IPAddress       string `mapstructure:"ip_address"`
+	ServiceName     string `mapstructure:"service_name"`
+	ServiceVersion  string `mapstructure:"service_version"` // Compile datetime
+	ProtocolVersion string `mapstructure:"protocol_version"`
+}
+
+const DeviceHeartbeatMsgType = "device_heartbeat"
+
+func (m *DeviceHeartbeatMsg) MsgType() string {
+	return DeviceHeartbeatMsgType
+}
+
+// NOTE: Every time a new version of the protocol is released, this function should be updated
+const ProtocolVersion = "0.1.0"
+
+func MakeDeviceHeartbeatMsg(deviceID, ipAddress, serviceName, serviceVersion string) DeviceHeartbeatMsg {
+	return DeviceHeartbeatMsg{
+		DeviceID:        deviceID,
+		IPAddress:       ipAddress,
+		ServiceName:     serviceName,
+		ServiceVersion:  serviceVersion,
+		ProtocolVersion: ProtocolVersion,
+	}
+}
+
+// * Entity: Device
+type DeviceActionDoneMsg struct {
+	DeviceID    string `mapstructure:"device_id"`
+	ActionType  string `mapstructure:"action_type"` // The same as message type
+	ActionMsgID string `mapstructure:"action_msg_id"`
+	Status      int    `mapstructure:"status"`
+	Msg         string `mapstructure:"msg"` // Can be used to store errors
+}
+
+const DeviceActionDoneMsgType = "device_action_done"
+
+func (m *DeviceActionDoneMsg) MsgType() string {
+	return DeviceActionDoneMsgType
+}
+
+func MakeDeviceActionDoneMsg(deviceID, actionMsgID string, status int, msg string) DeviceActionDoneMsg {
+	return DeviceActionDoneMsg{
+		DeviceID:    deviceID,
+		ActionMsgID: actionMsgID,
+		Status:      status,
+		Msg:         msg,
+	}
+}
+
+// * Entity: Device
+type DevicePongMsg struct {
+	DeviceID string `mapstructure:"device_id"`
+	RecvTime int64  `mapstructure:"recv_time"`
+}
+
+const DevicePongMsgType = "device_pong"
+
+func (m *DevicePongMsg) MsgType() string {
+	return DevicePongMsgType
+}
+
+func MakeDevicePongMsg(deviceID string, recvTime int64) DevicePongMsg {
+	return DevicePongMsg{
+		DeviceID: deviceID,
+		RecvTime: recvTime,
+	}
+}
+
+// * Entity: Part
+type PartUploadStatusChangedMsg struct {
+	DeviceID    string `mapstructure:"device_id"`
+	StartTime   int64  `mapstructure:"start_time"`
+	StreamName  string `mapstructure:"stream_name"`
+	PartNumber  int    `mapstructure:"part_number"`
+	TotalPoints int64  `mapstructure:"total_points"`
+	Status      string `mapstructure:"status"`        // "success", "failed", "unchanged"
+	Msg         string `mapstructure:"msg,omitempty"` // Can be used to store errors
+}
+
+const PartUploadStatusChangedMsgType = "part_upload_status_changed"
+
+func (m *PartUploadStatusChangedMsg) MsgType() string {
+	return PartUploadStatusChangedMsgType
+}
+
+func MakePartUploadStatusChangedMsg(deviceID string, startTime int64, streamName string, partNumber int, totalPoints int64, status, msg string) PartUploadStatusChangedMsg {
+	return PartUploadStatusChangedMsg{
+		DeviceID:    deviceID,
+		StartTime:   startTime,
+		StreamName:  streamName,
+		PartNumber:  partNumber,
+		TotalPoints: totalPoints,
+		Status:      status,
+		Msg:         msg,
+	}
+}
+
+// * Entity: Part
+type PartInsertToStckStatusChangedMsg struct {
+	DeviceID    string `mapstructure:"device_id"`
+	StartTime   int64  `mapstructure:"start_time"`
+	StreamName  string `mapstructure:"stream_name"`
+	PartNumber  int    `mapstructure:"part_number"`
+	TotalPoints int64  `mapstructure:"total_points"`
+	Status      string `mapstructure:"status"`        // "success", "failed", "unchanged"
+	Msg         string `mapstructure:"msg,omitempty"` // Can be used to store errors
+}
+
+const PartInsertToStckStatusChangedMsgType = "part_insert_to_stck_status_changed"
+
+func (m *PartInsertToStckStatusChangedMsg) MsgType() string {
+	return PartInsertToStckStatusChangedMsgType
+}
+
+func MakePartInsertToStckStatusChangedMsg(deviceID string, startTime int64, streamName string, partNumber int, totalPoints int64, status, msg string) PartInsertToStckStatusChangedMsg {
+	return PartInsertToStckStatusChangedMsg{
+		DeviceID:    deviceID,
+		StartTime:   startTime,
+		StreamName:  streamName,
+		PartNumber:  partNumber,
+		TotalPoints: totalPoints,
+		Status:      status,
+		Msg:         msg,
+	}
+}
+
+// * Entity: Stream
+type StreamUploadStatusChangedMsg struct {
+	DeviceID         string      `mapstructure:"device_id"`
+	StartTime        int64       `mapstructure:"start_time"`
+	StreamName       string      `mapstructure:"stream_name"`
+	TotalPointsCount int64       `mapstructure:"total_points_count"`
+	TotalPartsCount  int         `mapstructure:"total_parts_count"`
+	SuccessCount     int         `mapstructure:"success_count"`
+	FailedCount      int         `mapstructure:"failed_count"`
+	UnchangedCount   int         `mapstructure:"unchanged_count"`
+	Details          interface{} `mapstructure:"details,omitempty"` // Currently unused
+}
+
+const StreamUploadStatusChangedMsgType = "stream_upload_status_changed"
+
+func (m *StreamUploadStatusChangedMsg) MsgType() string {
+	return StreamUploadStatusChangedMsgType
+}
+
+func MakeStreamUploadStatusChangedMsg(deviceID string, startTime int64, streamName string, totalPointsCount int64, totalPartsCount, successCount, failedCount, unchangedCount int) StreamUploadStatusChangedMsg {
+	return StreamUploadStatusChangedMsg{
+		DeviceID:         deviceID,
+		StartTime:        startTime,
+		StreamName:       streamName,
+		TotalPointsCount: totalPointsCount,
+		TotalPartsCount:  totalPartsCount,
+		SuccessCount:     successCount,
+		FailedCount:      failedCount,
+		UnchangedCount:   unchangedCount,
+	}
+}
+
+// * Entity: Stream
+type StreamInsertToStckStatusChangedMsg struct {
+	DeviceID         string      `mapstructure:"device_id"`
+	StartTime        int64       `mapstructure:"start_time"`
+	StreamName       string      `mapstructure:"stream_name"`
+	TotalPointsCount int64       `mapstructure:"total_points_count"`
+	TotalPartsCount  int         `mapstructure:"total_parts_count"`
+	SuccessCount     int         `mapstructure:"success_count"`
+	FailedCount      int         `mapstructure:"failed_count"`
+	UnchangedCount   int         `mapstructure:"unchanged_count"`
+	Details          interface{} `mapstructure:"details,omitempty"` // Currently unused
+}
+
+const StreamInsertToStckStatusChangedMsgType = "stream_insert_to_stck_status_changed"
+
+func (m *StreamInsertToStckStatusChangedMsg) MsgType() string {
+	return StreamInsertToStckStatusChangedMsgType
+}
+
+func MakeStreamInsertToStckStatusChangedMsg(deviceID string, startTime int64, streamName string, totalPointsCount int64, totalPartsCount, successCount, failedCount, unchangedCount int) StreamInsertToStckStatusChangedMsg {
+	return StreamInsertToStckStatusChangedMsg{
+		DeviceID:         deviceID,
+		StartTime:        startTime,
+		StreamName:       streamName,
+		TotalPointsCount: totalPointsCount,
+		TotalPartsCount:  totalPartsCount,
+		SuccessCount:     successCount,
+		FailedCount:      failedCount,
+		UnchangedCount:   unchangedCount,
+	}
+}
+
+// ----- Server -> Client -----
+
+// * Entity: Device
+type ForceDeviceRebootMsg struct {
+	DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
+}
+
+const ForceDeviceRebootMsgType = "force_device_reboot"
+
+func (m *ForceDeviceRebootMsg) MsgType() string {
+	return ForceDeviceRebootMsgType
+}
+
+func MakeForceDeviceRebootMsg(deviceID string) ForceDeviceRebootMsg {
+	return ForceDeviceRebootMsg{
+		DeviceID: deviceID,
+	}
+}
+
+// * Entity: Device
+type ForceDeviceSendHeartbeatMsg struct {
+	DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
+}
+
+const ForceDeviceSendHeartbeatMsgType = "force_device_send_heartbeat"
+
+func (m *ForceDeviceSendHeartbeatMsg) MsgType() string {
+	return ForceDeviceSendHeartbeatMsgType
+}
+
+func MakeForceDeviceSendHeartbeatMsg(deviceID string) ForceDeviceSendHeartbeatMsg {
+	return ForceDeviceSendHeartbeatMsg{
+		DeviceID: deviceID,
+	}
+}
+
+// * Entity: Device
+// * Response: DeviceActionDoneMsg
+type RequestDeviceUpdateMsg struct {
+	DeviceID         string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
+	ServiceName      string `mapstructure:"service_name"`
+	ServiceVersion   string `mapstructure:"service_version"`
+	ServiceBinaryURL string `mapstructure:"service_binary_url"`
+	BinaryHash       string `mapstructure:"binary_hash"`
+}
+
+const RequestDeviceUpdateMsgType = "request_device_update"
+
+func (m *RequestDeviceUpdateMsg) MsgType() string {
+	return RequestDeviceUpdateMsgType
+}
+
+func MakeRequestDeviceUpdateMsg(deviceID, serviceName, serviceVersion, serviceBinaryURL, binaryHash string) RequestDeviceUpdateMsg {
+	return RequestDeviceUpdateMsg{
+		DeviceID:         deviceID,
+		ServiceName:      serviceName,
+		ServiceVersion:   serviceVersion,
+		ServiceBinaryURL: serviceBinaryURL,
+		BinaryHash:       binaryHash,
+	}
+}
+
+// * Entity: Device
+// * Response: DeviceActionDoneMsg
+type RequestDeviceUploadLogsToMinioMsg struct {
+	DeviceID       string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
+	RemoteBucket   string `mapstructure:"remote_bucket"`
+	RemoteBasePath string `mapstructure:"remote_base_path"`
+}
+
+const RequestDeviceUploadLogsToMinioMsgType = "request_device_upload_logs_to_minio"
+
+func (m *RequestDeviceUploadLogsToMinioMsg) MsgType() string {
+	return RequestDeviceUploadLogsToMinioMsgType
+}
+
+func MakeRequestDeviceUploadLogsToMinioMsg(deviceID, remoteBucket, remoteBasePath string) RequestDeviceUploadLogsToMinioMsg {
+	return RequestDeviceUploadLogsToMinioMsg{
+		DeviceID:       deviceID,
+		RemoteBucket:   remoteBucket,
+		RemoteBasePath: remoteBasePath,
+	}
+}
+
+// * Entity: Device
+// * Response: DevicePongMsg
+type DevicePingMsg struct {
+	DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
+}
+
+const DevicePingMsgType = "device_ping"
+
+func (m *DevicePingMsg) MsgType() string {
+	return DevicePingMsgType
+}
+
+func MakeDevicePingMsg(deviceID string) DevicePingMsg {
+	return DevicePingMsg{
+		DeviceID: deviceID,
+	}
+}
+
+// * Entity: Device
+// * Response: DeviceActionDoneMsg
+type RequestDeviceExecuteShellScriptMsg struct {
+	DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
+	Script   string `mapstructure:"script"`
+}
+
+const RequestDeviceExecuteShellScriptMsgType = "request_device_execute_shell_script"
+
+func (m *RequestDeviceExecuteShellScriptMsg) MsgType() string {
+	return RequestDeviceExecuteShellScriptMsgType
+}
+
+func MakeRequestDeviceExecuteShellScriptMsg(deviceID, script string) RequestDeviceExecuteShellScriptMsg {
+	return RequestDeviceExecuteShellScriptMsg{
+		DeviceID: deviceID,
+		Script:   script,
+	}
+}

+ 112 - 0
stck-nsq-msg.go

@@ -0,0 +1,112 @@
+package stcknsqmsg
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"os"
+	"stck/stck-nsq-msg/msg"
+
+	"github.com/denisbrodbeck/machineid"
+	"github.com/go-viper/mapstructure/v2"
+)
+
+type StckNsqMsg struct {
+	Type string            `mapstructure:"type"`
+	Data StckNsqMsgVariant `mapstructure:"data"`
+}
+
+type StckNsqMsgRaw struct {
+	Type string                 `mapstructure:"type"`
+	Data map[string]interface{} `mapstructure:"data"`
+}
+
+type StckNsqMsgVariant interface {
+	MsgType() string
+}
+
+func (m *StckNsqMsg) ToString() (string, error) {
+	b, err := json.Marshal(m)
+	if err != nil {
+		return "", err
+	}
+	return string(b), nil
+}
+
+func FromString(s string) (StckNsqMsg, error) {
+	var rm StckNsqMsgRaw
+	var m StckNsqMsg
+	err := json.Unmarshal([]byte(s), &rm)
+	if err != nil {
+		return m, err
+	}
+	// Cast to strongly typed struct
+	m.Type = rm.Type
+	switch rm.Type {
+	case msg.DeviceHeartbeatMsgType:
+		var d msg.DeviceHeartbeatMsg
+		err = mapstructure.Decode(rm.Data, &d)
+		if err != nil {
+			return m, err
+		}
+		m.Data = &d
+	default:
+		return m, fmt.Errorf("unknown message type: %s", rm.Type)
+	}
+	return m, nil
+}
+
+func DeviceIdMatches(id1, id2 string) bool {
+	return id1 == id2 || id1 == "*" || id2 == "*"
+}
+
+func GetLocalIP() (net.IP, error) {
+	ifaces, err := net.Interfaces()
+	if err != nil {
+		return nil, err
+	}
+
+	for _, iface := range ifaces {
+		addrs, err := iface.Addrs()
+		if err != nil {
+			return nil, err
+		}
+
+		for _, addr := range addrs {
+			var ip net.IP
+			switch v := addr.(type) {
+			case *net.IPNet:
+				ip = v.IP
+			case *net.IPAddr:
+				ip = v.IP
+			}
+
+			if ip == nil || ip.IsLoopback() {
+				continue
+			}
+
+			ip = ip.To4()
+			if ip == nil {
+				continue // not an ipv4 address
+			}
+
+			return ip, nil
+		}
+	}
+
+	return nil, fmt.Errorf("no non-loopback IP address found")
+}
+
+func MakeUniqueMachineID() string {
+	ip, err := GetLocalIP()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "get local IP failed: %v\n", err)
+	}
+
+	machineid, err := machineid.ProtectedID("unique")
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "get machine ID failed: %v\n", err)
+	}
+
+	return fmt.Sprintf("%s-%s", ip.String(), machineid)
+}