123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- import { lastValueFrom, of, throwError } from 'rxjs';
- import { toArray } from 'rxjs/operators';
- import { dataFrameToJSON, MutableDataFrame } from '@grafana/data';
- import { DataResponse, FetchError } from '@grafana/runtime';
- import { StartQueryRequest } from '../types';
- import { runWithRetry } from './logsRetry';
- describe('runWithRetry', () => {
- const timeoutPass = () => false;
- const timeoutFail = () => true;
- it('returns results if no retry is needed', async () => {
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
- const targets = [targetA];
- const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
- expect(queryFunc).toBeCalledTimes(1);
- expect(queryFunc).toBeCalledWith(targets);
- expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
- });
- it('retries if error', async () => {
- jest.useFakeTimers();
- const targets = [targetA];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
- queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
- const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
- jest.runAllTimers();
- const values = await valuesPromise;
- expect(queryFunc).toBeCalledTimes(2);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(queryFunc).nthCalledWith(2, targets);
- expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
- });
- it('fails if reaching timeout and no data was retrieved', async () => {
- jest.useFakeTimers();
- const targets = [targetA];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
- queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
- const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
- jest.runAllTimers();
- let error;
- try {
- await valuesPromise;
- } catch (e) {
- error = e;
- }
- expect(queryFunc).toBeCalledTimes(1);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(error).toEqual({ message: 'LimitExceededException', refId: 'A' });
- });
- it('fails if we get unexpected error', async () => {
- jest.useFakeTimers();
- const targets = [targetA];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(throwError(() => 'random error'));
- const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
- jest.runAllTimers();
- let error;
- try {
- await valuesPromise;
- } catch (e) {
- error = e;
- }
- expect(queryFunc).toBeCalledTimes(1);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(error).toEqual('random error');
- });
- it('works with multiple queries if there is no error', async () => {
- const targets = [targetA, targetB];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(of([createResponseFrame('A'), createResponseFrame('B')]));
- const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
- expect(queryFunc).toBeCalledTimes(1);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(values).toEqual([{ frames: [createResponseFrame('A'), createResponseFrame('B')] }]);
- });
- it('works with multiple queries only one errors out', async () => {
- jest.useFakeTimers();
- const targets = [targetA, targetB];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(
- throwError(() =>
- createErrorResponse(targets, {
- A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
- B: { error: 'LimitExceededException' },
- })
- )
- );
- queryFunc.mockReturnValueOnce(of([createResponseFrame('B')]));
- const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
- jest.runAllTimers();
- const values = await valuesPromise;
- expect(queryFunc).toBeCalledTimes(2);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(queryFunc).nthCalledWith(2, [targetB]);
- // Bit more involved because dataFrameToJSON and dataFrameFromJSON are not symmetrical and add some attributes to the
- // dataframe fields
- expect(values.length).toBe(1);
- expect(values[0].frames.length).toBe(2);
- expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
- expect(values[0].frames[1].fields[0].values.get(0)).toBe('B');
- });
- it('sends data and also error if only one query gets limit error', async () => {
- jest.useFakeTimers();
- const targets = [targetA, targetB];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(
- throwError(() =>
- createErrorResponse(targets, {
- A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
- B: { error: 'LimitExceededException' },
- })
- )
- );
- const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
- jest.runAllTimers();
- const values = await valuesPromise;
- expect(queryFunc).toBeCalledTimes(1);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(values.length).toBe(1);
- expect(values[0].frames.length).toBe(1);
- expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
- expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
- });
- it('sends all collected successful responses on timeout', async () => {
- jest.useFakeTimers();
- const targets = [targetA, targetB, targetC];
- const queryFunc = jest.fn();
- queryFunc.mockReturnValueOnce(
- throwError(() =>
- createErrorResponse(targets, {
- A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
- B: { error: 'LimitExceededException' },
- C: { error: 'LimitExceededException' },
- })
- )
- );
- queryFunc.mockReturnValueOnce(
- throwError(() =>
- createErrorResponse(targets, {
- B: { frames: [dataFrameToJSON(createResponseFrame('B'))] },
- C: { error: 'LimitExceededException' },
- })
- )
- );
- queryFunc.mockReturnValueOnce(
- throwError(() =>
- createErrorResponse(targets, {
- C: { error: 'LimitExceededException' },
- })
- )
- );
- const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, (retry) => retry >= 2).pipe(toArray()));
- jest.runAllTimers();
- const values = await valuesPromise;
- expect(queryFunc).toBeCalledTimes(3);
- expect(queryFunc).nthCalledWith(1, targets);
- expect(queryFunc).nthCalledWith(2, [targetB, targetC]);
- expect(queryFunc).nthCalledWith(3, [targetC]);
- expect(values.length).toBe(1);
- expect(values[0].frames.length).toBe(2);
- expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
- expect(values[0].frames[1].fields[0].values.get(0)).toBe('B');
- expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
- });
- });
- const targetA = makeTarget('A');
- const targetB = makeTarget('B');
- const targetC = makeTarget('C');
- function makeTarget(refId: string) {
- return { queryString: 'query ' + refId, refId, region: 'test' };
- }
- function createResponseFrame(ref: string) {
- return new MutableDataFrame({
- fields: [{ name: 'queryId', values: [ref] }],
- refId: ref,
- });
- }
- function createErrorResponse(targets: StartQueryRequest[], results?: Record<string, DataResponse>): FetchError {
- return {
- status: 400,
- data: {
- results: results || {
- A: {
- error: 'LimitExceededException',
- },
- },
- },
- config: {
- url: '',
- data: {
- queries: targets,
- },
- },
- };
- }
|