doc.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. /*
  2. Package nsq is the official Go package for NSQ (http://nsq.io/).
  3. It provides high-level Consumer and Producer types as well as low-level
  4. functions to communicate over the NSQ protocol.
  5. Consumer
  6. Consuming messages from NSQ can be done by creating an instance of a Consumer and supplying it a handler.
  7. package main
  8. import (
  9. "log"
  10. "os/signal"
  11. "github.com/nsqio/go-nsq"
  12. )
  13. type myMessageHandler struct {}
  14. // HandleMessage implements the Handler interface.
  15. func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
  16. if len(m.Body) == 0 {
  17. // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
  18. // In this case, a message with an empty body is simply ignored/discarded.
  19. return nil
  20. }
  21. // do whatever actual message processing is desired
  22. err := processMessage(m.Body)
  23. // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
  24. return err
  25. }
  26. func main() {
  27. // Instantiate a consumer that will subscribe to the provided channel.
  28. config := nsq.NewConfig()
  29. consumer, err := nsq.NewConsumer("topic", "channel", config)
  30. if err != nil {
  31. log.Fatal(err)
  32. }
  33. // Set the Handler for messages received by this Consumer. Can be called multiple times.
  34. // See also AddConcurrentHandlers.
  35. consumer.AddHandler(&myMessageHandler{})
  36. // Use nsqlookupd to discover nsqd instances.
  37. // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
  38. err = consumer.ConnectToNSQLookupd("localhost:4161")
  39. if err != nil {
  40. log.Fatal(err)
  41. }
  42. // wait for signal to exit
  43. sigChan := make(chan os.Signal, 1)
  44. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  45. <-sigChan
  46. // Gracefully stop the consumer.
  47. consumer.Stop()
  48. }
  49. Producer
  50. Producing messages can be done by creating an instance of a Producer.
  51. // Instantiate a producer.
  52. config := nsq.NewConfig()
  53. producer, err := nsq.NewProducer("127.0.0.1:4150", config)
  54. if err != nil {
  55. log.Fatal(err)
  56. }
  57. messageBody := []byte("hello")
  58. topicName := "topic"
  59. // Synchronously publish a single message to the specified topic.
  60. // Messages can also be sent asynchronously and/or in batches.
  61. err = producer.Publish(topicName, messageBody)
  62. if err != nil {
  63. log.Fatal(err)
  64. }
  65. // Gracefully stop the producer when appropriate (e.g. before shutting down the service)
  66. producer.Stop()
  67. */
  68. package nsq