runStreams.ts 6.6 KB


  1. import { defaults } from 'lodash';
  2. import { Observable } from 'rxjs';
  3. import {
  4. DataQueryRequest,
  5. DataQueryResponse,
  6. FieldType,
  7. CircularDataFrame,
  8. CSVReader,
  9. Field,
  10. LoadingState,
  11. DataFrameSchema,
  12. DataFrameData,
  13. } from '@grafana/data';
  14. import { liveTimer } from 'app/features/dashboard/dashgrid/liveTimer';
  15. import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
  16. import { getRandomLine } from './LogIpsum';
  17. import { TestDataQuery, StreamingQuery } from './types';
  18. export const defaultStreamQuery: StreamingQuery = {
  19. type: 'signal',
  20. speed: 250, // ms
  21. spread: 3.5,
  22. noise: 2.2,
  23. bands: 1,
  24. };
  25. export function runStream(target: TestDataQuery, req: DataQueryRequest<TestDataQuery>): Observable<DataQueryResponse> {
  26. const query = defaults(target.stream, defaultStreamQuery);
  27. if ('signal' === query.type) {
  28. return runSignalStream(target, query, req);
  29. }
  30. if ('logs' === query.type) {
  31. return runLogsStream(target, query, req);
  32. }
  33. if ('fetch' === query.type) {
  34. return runFetchStream(target, query, req);
  35. }
  36. throw new Error(`Unknown Stream Type: ${query.type}`);
  37. }
  38. export function runSignalStream(
  39. target: TestDataQuery,
  40. query: StreamingQuery,
  41. req: DataQueryRequest<TestDataQuery>
  42. ): Observable<DataQueryResponse> {
  43. return new Observable<DataQueryResponse>((subscriber) => {
  44. const streamId = `signal-${req.panelId}-${target.refId}`;
  45. const maxDataPoints = req.maxDataPoints || 1000;
  46. const schema: DataFrameSchema = {
  47. refId: target.refId,
  48. fields: [
  49. { name: 'time', type: FieldType.time },
  50. { name: target.alias ?? 'value', type: FieldType.number },
  51. ],
  52. };
  53. const { spread, speed, bands = 0, noise } = query;
  54. for (let i = 0; i < bands; i++) {
  55. const suffix = bands > 1 ? ` ${i + 1}` : '';
  56. schema.fields.push({ name: 'Min' + suffix, type: FieldType.number });
  57. schema.fields.push({ name: 'Max' + suffix, type: FieldType.number });
  58. }
  59. const frame = StreamingDataFrame.fromDataFrameJSON({ schema }, { maxLength: maxDataPoints });
  60. let value = Math.random() * 100;
  61. let timeoutId: any = null;
  62. let lastSent = -1;
  63. const addNextRow = (time: number) => {
  64. value += (Math.random() - 0.5) * spread;
  65. const data: DataFrameData = {
  66. values: [[time], [value]],
  67. };
  68. let min = value;
  69. let max = value;
  70. for (let i = 0; i < bands; i++) {
  71. min = min - Math.random() * noise;
  72. max = max + Math.random() * noise;
  73. data.values.push([min]);
  74. data.values.push([max]);
  75. }
  76. const event = { data };
  77. return frame.push(event);
  78. };
  79. // Fill the buffer on init
  80. if (true) {
  81. let time = Date.now() - maxDataPoints * speed;
  82. for (let i = 0; i < maxDataPoints; i++) {
  83. addNextRow(time);
  84. time += speed;
  85. }
  86. }
  87. const pushNextEvent = () => {
  88. addNextRow(Date.now());
  89. const elapsed = liveTimer.lastUpdate - lastSent;
  90. if (elapsed > 1000 || liveTimer.ok) {
  91. subscriber.next({
  92. data: [frame],
  93. key: streamId,
  94. state: LoadingState.Streaming,
  95. });
  96. lastSent = liveTimer.lastUpdate;
  97. }
  98. timeoutId = setTimeout(pushNextEvent, speed);
  99. };
  100. // Send first event in 5ms
  101. setTimeout(pushNextEvent, 5);
  102. return () => {
  103. console.log('unsubscribing to stream ' + streamId);
  104. clearTimeout(timeoutId);
  105. };
  106. });
  107. }
  108. export function runLogsStream(
  109. target: TestDataQuery,
  110. query: StreamingQuery,
  111. req: DataQueryRequest<TestDataQuery>
  112. ): Observable<DataQueryResponse> {
  113. return new Observable<DataQueryResponse>((subscriber) => {
  114. const streamId = `logs-${req.panelId}-${target.refId}`;
  115. const maxDataPoints = req.maxDataPoints || 1000;
  116. const data = new CircularDataFrame({
  117. append: 'tail',
  118. capacity: maxDataPoints,
  119. });
  120. data.refId = target.refId;
  121. data.name = target.alias || 'Logs ' + target.refId;
  122. data.addField({ name: 'line', type: FieldType.string });
  123. data.addField({ name: 'time', type: FieldType.time });
  124. data.meta = { preferredVisualisationType: 'logs' };
  125. const { speed } = query;
  126. let timeoutId: any = null;
  127. const pushNextEvent = () => {
  128. data.fields[0].values.add(Date.now());
  129. data.fields[1].values.add(getRandomLine());
  130. subscriber.next({
  131. data: [data],
  132. key: streamId,
  133. });
  134. timeoutId = setTimeout(pushNextEvent, speed);
  135. };
  136. // Send first event in 5ms
  137. setTimeout(pushNextEvent, 5);
  138. return () => {
  139. console.log('unsubscribing to stream ' + streamId);
  140. clearTimeout(timeoutId);
  141. };
  142. });
  143. }
  144. export function runFetchStream(
  145. target: TestDataQuery,
  146. query: StreamingQuery,
  147. req: DataQueryRequest<TestDataQuery>
  148. ): Observable<DataQueryResponse> {
  149. return new Observable<DataQueryResponse>((subscriber) => {
  150. const streamId = `fetch-${req.panelId}-${target.refId}`;
  151. const maxDataPoints = req.maxDataPoints || 1000;
  152. let data = new CircularDataFrame({
  153. append: 'tail',
  154. capacity: maxDataPoints,
  155. });
  156. data.refId = target.refId;
  157. data.name = target.alias || 'Fetch ' + target.refId;
  158. let reader: ReadableStreamReader<Uint8Array>;
  159. const csv = new CSVReader({
  160. callback: {
  161. onHeader: (fields: Field[]) => {
  162. // Clear any existing fields
  163. if (data.fields.length) {
  164. data = new CircularDataFrame({
  165. append: 'tail',
  166. capacity: maxDataPoints,
  167. });
  168. data.refId = target.refId;
  169. data.name = 'Fetch ' + target.refId;
  170. }
  171. for (const field of fields) {
  172. data.addField(field);
  173. }
  174. },
  175. onRow: (row: any[]) => {
  176. data.add(row);
  177. },
  178. },
  179. });
  180. const processChunk = (value: ReadableStreamDefaultReadResult<Uint8Array>): any => {
  181. if (value.value) {
  182. const text = new TextDecoder().decode(value.value);
  183. csv.readCSV(text);
  184. }
  185. subscriber.next({
  186. data: [data],
  187. key: streamId,
  188. state: value.done ? LoadingState.Done : LoadingState.Streaming,
  189. });
  190. if (value.done) {
  191. console.log('Finished stream');
  192. subscriber.complete(); // necessary?
  193. return;
  194. }
  195. return reader.read().then(processChunk);
  196. };
  197. if (!query.url) {
  198. throw new Error('query.url is not defined');
  199. }
  200. fetch(new Request(query.url)).then((response) => {
  201. if (response.body) {
  202. reader = response.body.getReader();
  203. reader.read().then(processChunk);
  204. }
  205. });
  206. return () => {
  207. // Cancel fetch?
  208. console.log('unsubscribing to stream ' + streamId);
  209. };
  210. });
  211. }