logsRetry.ts 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. import { Observable, Subscription } from 'rxjs';
  2. import { DataFrame, DataFrameJSON, DataQueryError } from '@grafana/data';
  3. import { FetchError, toDataQueryResponse } from '@grafana/runtime';
  4. import { StartQueryRequest } from '../types';
  /** One entry of a response's `results` map (keyed by refId): the frames for that query plus an optional error. */
  type Result = {
    frames: DataFrameJSON[];
    error?: string;
  };
  /**
   * A retry strategy specifically for CloudWatch Logs queries. CloudWatch Logs queries need to first start the query
   * and then poll for the results. Starting a query can fail because of the concurrent queries rate limit, so we have
   * to retry the start query call if too many queries are already running.
   *
   * As we send multiple queries in a single request, some can fail while others succeed, so we handle those cases by
   * only retrying the failed queries. We retry the failed queries until we hit the time limit or all queries succeed,
   * and only then pass the data forward. This means we wait longer, but it keeps the code simpler as we can treat
   * starting the query and polling as steps in a pipeline.
   *
   * All retry state (start time, retry count, collected results, pending timer and subscription) is created per
   * subscription, so subscribing to the returned observable more than once does not share state between subscribers.
   *
   * @param queryFun - Runs the given queries. On failure it is expected to error either with a plain string or with a
   *   FetchError whose `data.results` is keyed by refId.
   * @param targets - The initial set of queries to run.
   * @param timeoutFunc - Called with the current retry count and the start time (ms since epoch); returning true stops
   *   further retries.
   * @returns An observable that emits a single value containing all collected frames (plus an `error` when some
   *   queries timed out) and then completes, or errors when nothing can be retried or returned.
   */
  export function runWithRetry(
    queryFun: (targets: StartQueryRequest[]) => Observable<DataFrame[]>,
    targets: StartQueryRequest[],
    timeoutFunc: (retry: number, startTime: number) => boolean
  ): Observable<{ frames: DataFrame[]; error?: DataQueryError }> {
    // Exponential backoff with jitter: 2^retry seconds plus up to 100ms of randomness.
    const retryWaitFunction = (retry: number) => Math.pow(2, retry) * 1000 + Math.random() * 100;

    return new Observable((observer) => {
      // Mutable state lives inside the subscriber function so each subscription gets its own start time, retry
      // counter and collected results instead of sharing them with other subscribers.
      const startTime = Date.now();
      let retries = 0;
      let timerID: ReturnType<typeof setTimeout> | undefined;
      let subscription: Subscription | undefined;
      // Results of queries that already succeeded in earlier attempts, keyed by refId.
      let collected: Record<string, Result> = {};

      // Run function is where the logic takes place. We have it in a function so we can call it recursively.
      function run(currentQueryParams: StartQueryRequest[]) {
        subscription = queryFun(currentQueryParams).subscribe({
          next(frames) {
            // In case we successfully finished, merge the current response with whatever we already collected.
            const collectedPreviously = toDataQueryResponse({ data: { results: collected } }).data || [];
            observer.next({ frames: [...collectedPreviously, ...frames] });
            observer.complete();
          },
          error(error: FetchError<{ results?: Record<string, Result> }> | string) {
            // A plain string is probably a generic 500 error, so there is nothing to retry; pass it on.
            if (typeof error === 'string') {
              observer.error(error);
              return;
            }

            // With multiple queries, some can error while others are fine.
            const errorData = splitErrorData(error);
            // Either the error structure wasn't what we expected, or there are only non-limit errors; nothing to
            // retry, so pass the error on as it is.
            if (!errorData || !errorData.errors.length) {
              observer.error(error);
              return;
            }

            if (timeoutFunc(retries, startTime)) {
              // We timed out, but we could have started some queries.
              const partialResults = { ...errorData.good, ...collected };
              if (Object.keys(partialResults).length) {
                const dataResponse = toDataQueryResponse({ data: { results: partialResults } });
                const timeoutError: DataQueryError = {
                  ...(dataResponse.error ?? {}),
                  message: `Some queries timed out: ${errorData.errorMessage}`,
                };
                // Partial success: pass the data forward together with an error to be shown to the user.
                observer.next({
                  error: timeoutError,
                  frames: dataResponse.data,
                });
                observer.complete();
              } else {
                // We timed out and there is no data to pass forward, so just pass the error.
                const dataResponse = toDataQueryResponse({ data: { results: error.data?.results ?? {} } });
                observer.error(dataResponse.error);
              }
              return;
            }

            collected = { ...collected, ...errorData.good };
            const toRetry = errorData.errors;
            timerID = setTimeout(
              () => {
                retries++;
                run(toRetry);
              },
              // `retries` is 0 before the first retry, so the waits are roughly 2s, 4s, 8s, ...
              retryWaitFunction(retries + 1)
            );
          },
        });
      }

      run(targets);

      return () => {
        // Only the latest timer and subscription are tracked. That is enough because a new attempt is scheduled only
        // after the previous one has errored, so at most one attempt is pending at a time.
        clearTimeout(timerID);
        subscription?.unsubscribe();
      };
    });
  }
  /**
   * Splits the per-refId results of a failed request into the queries that hit the concurrency limit
   * (`LimitExceededException`) and should be resent, and every other result.
   *
   * @param error - The failed request. `data.results` is keyed by refId; `config.data.queries` holds the queries that
   *   were sent and is used to look up the query to resend for each rate-limited refId.
   * @returns `undefined` when the error carries no `results`. Otherwise returns an object with three fields:
   *   - `errors`: the queries to resend.
   *   - `good`: the results to keep as-is, which may still contain non-limit errors or limit errors whose query could
   *     not be found.
   *   - `errorMessage`: the last limit error message of a query scheduled for retry.
   */
  function splitErrorData(error: FetchError<{ results?: Record<string, Result> }>) {
    const results = error.data?.results;
    if (!results) {
      return undefined;
    }

    // The request payload that was sent; tolerate a missing `config.data` instead of throwing.
    const sentQueries: StartQueryRequest[] = error.config?.data?.queries ?? [];

    const split: {
      errors: StartQueryRequest[];
      good: Record<string, Result>;
      errorMessage: string;
    } = { errors: [], good: {}, errorMessage: '' };

    for (const [refId, result] of Object.entries(results)) {
      const isLimitError = result.error?.startsWith('LimitExceededException') ?? false;
      const queryToRetry = isLimitError ? sentQueries.find((q) => q.refId === refId) : undefined;
      if (queryToRetry) {
        split.errorMessage = result.error ?? '';
        split.errors.push(queryToRetry);
      } else {
        // Not a limit error, or we cannot find the query to resend. Keep the result so its error (if any) is
        // surfaced rather than resending an `undefined` query.
        split.good[refId] = result;
      }
    }

    return split;
  }