deliverer.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package deliverer
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-fed/activity/pub"
  6. "golang.org/x/time/rate"
  7. "math"
  8. "net/url"
  9. "sync"
  10. "time"
  11. )
  12. // DeliveryPersister allows applications to keep track of delivery states of
  13. // the messages being sent, including during retries. This permits clients to
  14. // also resume delivery of messages that were in the process of being delivered
  15. // when the application server was shut down.
  16. type DeliveryPersister interface {
  17. // Sending informs the delivery persister that the provided bytes are
  18. // being delivered to the specified url. It must return a unique id for
  19. // this delivery.
  20. Sending(b []byte, to *url.URL) string
  21. // Cancel informs the delivery persister that the provided delivery was
  22. // interrupted by the server cancelling. These should be retried once
  23. // the server is back online.
  24. Cancel(id string)
  25. // Successful informs the delivery persister that the request has been
  26. // successfully delivered and no further retries are needed.
  27. Successful(id string)
  28. // Retrying indicates the specified delivery is being retried.
  29. Retrying(id string)
  30. // Undeliverable indicates the specified delivery has failed and is no
  31. // longer being retried.
  32. Undeliverable(id string)
  33. }
  34. // DeliveryOptions provides options when delivering messages to federated
  35. // servers. All are required unless explicitly stated otherwise.
  36. type DeliveryOptions struct {
  37. // Initial amount of time to wait before retrying delivery.
  38. InitialRetryTime time.Duration
  39. // The longest amount of time to wait before retrying delivery.
  40. MaximumRetryTime time.Duration
  41. // Rate of backing off retries. Must be at least 1.
  42. BackoffFactor float64
  43. // Maximum number of retries to do when delivering a message. Must be at
  44. // least 1.
  45. MaxRetries int
  46. // Global rate limiter across all deliveries, to prevent spamming
  47. // outbound messages.
  48. RateLimit *rate.Limiter
  49. // Persister allows implementations to save messages that are enqueued
  50. // for delivery between downtimes. It also permits metrics gathering and
  51. // monitoring of outbound messages.
  52. //
  53. // This field is optional.
  54. Persister DeliveryPersister
  55. }
  56. var _ pub.Deliverer = &DelivererPool{}
  57. type DelivererPool struct {
  58. // When present, permits clients to be notified of all state changes
  59. // when delivering a request to another federated server.
  60. //
  61. // Optional.
  62. persister DeliveryPersister
  63. // Limit speed of retries.
  64. initialRetryTime time.Duration
  65. maxRetryTime time.Duration
  66. retryTimeFactor float64
  67. // Limit total number of retries.
  68. maxNumberRetries int
  69. // Enforces speed limit of retries
  70. limiter *rate.Limiter
  71. // Allow graceful cancelling
  72. ctx context.Context
  73. cancel context.CancelFunc
  74. timerId uint64
  75. timerMap map[uint64]*time.Timer
  76. mu sync.Mutex // Limits concurrent access to timerId and timerMap
  77. // Allow graceful error handling
  78. errChan chan error
  79. }
  80. func NewDelivererPool(d DeliveryOptions) *DelivererPool {
  81. ctx, cancel := context.WithCancel(context.Background())
  82. return &DelivererPool{
  83. persister: d.Persister,
  84. initialRetryTime: d.InitialRetryTime,
  85. maxRetryTime: d.MaximumRetryTime,
  86. retryTimeFactor: d.BackoffFactor,
  87. maxNumberRetries: d.MaxRetries,
  88. limiter: d.RateLimit,
  89. ctx: ctx,
  90. cancel: cancel,
  91. timerId: 0,
  92. timerMap: make(map[uint64]*time.Timer, 0),
  93. mu: sync.Mutex{},
  94. errChan: make(chan error, 0),
  95. }
  96. }
  97. type retryData struct {
  98. nextWait time.Duration
  99. n int
  100. f func() error
  101. id string
  102. }
  103. func (r retryData) NextRetry(factor float64, max time.Duration) retryData {
  104. w := time.Duration(int64(math.Floor((float64(r.nextWait) * factor) + 0.5)))
  105. if w > max {
  106. w = max
  107. }
  108. return retryData{
  109. nextWait: w,
  110. n: r.n + 1,
  111. f: r.f,
  112. id: r.id,
  113. }
  114. }
  115. func (r retryData) ShouldRetry(max int) bool {
  116. return r.n < max
  117. }
  118. // Do spawns a goroutine that retries f until it returns no error. Retry
  119. // behavior is determined by the DeliveryOptions passed to the DelivererPool
  120. // upon construction.
  121. func (d *DelivererPool) Do(b []byte, to *url.URL, sendFn func([]byte, *url.URL) error) {
  122. f := func() error {
  123. return sendFn(b, to)
  124. }
  125. go func() {
  126. id := ""
  127. if d.persister != nil {
  128. id = d.persister.Sending(b, to)
  129. }
  130. d.do(retryData{
  131. nextWait: d.initialRetryTime,
  132. n: 0,
  133. f: f,
  134. id: id,
  135. })
  136. }()
  137. }
  138. // Restart resumes a previous attempt at delivering a payload to the specified
  139. // URL. Retry behavior is determined by the DeliveryOptions passed to this
  140. // DelivererPool upon construction, and is not governed by the previous
  141. // DelivererPool that attempted to deliver the message.
  142. func (d *DelivererPool) Restart(b []byte, to *url.URL, id string, sendFn func([]byte, *url.URL) error) {
  143. f := func() error {
  144. return sendFn(b, to)
  145. }
  146. go func() {
  147. d.do(retryData{
  148. nextWait: d.initialRetryTime,
  149. n: 0,
  150. f: f,
  151. id: id,
  152. })
  153. }()
  154. }
  155. // Stop turns down and stops any in-flight requests or retries.
  156. func (d *DelivererPool) Stop() {
  157. d.cancel()
  158. d.closeTimers()
  159. }
  160. // Provides a channel streaming any errors the pool encounters, including errors
  161. // that it retries on.
  162. func (d *DelivererPool) Errors() <-chan error {
  163. return d.errChan
  164. }
  165. func (d *DelivererPool) do(r retryData) {
  166. if err := d.limiter.Wait(d.ctx); err != nil {
  167. if d.persister != nil {
  168. d.persister.Cancel(r.id)
  169. }
  170. d.errChan <- err
  171. return
  172. }
  173. if err := r.f(); err != nil {
  174. d.errChan <- err
  175. if r.ShouldRetry(d.maxNumberRetries) {
  176. if d.persister != nil {
  177. d.persister.Retrying(r.id)
  178. }
  179. d.addClosableTimer(r)
  180. } else {
  181. d.errChan <- fmt.Errorf("delivery tried maximum number of times")
  182. if d.persister != nil {
  183. d.persister.Undeliverable(r.id)
  184. }
  185. }
  186. return
  187. }
  188. if d.persister != nil {
  189. d.persister.Successful(r.id)
  190. }
  191. }
  192. func (d *DelivererPool) addClosableTimer(r retryData) {
  193. d.mu.Lock()
  194. defer d.mu.Unlock()
  195. id := d.timerId
  196. d.timerId++
  197. d.timerMap[id] = time.AfterFunc(r.nextWait, func() {
  198. d.do(r.NextRetry(d.retryTimeFactor, d.maxRetryTime))
  199. d.removeTimer(id)
  200. })
  201. }
  202. func (d *DelivererPool) removeTimer(id uint64) {
  203. d.mu.Lock()
  204. defer d.mu.Unlock()
  205. if _, ok := d.timerMap[id]; ok {
  206. delete(d.timerMap, id)
  207. }
  208. }
  209. func (d *DelivererPool) closeTimers() {
  210. d.mu.Lock()
  211. defer d.mu.Unlock()
  212. for _, v := range d.timerMap {
  213. v.Stop()
  214. }
  215. d.timerMap = make(map[uint64]*time.Timer, 0)
  216. }