msg.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package msg
  2. import (
  3. "github.com/Masterminds/semver/v3"
  4. )
  5. /** Design notes:
  6. *
  7. * For server -> client: topic is `client_do_action`
  8. * For client -> server: topic is `server_do_notify`
  9. *
  10. * If you want to upgrade a service, you can send a `RequestDeviceUpdateMsg` to the client,
  11. * and the client will respond with a `DeviceActionDoneMsg`. When all clients have responded
  12. * with success, you can send a `ForceDeviceRebootMsg` to the client to reboot the service
  13. * for the upgrade to take effect.
  14. *
  15. */
  16. const ClientDoActionTopic = "client_do_action"
  17. const ServerDoNotifyTopic = "server_do_notify"
  18. // ----- Client -> Server -----
  19. // * Entity: Device
  20. type DeviceHeartbeatMsg struct {
  21. DeviceID string `mapstructure:"device_id"`
  22. LocalStartTime int64 `mapstructure:"local_start_time"`
  23. LocalCurrentTime int64 `mapstructure:"local_current_time"`
  24. IPAddress string `mapstructure:"ip_address"`
  25. ServiceName string `mapstructure:"service_name"`
  26. ServiceVersion string `mapstructure:"service_version"` // Compile datetime
  27. ProtocolVersion string `mapstructure:"protocol_version"`
  28. }
  29. const DeviceHeartbeatMsgType = "device_heartbeat"
  30. func (m *DeviceHeartbeatMsg) MsgType() string {
  31. return DeviceHeartbeatMsgType
  32. }
  33. // NOTE: Every time a new version of the protocol is released, this function should be updated
  34. const ProtocolVersion = "0.1.0"
  35. func MakeDeviceHeartbeatMsg(deviceID string, localStartTime, localCurrentTime int64, ipAddress, serviceName, serviceVersion string) DeviceHeartbeatMsg {
  36. return DeviceHeartbeatMsg{
  37. DeviceID: deviceID,
  38. LocalStartTime: localStartTime,
  39. LocalCurrentTime: localCurrentTime,
  40. IPAddress: ipAddress,
  41. ServiceName: serviceName,
  42. ServiceVersion: serviceVersion,
  43. ProtocolVersion: ProtocolVersion,
  44. }
  45. }
  46. func (m *DeviceHeartbeatMsg) IsProtocolVersionCompatible() bool {
  47. // Check if the protocol version is compatible
  48. msgVer, err := semver.NewVersion(m.ProtocolVersion)
  49. if err != nil {
  50. return false
  51. }
  52. constraint, err := semver.NewConstraint(ProtocolVersion)
  53. if err != nil {
  54. return false
  55. }
  56. return constraint.Check(msgVer)
  57. }
  58. // * Entity: Device
  59. type DeviceActionDoneMsg struct {
  60. DeviceID string `mapstructure:"device_id"`
  61. ActionType string `mapstructure:"action_type"` // The same as message type
  62. ActionMsgID string `mapstructure:"action_msg_id"`
  63. ActionStartTime int64 `mapstructure:"action_start_time"`
  64. Status int `mapstructure:"status"`
  65. Msg string `mapstructure:"msg"` // Can be used to store errors
  66. }
  67. const DeviceActionDoneMsgType = "device_action_done"
  68. func (m *DeviceActionDoneMsg) MsgType() string {
  69. return DeviceActionDoneMsgType
  70. }
  71. func MakeDeviceActionDoneMsg(deviceID, actionType, actionMsgID string, actionStartTime int64, status int, msg string) DeviceActionDoneMsg {
  72. return DeviceActionDoneMsg{
  73. DeviceID: deviceID,
  74. ActionType: actionType,
  75. ActionMsgID: actionMsgID,
  76. ActionStartTime: actionStartTime,
  77. Status: status,
  78. Msg: msg,
  79. }
  80. }
  81. // * Entity: Device
  82. type DevicePongMsg struct {
  83. DeviceID string `mapstructure:"device_id"`
  84. RecvTime int64 `mapstructure:"recv_time"`
  85. }
  86. const DevicePongMsgType = "device_pong"
  87. func (m *DevicePongMsg) MsgType() string {
  88. return DevicePongMsgType
  89. }
  90. func MakeDevicePongMsg(deviceID string, recvTime int64) DevicePongMsg {
  91. return DevicePongMsg{
  92. DeviceID: deviceID,
  93. RecvTime: recvTime,
  94. }
  95. }
  96. // * Entity: Part
  97. type PartUploadStatusChangedMsg struct {
  98. DeviceID string `mapstructure:"device_id"`
  99. StartTime int64 `mapstructure:"start_time"`
  100. StreamName string `mapstructure:"stream_name"`
  101. PartNumber int64 `mapstructure:"part_number"`
  102. TotalPoints int64 `mapstructure:"total_points"`
  103. Status string `mapstructure:"status"` // "success", "failed", "unchanged"
  104. Msg string `mapstructure:"msg,omitempty"` // Can be used to store errors
  105. }
  106. const PartUploadStatusChangedMsgType = "part_upload_status_changed"
  107. func (m *PartUploadStatusChangedMsg) MsgType() string {
  108. return PartUploadStatusChangedMsgType
  109. }
  110. func MakePartUploadStatusChangedMsg(deviceID string, startTime int64, streamName string, partNumber, totalPoints int64, status, msg string) PartUploadStatusChangedMsg {
  111. return PartUploadStatusChangedMsg{
  112. DeviceID: deviceID,
  113. StartTime: startTime,
  114. StreamName: streamName,
  115. PartNumber: partNumber,
  116. TotalPoints: totalPoints,
  117. Status: status,
  118. Msg: msg,
  119. }
  120. }
  121. // * Entity: Part
  122. type PartInsertToStckStatusChangedMsg struct {
  123. DeviceID string `mapstructure:"device_id"`
  124. StartTime int64 `mapstructure:"start_time"`
  125. StreamName string `mapstructure:"stream_name"`
  126. PartNumber int64 `mapstructure:"part_number"`
  127. TotalPoints int64 `mapstructure:"total_points"`
  128. Status string `mapstructure:"status"` // "success", "failed", "unchanged"
  129. Msg string `mapstructure:"msg,omitempty"` // Can be used to store errors
  130. }
  131. const PartInsertToStckStatusChangedMsgType = "part_insert_to_stck_status_changed"
  132. func (m *PartInsertToStckStatusChangedMsg) MsgType() string {
  133. return PartInsertToStckStatusChangedMsgType
  134. }
  135. func MakePartInsertToStckStatusChangedMsg(deviceID string, startTime int64, streamName string, partNumber, totalPoints int64, status, msg string) PartInsertToStckStatusChangedMsg {
  136. return PartInsertToStckStatusChangedMsg{
  137. DeviceID: deviceID,
  138. StartTime: startTime,
  139. StreamName: streamName,
  140. PartNumber: partNumber,
  141. TotalPoints: totalPoints,
  142. Status: status,
  143. Msg: msg,
  144. }
  145. }
  146. // * Entity: Stream
  147. type StreamUploadStatusChangedMsg struct {
  148. DeviceID string `mapstructure:"device_id"`
  149. StartTime int64 `mapstructure:"start_time"`
  150. StreamName string `mapstructure:"stream_name"`
  151. TotalPointsCount int64 `mapstructure:"total_points_count"`
  152. TotalPartsCount int64 `mapstructure:"total_parts_count"`
  153. SuccessCount int64 `mapstructure:"success_count"`
  154. FailedCount int64 `mapstructure:"failed_count"`
  155. UnchangedCount int64 `mapstructure:"unchanged_count"`
  156. Details interface{} `mapstructure:"details,omitempty"` // Currently unused
  157. }
  158. const StreamUploadStatusChangedMsgType = "stream_upload_status_changed"
  159. func (m *StreamUploadStatusChangedMsg) MsgType() string {
  160. return StreamUploadStatusChangedMsgType
  161. }
  162. func MakeStreamUploadStatusChangedMsg(deviceID string, startTime int64, streamName string, totalPointsCount int64, totalPartsCount, successCount, failedCount, unchangedCount int64) StreamUploadStatusChangedMsg {
  163. return StreamUploadStatusChangedMsg{
  164. DeviceID: deviceID,
  165. StartTime: startTime,
  166. StreamName: streamName,
  167. TotalPointsCount: totalPointsCount,
  168. TotalPartsCount: totalPartsCount,
  169. SuccessCount: successCount,
  170. FailedCount: failedCount,
  171. UnchangedCount: unchangedCount,
  172. }
  173. }
  174. // * Entity: Stream
  175. type StreamInsertToStckStatusChangedMsg struct {
  176. DeviceID string `mapstructure:"device_id"`
  177. StartTime int64 `mapstructure:"start_time"`
  178. StreamName string `mapstructure:"stream_name"`
  179. TotalPointsCount int64 `mapstructure:"total_points_count"`
  180. TotalPartsCount int64 `mapstructure:"total_parts_count"`
  181. SuccessCount int64 `mapstructure:"success_count"`
  182. FailedCount int64 `mapstructure:"failed_count"`
  183. UnchangedCount int64 `mapstructure:"unchanged_count"`
  184. Details interface{} `mapstructure:"details,omitempty"` // Currently unused
  185. }
  186. const StreamInsertToStckStatusChangedMsgType = "stream_insert_to_stck_status_changed"
  187. func (m *StreamInsertToStckStatusChangedMsg) MsgType() string {
  188. return StreamInsertToStckStatusChangedMsgType
  189. }
  190. func MakeStreamInsertToStckStatusChangedMsg(deviceID string, startTime int64, streamName string, totalPointsCount int64, totalPartsCount, successCount, failedCount, unchangedCount int64) StreamInsertToStckStatusChangedMsg {
  191. return StreamInsertToStckStatusChangedMsg{
  192. DeviceID: deviceID,
  193. StartTime: startTime,
  194. StreamName: streamName,
  195. TotalPointsCount: totalPointsCount,
  196. TotalPartsCount: totalPartsCount,
  197. SuccessCount: successCount,
  198. FailedCount: failedCount,
  199. UnchangedCount: unchangedCount,
  200. }
  201. }
  202. // ----- Server -> Client -----
  203. // * Entity: Device
  204. type ForceDeviceRebootMsg struct {
  205. DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
  206. }
  207. const ForceDeviceRebootMsgType = "force_device_reboot"
  208. func (m *ForceDeviceRebootMsg) MsgType() string {
  209. return ForceDeviceRebootMsgType
  210. }
  211. func MakeForceDeviceRebootMsg(deviceID string) ForceDeviceRebootMsg {
  212. return ForceDeviceRebootMsg{
  213. DeviceID: deviceID,
  214. }
  215. }
  216. // * Entity: Device
  217. type ForceDeviceSendHeartbeatMsg struct {
  218. DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
  219. }
  220. const ForceDeviceSendHeartbeatMsgType = "force_device_send_heartbeat"
  221. func (m *ForceDeviceSendHeartbeatMsg) MsgType() string {
  222. return ForceDeviceSendHeartbeatMsgType
  223. }
  224. func MakeForceDeviceSendHeartbeatMsg(deviceID string) ForceDeviceSendHeartbeatMsg {
  225. return ForceDeviceSendHeartbeatMsg{
  226. DeviceID: deviceID,
  227. }
  228. }
  229. // * Entity: Device
  230. // * Response: DeviceActionDoneMsg
  231. type RequestDeviceUpdateMsg struct {
  232. DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
  233. ServiceName string `mapstructure:"service_name"`
  234. ServiceVersion string `mapstructure:"service_version"`
  235. ServiceBinaryURL string `mapstructure:"service_binary_url"`
  236. BinaryHash string `mapstructure:"binary_hash"`
  237. }
  238. const RequestDeviceUpdateMsgType = "request_device_update"
  239. func (m *RequestDeviceUpdateMsg) MsgType() string {
  240. return RequestDeviceUpdateMsgType
  241. }
  242. func MakeRequestDeviceUpdateMsg(deviceID, serviceName, serviceVersion, serviceBinaryURL, binaryHash string) RequestDeviceUpdateMsg {
  243. return RequestDeviceUpdateMsg{
  244. DeviceID: deviceID,
  245. ServiceName: serviceName,
  246. ServiceVersion: serviceVersion,
  247. ServiceBinaryURL: serviceBinaryURL,
  248. BinaryHash: binaryHash,
  249. }
  250. }
  251. // * Entity: Device
  252. // * Response: DeviceActionDoneMsg
  253. type RequestDeviceUploadLogsToMinioMsg struct {
  254. DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
  255. RemoteBucket string `mapstructure:"remote_bucket"`
  256. RemoteBasePath string `mapstructure:"remote_base_path"`
  257. }
  258. const RequestDeviceUploadLogsToMinioMsgType = "request_device_upload_logs_to_minio"
  259. func (m *RequestDeviceUploadLogsToMinioMsg) MsgType() string {
  260. return RequestDeviceUploadLogsToMinioMsgType
  261. }
  262. func MakeRequestDeviceUploadLogsToMinioMsg(deviceID, remoteBucket, remoteBasePath string) RequestDeviceUploadLogsToMinioMsg {
  263. return RequestDeviceUploadLogsToMinioMsg{
  264. DeviceID: deviceID,
  265. RemoteBucket: remoteBucket,
  266. RemoteBasePath: remoteBasePath,
  267. }
  268. }
  269. // * Entity: Device
  270. // * Response: DevicePongMsg
  271. type DevicePingMsg struct {
  272. DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
  273. }
  274. const DevicePingMsgType = "device_ping"
  275. func (m *DevicePingMsg) MsgType() string {
  276. return DevicePingMsgType
  277. }
  278. func MakeDevicePingMsg(deviceID string) DevicePingMsg {
  279. return DevicePingMsg{
  280. DeviceID: deviceID,
  281. }
  282. }
  283. // * Entity: Device
  284. // * Response: DeviceActionDoneMsg
  285. type RequestDeviceExecuteShellScriptMsg struct {
  286. DeviceID string `mapstructure:"device_id"` // Specific device ID or `*` for all devices
  287. Script string `mapstructure:"script"`
  288. }
  289. const RequestDeviceExecuteShellScriptMsgType = "request_device_execute_shell_script"
  290. func (m *RequestDeviceExecuteShellScriptMsg) MsgType() string {
  291. return RequestDeviceExecuteShellScriptMsgType
  292. }
  293. func MakeRequestDeviceExecuteShellScriptMsg(deviceID, script string) RequestDeviceExecuteShellScriptMsg {
  294. return RequestDeviceExecuteShellScriptMsg{
  295. DeviceID: deviceID,
  296. Script: script,
  297. }
  298. }