MixedDataSource.ts 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import { cloneDeep, groupBy } from 'lodash';
  2. import { forkJoin, from, Observable, of, OperatorFunction } from 'rxjs';
  3. import { catchError, map, mergeAll, mergeMap, reduce, toArray } from 'rxjs/operators';
  4. import {
  5. DataQuery,
  6. DataQueryRequest,
  7. DataQueryResponse,
  8. DataSourceApi,
  9. DataSourceInstanceSettings,
  10. LoadingState,
  11. } from '@grafana/data';
  12. import { getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
  13. export const MIXED_DATASOURCE_NAME = '-- Mixed --';
  14. export interface BatchedQueries {
  15. datasource: Promise<DataSourceApi>;
  16. targets: DataQuery[];
  17. }
  18. export class MixedDatasource extends DataSourceApi<DataQuery> {
  19. constructor(instanceSettings: DataSourceInstanceSettings) {
  20. super(instanceSettings);
  21. }
  22. query(request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> {
  23. // Remove any invalid queries
  24. const queries = request.targets.filter((t) => {
  25. return t.datasource?.uid !== MIXED_DATASOURCE_NAME;
  26. });
  27. if (!queries.length) {
  28. return of({ data: [] } as DataQueryResponse); // nothing
  29. }
  30. // Build groups of queries to run in parallel
  31. const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource.uid');
  32. const mixed: BatchedQueries[] = [];
  33. for (const key in sets) {
  34. const targets = sets[key];
  35. mixed.push({
  36. datasource: getDataSourceSrv().get(targets[0].datasource, request.scopedVars),
  37. targets,
  38. });
  39. }
  40. // Missing UIDs?
  41. if (!mixed.length) {
  42. return of({ data: [] } as DataQueryResponse); // nothing
  43. }
  44. return this.batchQueries(mixed, request);
  45. }
  46. batchQueries(mixed: BatchedQueries[], request: DataQueryRequest<DataQuery>): Observable<DataQueryResponse> {
  47. const runningQueries = mixed.filter(this.isQueryable).map((query, i) =>
  48. from(query.datasource).pipe(
  49. mergeMap((api: DataSourceApi) => {
  50. const dsRequest = cloneDeep(request);
  51. dsRequest.requestId = `mixed-${i}-${dsRequest.requestId || ''}`;
  52. dsRequest.targets = query.targets;
  53. return from(api.query(dsRequest)).pipe(
  54. map((response) => {
  55. return {
  56. ...response,
  57. data: response.data || [],
  58. state: LoadingState.Loading,
  59. key: `mixed-${i}-${response.key || ''}`,
  60. } as DataQueryResponse;
  61. }),
  62. toArray(),
  63. catchError((err) => {
  64. err = toDataQueryError(err);
  65. err.message = `${api.name}: ${err.message}`;
  66. return of<DataQueryResponse[]>([
  67. {
  68. data: [],
  69. state: LoadingState.Error,
  70. error: err,
  71. key: `mixed-${i}-${dsRequest.requestId || ''}`,
  72. },
  73. ]);
  74. })
  75. );
  76. })
  77. )
  78. );
  79. return forkJoin(runningQueries).pipe(flattenResponses(), map(this.finalizeResponses), mergeAll());
  80. }
  81. testDatasource() {
  82. return Promise.resolve({});
  83. }
  84. private isQueryable(query: BatchedQueries): boolean {
  85. return query && Array.isArray(query.targets) && query.targets.length > 0;
  86. }
  87. private finalizeResponses(responses: DataQueryResponse[]): DataQueryResponse[] {
  88. const { length } = responses;
  89. if (length === 0) {
  90. return responses;
  91. }
  92. const error = responses.find((response) => response.state === LoadingState.Error);
  93. if (error) {
  94. responses.push(error); // adds the first found error entry so error shows up in the panel
  95. } else {
  96. responses[length - 1].state = LoadingState.Done;
  97. }
  98. return responses;
  99. }
  100. }
  101. function flattenResponses(): OperatorFunction<DataQueryResponse[][], DataQueryResponse[]> {
  102. return reduce((all: DataQueryResponse[], current) => {
  103. return current.reduce((innerAll, innerCurrent) => {
  104. innerAll.push.apply(innerAll, innerCurrent);
  105. return innerAll;
  106. }, all);
  107. }, []);
  108. }