package msg import ( "github.com/Masterminds/semver/v3" ) /** Design notes: * * For server -> client: topic is `client_do_action` * For client -> server: topic is `server_do_notify` * * If you want to upgrade a service, you can send a `RequestDeviceUpdateMsg` to the client, * and the client will respond with a `DeviceActionDoneMsg`. When all clients have responded * with success, you can send a `ForceDeviceRebootMsg` to the client to reboot the service * for the upgrade to take effect. * */ const ClientDoActionTopic = "client_do_action" const ServerDoNotifyTopic = "server_do_notify" // ----- Client -> Server ----- // * Entity: Device type DeviceHeartbeatMsg struct { DeviceID string `mapstructure:"device_id"` LocalStartTime int64 `mapstructure:"local_start_time"` LocalCurrentTime int64 `mapstructure:"local_current_time"` IPAddress string `mapstructure:"ip_address"` ServiceName string `mapstructure:"service_name"` ServiceVersion string `mapstructure:"service_version"` // Compile datetime ProtocolVersion string `mapstructure:"protocol_version"` } const DeviceHeartbeatMsgType = "device_heartbeat" func (m *DeviceHeartbeatMsg) MsgType() string { return DeviceHeartbeatMsgType } // NOTE: Every time a new version of the protocol is released, this function should be updated const ProtocolVersion = "0.1.0" func MakeDeviceHeartbeatMsg(deviceID string, localStartTime, localCurrentTime int64, ipAddress, serviceName, serviceVersion string) DeviceHeartbeatMsg { return DeviceHeartbeatMsg{ DeviceID: deviceID, LocalStartTime: localStartTime, LocalCurrentTime: localCurrentTime, IPAddress: ipAddress, ServiceName: serviceName, ServiceVersion: serviceVersion, ProtocolVersion: ProtocolVersion, } } func (m *DeviceHeartbeatMsg) IsProtocolVersionCompatible() bool { // Check if the protocol version is compatible msgVer, err := semver.NewVersion(m.ProtocolVersion) if err != nil { return false } constraint, err := semver.NewConstraint(ProtocolVersion) if err != nil { return false } return constraint.Check(msgVer) } // * Entity: Device type DeviceActionDoneMsg struct { DeviceID string `mapstructure:"device_id"` ActionType string `mapstructure:"action_type"` // The same as message type ActionMsgID string `mapstructure:"action_msg_id"` ActionStartTime int64 `mapstructure:"action_start_time"` Status int `mapstructure:"status"` Msg string `mapstructure:"msg"` // Can be used to store errors } const DeviceActionDoneMsgType = "device_action_done" func (m *DeviceActionDoneMsg) MsgType() string { return DeviceActionDoneMsgType } func MakeDeviceActionDoneMsg(deviceID, actionType, actionMsgID string, actionStartTime int64, status int, msg string) DeviceActionDoneMsg { return DeviceActionDoneMsg{ DeviceID: deviceID, ActionType: actionType, ActionMsgID: actionMsgID, ActionStartTime: actionStartTime, Status: status, Msg: msg, } } // * Entity: Device type DevicePongMsg struct { DeviceID string `mapstructure:"device_id"` RecvTime int64 `mapstructure:"recv_time"` } const DevicePongMsgType = "device_pong" func (m *DevicePongMsg) MsgType() string { return DevicePongMsgType } func MakeDevicePongMsg(deviceID string, recvTime int64) DevicePongMsg { return DevicePongMsg{ DeviceID: deviceID, RecvTime: recvTime, } } // * Entity: Part type PartUploadStatusChangedMsg struct { DeviceID string `mapstructure:"device_id"` StartTime int64 `mapstructure:"start_time"` StreamName string `mapstructure:"stream_name"` PartNumber int64 `mapstructure:"part_number"` TotalPoints int64 `mapstructure:"total_points"` Status string `mapstructure:"status"` // "success", "failed", "unchanged" Msg string `mapstructure:"msg,omitempty"` // Can be used to store errors } const PartUploadStatusChangedMsgType = "part_upload_status_changed" func (m *PartUploadStatusChangedMsg) MsgType() string { return PartUploadStatusChangedMsgType } func MakePartUploadStatusChangedMsg(deviceID string, startTime int64, streamName string, partNumber, totalPoints int64, status, msg string) PartUploadStatusChangedMsg { return PartUploadStatusChangedMsg{ DeviceID: deviceID, StartTime: startTime, StreamName: streamName, PartNumber: partNumber, TotalPoints: totalPoints, Status: status, Msg: msg, } } // * Entity: Part type PartInsertToStckStatusChangedMsg struct { DeviceID string `mapstructure:"device_id"` StartTime int64 `mapstructure:"start_time"` StreamName string `mapstructure:"stream_name"` PartNumber int64 `mapstructure:"part_number"` TotalPoints int64 `mapstructure:"total_points"` Status string `mapstructure:"status"` // "success", "failed", "unchanged" Msg string `mapstructure:"msg,omitempty"` // Can be used to store errors } const PartInsertToStckStatusChangedMsgType = "part_insert_to_stck_status_changed" func (m *PartInsertToStckStatusChangedMsg) MsgType() string { return PartInsertToStckStatusChangedMsgType } func MakePartInsertToStckStatusChangedMsg(deviceID string, startTime int64, streamName string, partNumber, totalPoints int64, status, msg string) PartInsertToStckStatusChangedMsg { return PartInsertToStckStatusChangedMsg{ DeviceID: deviceID, StartTime: startTime, StreamName: streamName, PartNumber: partNumber, TotalPoints: totalPoints, Status: status, Msg: msg, } } // * Entity: Stream type StreamUploadStatusChangedMsg struct { DeviceID string `mapstructure:"device_id"` StartTime int64 `mapstructure:"start_time"` StreamName string `mapstructure:"stream_name"` TotalPointsCount int64 `mapstructure:"total_points_count"` TotalPartsCount int64 `mapstructure:"total_parts_count"` SuccessCount int64 `mapstructure:"success_count"` FailedCount int64 `mapstructure:"failed_count"` UnchangedCount int64 `mapstructure:"unchanged_count"` Details interface{} `mapstructure:"details,omitempty"` // Currently unused } const StreamUploadStatusChangedMsgType = "stream_upload_status_changed" func (m *StreamUploadStatusChangedMsg) MsgType() string { return StreamUploadStatusChangedMsgType } func MakeStreamUploadStatusChangedMsg(deviceID string, startTime int64, streamName string, totalPointsCount int64, totalPartsCount, successCount, failedCount, unchangedCount int64) StreamUploadStatusChangedMsg { return StreamUploadStatusChangedMsg{ DeviceID: deviceID, StartTime: startTime, StreamName: streamName, TotalPointsCount: totalPointsCount, TotalPartsCount: totalPartsCount, SuccessCount: successCount, FailedCount: failedCount, UnchangedCount: unchangedCount, } } // * Entity: Stream type StreamInsertToStckStatusChangedMsg struct { DeviceID string `mapstructure:"device_id"` StartTime int64 `mapstructure:"start_time"` StreamName string `mapstructure:"stream_name"` TotalPointsCount int64 `mapstructure:"total_points_count"` TotalPartsCount int64 `mapstructure:"total_parts_count"` SuccessCount int64 `mapstructure:"success_count"` FailedCount int64 `mapstructure:"failed_count"` UnchangedCount int64 `mapstructure:"unchanged_count"` Details interface{} `mapstructure:"details,omitempty"` // Currently unused } const StreamInsertToStckStatusChangedMsgType = "stream_insert_to_stck_status_changed" func (m *StreamInsertToStckStatusChangedMsg) MsgType() string { return StreamInsertToStckStatusChangedMsgType } func MakeStreamInsertToStckStatusChangedMsg(deviceID string, startTime int64, streamName string, totalPointsCount int64, totalPartsCount, successCount, failedCount, unchangedCount int64) StreamInsertToStckStatusChangedMsg { return StreamInsertToStckStatusChangedMsg{ DeviceID: deviceID, StartTime: startTime, StreamName: streamName, TotalPointsCount: totalPointsCount, TotalPartsCount: totalPartsCount, SuccessCount: successCount, FailedCount: failedCount, UnchangedCount: unchangedCount, } } // ----- Server -> Client ----- // * Entity: Device type ForceDeviceRebootMsg struct { DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices } const ForceDeviceRebootMsgType = "force_device_reboot" func (m *ForceDeviceRebootMsg) MsgType() string { return ForceDeviceRebootMsgType } func MakeForceDeviceRebootMsg(deviceID string) ForceDeviceRebootMsg { return ForceDeviceRebootMsg{ DeviceID: deviceID, } } // * Entity: Device type ForceDeviceSendHeartbeatMsg struct { DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices } const ForceDeviceSendHeartbeatMsgType = "force_device_send_heartbeat" func (m *ForceDeviceSendHeartbeatMsg) MsgType() string { return ForceDeviceSendHeartbeatMsgType } func MakeForceDeviceSendHeartbeatMsg(deviceID string) ForceDeviceSendHeartbeatMsg { return ForceDeviceSendHeartbeatMsg{ DeviceID: deviceID, } } // * Entity: Device // * Response: DeviceActionDoneMsg type RequestDeviceUpdateMsg struct { DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices ServiceName string `mapstructure:"service_name"` ServiceVersion string `mapstructure:"service_version"` ServiceBinaryURL string `mapstructure:"service_binary_url"` BinaryHash string `mapstructure:"binary_hash"` } const RequestDeviceUpdateMsgType = "request_device_update" func (m *RequestDeviceUpdateMsg) MsgType() string { return RequestDeviceUpdateMsgType } func MakeRequestDeviceUpdateMsg(deviceID, serviceName, serviceVersion, serviceBinaryURL, binaryHash string) RequestDeviceUpdateMsg { return RequestDeviceUpdateMsg{ DeviceID: deviceID, ServiceName: serviceName, ServiceVersion: serviceVersion, ServiceBinaryURL: serviceBinaryURL, BinaryHash: binaryHash, } } // * Entity: Device // * Response: DeviceActionDoneMsg type RequestDeviceUploadLogsToMinioMsg struct { DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices RemoteBucket string `mapstructure:"remote_bucket"` RemoteBasePath string `mapstructure:"remote_base_path"` } const RequestDeviceUploadLogsToMinioMsgType = "request_device_upload_logs_to_minio" func (m *RequestDeviceUploadLogsToMinioMsg) MsgType() string { return RequestDeviceUploadLogsToMinioMsgType } func MakeRequestDeviceUploadLogsToMinioMsg(deviceID, remoteBucket, remoteBasePath string) RequestDeviceUploadLogsToMinioMsg { return RequestDeviceUploadLogsToMinioMsg{ DeviceID: deviceID, RemoteBucket: remoteBucket, RemoteBasePath: remoteBasePath, } } // * Entity: Device // * Response: DevicePongMsg type DevicePingMsg struct { DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices } const DevicePingMsgType = "device_ping" func (m *DevicePingMsg) MsgType() string { return DevicePingMsgType } func MakeDevicePingMsg(deviceID string) DevicePingMsg { return DevicePingMsg{ DeviceID: deviceID, } } // * Entity: Device // * Response: DeviceActionDoneMsg type RequestDeviceExecuteShellScriptMsg struct { DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices Script string `mapstructure:"script"` } const RequestDeviceExecuteShellScriptMsgType = "request_device_execute_shell_script" func (m *RequestDeviceExecuteShellScriptMsg) MsgType() string { return RequestDeviceExecuteShellScriptMsgType } func MakeRequestDeviceExecuteShellScriptMsg(deviceID, script string) RequestDeviceExecuteShellScriptMsg { return RequestDeviceExecuteShellScriptMsg{ DeviceID: deviceID, Script: script, } }