tcp_server.go 990 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package protocol
  2. import (
  3. "fmt"
  4. "net"
  5. "runtime"
  6. "strings"
  7. "sync"
  8. "github.com/nsqio/nsq/internal/lg"
  9. )
  10. type TCPHandler interface {
  11. Handle(net.Conn)
  12. }
  13. func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  14. logf(lg.INFO, "TCP: listening on %s", listener.Addr())
  15. var wg sync.WaitGroup
  16. for {
  17. clientConn, err := listener.Accept()
  18. if err != nil {
  19. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  20. logf(lg.WARN, "temporary Accept() failure - %s", err)
  21. runtime.Gosched()
  22. continue
  23. }
  24. // theres no direct way to detect this error because it is not exposed
  25. if !strings.Contains(err.Error(), "use of closed network connection") {
  26. return fmt.Errorf("listener.Accept() error - %s", err)
  27. }
  28. break
  29. }
  30. wg.Add(1)
  31. go func() {
  32. handler.Handle(clientConn)
  33. wg.Done()
  34. }()
  35. }
  36. // wait to return until all handler goroutines complete
  37. wg.Wait()
  38. logf(lg.INFO, "TCP: closing %s", listener.Addr())
  39. return nil
  40. }