logsRetry.test.ts 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. import { lastValueFrom, of, throwError } from 'rxjs';
  2. import { toArray } from 'rxjs/operators';
  3. import { dataFrameToJSON, MutableDataFrame } from '@grafana/data';
  4. import { DataResponse, FetchError } from '@grafana/runtime';
  5. import { StartQueryRequest } from '../types';
  6. import { runWithRetry } from './logsRetry';
  7. describe('runWithRetry', () => {
  8. const timeoutPass = () => false;
  9. const timeoutFail = () => true;
  10. it('returns results if no retry is needed', async () => {
  11. const queryFunc = jest.fn();
  12. queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
  13. const targets = [targetA];
  14. const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
  15. expect(queryFunc).toBeCalledTimes(1);
  16. expect(queryFunc).toBeCalledWith(targets);
  17. expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
  18. });
  19. it('retries if error', async () => {
  20. jest.useFakeTimers();
  21. const targets = [targetA];
  22. const queryFunc = jest.fn();
  23. queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
  24. queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
  25. const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
  26. jest.runAllTimers();
  27. const values = await valuesPromise;
  28. expect(queryFunc).toBeCalledTimes(2);
  29. expect(queryFunc).nthCalledWith(1, targets);
  30. expect(queryFunc).nthCalledWith(2, targets);
  31. expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
  32. });
  33. it('fails if reaching timeout and no data was retrieved', async () => {
  34. jest.useFakeTimers();
  35. const targets = [targetA];
  36. const queryFunc = jest.fn();
  37. queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
  38. queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
  39. const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
  40. jest.runAllTimers();
  41. let error;
  42. try {
  43. await valuesPromise;
  44. } catch (e) {
  45. error = e;
  46. }
  47. expect(queryFunc).toBeCalledTimes(1);
  48. expect(queryFunc).nthCalledWith(1, targets);
  49. expect(error).toEqual({ message: 'LimitExceededException', refId: 'A' });
  50. });
  51. it('fails if we get unexpected error', async () => {
  52. jest.useFakeTimers();
  53. const targets = [targetA];
  54. const queryFunc = jest.fn();
  55. queryFunc.mockReturnValueOnce(throwError(() => 'random error'));
  56. const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
  57. jest.runAllTimers();
  58. let error;
  59. try {
  60. await valuesPromise;
  61. } catch (e) {
  62. error = e;
  63. }
  64. expect(queryFunc).toBeCalledTimes(1);
  65. expect(queryFunc).nthCalledWith(1, targets);
  66. expect(error).toEqual('random error');
  67. });
  68. it('works with multiple queries if there is no error', async () => {
  69. const targets = [targetA, targetB];
  70. const queryFunc = jest.fn();
  71. queryFunc.mockReturnValueOnce(of([createResponseFrame('A'), createResponseFrame('B')]));
  72. const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
  73. expect(queryFunc).toBeCalledTimes(1);
  74. expect(queryFunc).nthCalledWith(1, targets);
  75. expect(values).toEqual([{ frames: [createResponseFrame('A'), createResponseFrame('B')] }]);
  76. });
  77. it('works with multiple queries only one errors out', async () => {
  78. jest.useFakeTimers();
  79. const targets = [targetA, targetB];
  80. const queryFunc = jest.fn();
  81. queryFunc.mockReturnValueOnce(
  82. throwError(() =>
  83. createErrorResponse(targets, {
  84. A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
  85. B: { error: 'LimitExceededException' },
  86. })
  87. )
  88. );
  89. queryFunc.mockReturnValueOnce(of([createResponseFrame('B')]));
  90. const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
  91. jest.runAllTimers();
  92. const values = await valuesPromise;
  93. expect(queryFunc).toBeCalledTimes(2);
  94. expect(queryFunc).nthCalledWith(1, targets);
  95. expect(queryFunc).nthCalledWith(2, [targetB]);
  96. // Bit more involved because dataFrameToJSON and dataFrameFromJSON are not symmetrical and add some attributes to the
  97. // dataframe fields
  98. expect(values.length).toBe(1);
  99. expect(values[0].frames.length).toBe(2);
  100. expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
  101. expect(values[0].frames[1].fields[0].values.get(0)).toBe('B');
  102. });
  103. it('sends data and also error if only one query gets limit error', async () => {
  104. jest.useFakeTimers();
  105. const targets = [targetA, targetB];
  106. const queryFunc = jest.fn();
  107. queryFunc.mockReturnValueOnce(
  108. throwError(() =>
  109. createErrorResponse(targets, {
  110. A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
  111. B: { error: 'LimitExceededException' },
  112. })
  113. )
  114. );
  115. const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
  116. jest.runAllTimers();
  117. const values = await valuesPromise;
  118. expect(queryFunc).toBeCalledTimes(1);
  119. expect(queryFunc).nthCalledWith(1, targets);
  120. expect(values.length).toBe(1);
  121. expect(values[0].frames.length).toBe(1);
  122. expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
  123. expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
  124. });
  125. it('sends all collected successful responses on timeout', async () => {
  126. jest.useFakeTimers();
  127. const targets = [targetA, targetB, targetC];
  128. const queryFunc = jest.fn();
  129. queryFunc.mockReturnValueOnce(
  130. throwError(() =>
  131. createErrorResponse(targets, {
  132. A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
  133. B: { error: 'LimitExceededException' },
  134. C: { error: 'LimitExceededException' },
  135. })
  136. )
  137. );
  138. queryFunc.mockReturnValueOnce(
  139. throwError(() =>
  140. createErrorResponse(targets, {
  141. B: { frames: [dataFrameToJSON(createResponseFrame('B'))] },
  142. C: { error: 'LimitExceededException' },
  143. })
  144. )
  145. );
  146. queryFunc.mockReturnValueOnce(
  147. throwError(() =>
  148. createErrorResponse(targets, {
  149. C: { error: 'LimitExceededException' },
  150. })
  151. )
  152. );
  153. const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, (retry) => retry >= 2).pipe(toArray()));
  154. jest.runAllTimers();
  155. const values = await valuesPromise;
  156. expect(queryFunc).toBeCalledTimes(3);
  157. expect(queryFunc).nthCalledWith(1, targets);
  158. expect(queryFunc).nthCalledWith(2, [targetB, targetC]);
  159. expect(queryFunc).nthCalledWith(3, [targetC]);
  160. expect(values.length).toBe(1);
  161. expect(values[0].frames.length).toBe(2);
  162. expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
  163. expect(values[0].frames[1].fields[0].values.get(0)).toBe('B');
  164. expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
  165. });
  166. });
  167. const targetA = makeTarget('A');
  168. const targetB = makeTarget('B');
  169. const targetC = makeTarget('C');
  170. function makeTarget(refId: string) {
  171. return { queryString: 'query ' + refId, refId, region: 'test' };
  172. }
  173. function createResponseFrame(ref: string) {
  174. return new MutableDataFrame({
  175. fields: [{ name: 'queryId', values: [ref] }],
  176. refId: ref,
  177. });
  178. }
  179. function createErrorResponse(targets: StartQueryRequest[], results?: Record<string, DataResponse>): FetchError {
  180. return {
  181. status: 400,
  182. data: {
  183. results: results || {
  184. A: {
  185. error: 'LimitExceededException',
  186. },
  187. },
  188. },
  189. config: {
  190. url: '',
  191. data: {
  192. queries: targets,
  193. },
  194. },
  195. };
  196. }