deliverer_test.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package deliverer
  2. import (
  3. "fmt"
  4. "github.com/go-test/deep"
  5. "golang.org/x/time/rate"
  6. "net/url"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  // Delivery ids handed out by mockDeliveryPersister.Sending, in call order.
  const (
  	id1 = "id1"
  	id2 = "id2"
  )

  // State labels the mock persister records for each delivery id. noState is
  // the value both ids start with.
  const (
  	noState       = "noState"
  	sending       = "sending"
  	cancel        = "cancel"
  	successful    = "successful"
  	retrying      = "retrying"
  	undeliverable = "undeliverable"
  )
  // Fixtures shared by every test in this file.
  var (
  	// testBytes is the payload passed to the pool and checked by the send
  	// functions.
  	testBytes = []byte{0, 1, 2, 3}
  	// testURL is the destination passed to the pool; init assigns it.
  	testURL *url.URL
  )
  25. func init() {
  26. var err error
  27. testURL, err = url.Parse("example.com")
  28. if err != nil {
  29. panic(err)
  30. }
  31. }
  32. var _ DeliveryPersister = &mockDeliveryPersister{}
  33. type mockDeliveryPersister struct {
  34. t *testing.T
  35. i int
  36. mu *sync.Mutex
  37. id1State string
  38. id2State string
  39. }
  40. func newMockDeliveryPersister(t *testing.T) *mockDeliveryPersister {
  41. return &mockDeliveryPersister{
  42. t: t,
  43. mu: &sync.Mutex{},
  44. id1State: noState,
  45. id2State: noState,
  46. }
  47. }
  48. func (m *mockDeliveryPersister) Sending(b []byte, to *url.URL) string {
  49. m.mu.Lock()
  50. defer m.mu.Unlock()
  51. if m.i == 0 {
  52. m.i++
  53. return id1
  54. } else if m.i == 1 {
  55. m.i++
  56. return id2
  57. } else {
  58. m.t.Fatal("too many calls to Sending")
  59. }
  60. return ""
  61. }
  62. func (m *mockDeliveryPersister) Cancel(id string) {
  63. m.mu.Lock()
  64. defer m.mu.Unlock()
  65. if id == id1 {
  66. m.id1State = cancel
  67. } else if id == id2 {
  68. m.id2State = cancel
  69. } else {
  70. m.t.Fatalf("unknown Cancel id: %s", id)
  71. }
  72. }
  73. func (m *mockDeliveryPersister) Successful(id string) {
  74. m.mu.Lock()
  75. defer m.mu.Unlock()
  76. if id == id1 {
  77. m.id1State = successful
  78. } else if id == id2 {
  79. m.id2State = successful
  80. } else {
  81. m.t.Fatalf("unknown Successful id: %s", id)
  82. }
  83. }
  84. func (m *mockDeliveryPersister) Retrying(id string) {
  85. m.mu.Lock()
  86. defer m.mu.Unlock()
  87. if id == id1 {
  88. m.id1State = retrying
  89. } else if id == id2 {
  90. m.id2State = retrying
  91. } else {
  92. m.t.Fatalf("unknown Retrying id: %s", id)
  93. }
  94. }
  95. func (m *mockDeliveryPersister) Undeliverable(id string) {
  96. m.mu.Lock()
  97. defer m.mu.Unlock()
  98. if id == id1 {
  99. m.id1State = undeliverable
  100. } else if id == id2 {
  101. m.id2State = undeliverable
  102. } else {
  103. m.t.Fatalf("unknown Retrying id: %s", id)
  104. }
  105. }
  106. func TestDelivererPoolSuccessNoPersister(t *testing.T) {
  107. testSendFn := func(b []byte, u *url.URL) error {
  108. if diff := deep.Equal(b, testBytes); diff != nil {
  109. t.Fatal(diff)
  110. } else if u != testURL {
  111. t.Fatal("wrong testURL")
  112. }
  113. return nil
  114. }
  115. pool := NewDelivererPool(DeliveryOptions{
  116. InitialRetryTime: time.Microsecond,
  117. MaximumRetryTime: time.Microsecond,
  118. BackoffFactor: 2,
  119. MaxRetries: 1,
  120. RateLimit: rate.NewLimiter(1, 1),
  121. })
  122. pool.Do(testBytes, testURL, testSendFn)
  123. time.Sleep(time.Microsecond * 500)
  124. }
  125. func TestDelivererPoolSuccessPersister(t *testing.T) {
  126. testSendFn := func(b []byte, u *url.URL) error {
  127. if diff := deep.Equal(b, testBytes); diff != nil {
  128. t.Fatal(diff)
  129. } else if u != testURL {
  130. t.Fatal("wrong testURL")
  131. }
  132. return nil
  133. }
  134. p := newMockDeliveryPersister(t)
  135. pool := NewDelivererPool(DeliveryOptions{
  136. InitialRetryTime: time.Microsecond,
  137. MaximumRetryTime: time.Microsecond,
  138. BackoffFactor: 2,
  139. MaxRetries: 1,
  140. RateLimit: rate.NewLimiter(1, 1),
  141. Persister: p,
  142. })
  143. pool.Do(testBytes, testURL, testSendFn)
  144. time.Sleep(time.Microsecond * 500)
  145. if p.id1State != successful {
  146. t.Fatalf("want: %s, got %s", successful, p.id1State)
  147. }
  148. }
  149. func TestRestartSuccess(t *testing.T) {
  150. testSendFn := func(b []byte, u *url.URL) error {
  151. if diff := deep.Equal(b, testBytes); diff != nil {
  152. t.Fatal(diff)
  153. } else if u != testURL {
  154. t.Fatal("wrong testURL")
  155. }
  156. return nil
  157. }
  158. p := newMockDeliveryPersister(t)
  159. pool := NewDelivererPool(DeliveryOptions{
  160. InitialRetryTime: time.Microsecond,
  161. MaximumRetryTime: time.Microsecond,
  162. BackoffFactor: 2,
  163. MaxRetries: 1,
  164. RateLimit: rate.NewLimiter(1, 1),
  165. Persister: p,
  166. })
  167. pool.Restart(testBytes, testURL, id2, testSendFn)
  168. time.Sleep(time.Microsecond * 500)
  169. if p.id2State != successful {
  170. t.Fatalf("want: %s, got %s", successful, p.id1State)
  171. }
  172. }
  173. func TestDelivererPoolRetrying(t *testing.T) {
  174. testSendFn := func(b []byte, u *url.URL) error {
  175. if diff := deep.Equal(b, testBytes); diff != nil {
  176. t.Fatal(diff)
  177. } else if u != testURL {
  178. t.Fatal("wrong testURL")
  179. }
  180. return fmt.Errorf("expected")
  181. }
  182. p := newMockDeliveryPersister(t)
  183. pool := NewDelivererPool(DeliveryOptions{
  184. InitialRetryTime: time.Microsecond,
  185. MaximumRetryTime: time.Microsecond,
  186. BackoffFactor: 2,
  187. MaxRetries: 1,
  188. RateLimit: rate.NewLimiter(1000000, 10000000),
  189. Persister: p,
  190. })
  191. pool.Do(testBytes, testURL, testSendFn)
  192. time.Sleep(time.Microsecond * 500)
  193. select {
  194. case <-pool.Errors():
  195. default:
  196. t.Fatal("expected error")
  197. }
  198. time.Sleep(time.Microsecond * 500)
  199. if p.id1State != retrying {
  200. t.Fatalf("want: %s, got %s", retrying, p.id1State)
  201. }
  202. }
  203. func TestDelivererPoolUndeliverable(t *testing.T) {
  204. testSendFn := func(b []byte, u *url.URL) error {
  205. if diff := deep.Equal(b, testBytes); diff != nil {
  206. t.Fatal(diff)
  207. } else if u != testURL {
  208. t.Fatal("wrong testURL")
  209. }
  210. return fmt.Errorf("expected")
  211. }
  212. p := newMockDeliveryPersister(t)
  213. pool := NewDelivererPool(DeliveryOptions{
  214. InitialRetryTime: time.Microsecond,
  215. MaximumRetryTime: time.Microsecond,
  216. BackoffFactor: 2,
  217. MaxRetries: 1,
  218. RateLimit: rate.NewLimiter(1000000, 10000000),
  219. Persister: p,
  220. })
  221. pool.Do(testBytes, testURL, testSendFn)
  222. time.Sleep(time.Microsecond * 500)
  223. <-pool.Errors()
  224. time.Sleep(time.Microsecond * 500)
  225. <-pool.Errors()
  226. time.Sleep(time.Microsecond * 500)
  227. <-pool.Errors()
  228. time.Sleep(time.Microsecond * 500)
  229. if p.id1State != undeliverable {
  230. t.Fatalf("want: %s, got %s", undeliverable, p.id1State)
  231. }
  232. }
  233. func TestRestartRetrying(t *testing.T) {
  234. testSendFn := func(b []byte, u *url.URL) error {
  235. if diff := deep.Equal(b, testBytes); diff != nil {
  236. t.Fatal(diff)
  237. } else if u != testURL {
  238. t.Fatal("wrong testURL")
  239. }
  240. return fmt.Errorf("expected")
  241. }
  242. p := newMockDeliveryPersister(t)
  243. pool := NewDelivererPool(DeliveryOptions{
  244. InitialRetryTime: time.Microsecond,
  245. MaximumRetryTime: time.Microsecond,
  246. BackoffFactor: 2,
  247. MaxRetries: 1,
  248. RateLimit: rate.NewLimiter(1000000, 10000000),
  249. Persister: p,
  250. })
  251. pool.Restart(testBytes, testURL, id2, testSendFn)
  252. time.Sleep(time.Microsecond * 500)
  253. select {
  254. case <-pool.Errors():
  255. default:
  256. t.Fatal("expected error")
  257. }
  258. time.Sleep(time.Microsecond * 500)
  259. if p.id2State != retrying {
  260. t.Fatalf("want: %s, got %s", retrying, p.id2State)
  261. }
  262. }
  263. func TestRestartUndeliverable(t *testing.T) {
  264. testSendFn := func(b []byte, u *url.URL) error {
  265. if diff := deep.Equal(b, testBytes); diff != nil {
  266. t.Fatal(diff)
  267. } else if u != testURL {
  268. t.Fatal("wrong testURL")
  269. }
  270. return fmt.Errorf("expected")
  271. }
  272. p := newMockDeliveryPersister(t)
  273. pool := NewDelivererPool(DeliveryOptions{
  274. InitialRetryTime: time.Microsecond,
  275. MaximumRetryTime: time.Microsecond,
  276. BackoffFactor: 2,
  277. MaxRetries: 1,
  278. RateLimit: rate.NewLimiter(1000000, 10000000),
  279. Persister: p,
  280. })
  281. pool.Restart(testBytes, testURL, id2, testSendFn)
  282. time.Sleep(time.Microsecond * 500)
  283. <-pool.Errors()
  284. time.Sleep(time.Microsecond * 500)
  285. <-pool.Errors()
  286. time.Sleep(time.Microsecond * 500)
  287. <-pool.Errors()
  288. time.Sleep(time.Microsecond * 500)
  289. if p.id2State != undeliverable {
  290. t.Fatalf("want: %s, got %s", undeliverable, p.id2State)
  291. }
  292. }