@@ -0,0 +1,260 @@
+package tsdb;
+import com.github.luben.zstd.EndDirective;
+import com.github.luben.zstd.ZstdCompressCtx;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import it.unimi.dsi.fastutil.doubles.AbstractDoubleList;
+import it.unimi.dsi.fastutil.doubles.DoubleListIterator;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.RandomAccess;
+import java.util.UUID;
+public class TsdbApiEntryStream implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(TsdbApiEntryStream.class);
+ private final StreamMetadata metadata;
+ private final String streamBaseDir;
+ private final ByteBuffer bufInput = ByteBuffer.allocateDirect(16 * 1024).order(ByteOrder.LITTLE_ENDIAN);
+ private final ByteBuffer bufOutput = ByteBuffer.allocateDirect(16 * 1024).order(ByteOrder.LITTLE_ENDIAN);
+ private long currentPartRemainingSize = 0;
+ private FileChannel currentPartFile = null;
+ private String currentPartPath;
+ private ZstdCompressCtx compressCtx = new ZstdCompressCtx();
+ private String correlationId = "";
+ TsdbApiEntryStream(TsdbApiEntry parent, String streamBaseDir, String name, long timestampOffset, long interval) {
+ metadata = new StreamMetadata(parent.getMetricName(), name, timestampOffset, interval, parent.getPartSize());
+ this.streamBaseDir = streamBaseDir;
+ correlationId = UUID.randomUUID().toString();
+ LOG.debug("Creating TsdbApiEntryStream from TsdbApiEntry {}, correlation ID = {}",
+ parent.getCorrelationId(), correlationId);
+ // Validate metadata
+ Util.ensureDir(streamBaseDir);
+ if (metadata.partSize <= 0) {
+ throw new IllegalArgumentException("Part size must be positive");
+ }
+ if (metadata.interval <= 0) {
+ throw new IllegalArgumentException("Interval must be positive");
+ }
+ if (metadata.timestampOffset < 0) {
+ throw new IllegalArgumentException("Timestamp offset must be non-negative");
+ }
+ if (StringUtils.isEmpty(metadata.name)) {
+ throw new IllegalArgumentException("Name must not be empty");
+ }
+ // No need to check metric name: it is allowed to be null
+ // Emit metadata
+ rewriteMetadata();
+ }
+ @Override
+ public void close() {
+ if (isClosed()) {
+ return;
+ }
+ LOG.info("Closing TsdbApiEntryStream, correlation id = {}", correlationId);
+ // WARN: If an exception is thrown, data will definitely be lost, because the stream goes to a closed state
+ // and is irreversible.
+ try (ZstdCompressCtx ctx = compressCtx) {
+ closeCurrentPart();
+ rewriteMetadata();
+ } catch (Exception e) {
+ LOG.error("Error while closing TsdbApiEntryStream", e);
+ throw e;
+ } finally {
+ compressCtx = null;
+ }
+ }
+ public void insertPoint(double value) {
+ checkClosed();
+ LOG.debug("Inserting 1 point, correlation ID = {}", correlationId);
+ insertPointInner(value);
+ }
+ public void insertPoints(AbstractDoubleList values) {
+ checkClosed();
+ LOG.debug("Inserting {} points, correlation ID = {}", values.size(), correlationId);
+ if (values instanceof RandomAccess) {
+ // Fast path
+ for (int i = 0; i < values.size(); i++) {
+ insertPointInner(values.getDouble(i));
+ }
+ } else {
+ // Slow path
+ DoubleListIterator it = values.iterator();
+ while (it.hasNext()) {
+ insertPointInner(it.nextDouble());
+ }
+ }
+ }
+ private void insertPointInner(double value) {
+ if (currentPartRemainingSize <= 0) {
+ createNewPart();
+ }
+ // Write the value to the buffer
+ // NOTE: We cache writes eagerly to avoid unnecessary JNI overhead
+ if (bufInput.remaining() < 8) {
+ // Not enough space in the buffer, flush now
+ flushIoBuffer(EndDirective.CONTINUE);
+ }
+ bufInput.putDouble(value);
+ metadata.currentPointsCount++;
+ currentPartRemainingSize--;
+ }
+ private void createNewPart() {
+ closeCurrentPart();
+ String partName = String.format("part_%09d.zst.part", metadata.currentPartIndex);
+ String partPath = Util.pathCombine(streamBaseDir, partName);
+ LOG.debug("Opening new part {}, correlation ID = {}", partPath, correlationId);
+ try {
+// File f = new File(partPath);
+// if (f.exists()) {
+// throw new IllegalStateException("Part file already exists: " + partPath);
+// }
+// currentPartFile = FileChannel.open(f.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ currentPartFile = FileChannel.open(Paths.get(partPath), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ currentPartPath = partPath;
+ currentPartRemainingSize = metadata.partSize;
+ metadata.currentPartIndex++;
+ } catch (Exception e) {
+ LOG.error("Error while creating new part, correlation ID = {}", correlationId, e);
+ throw new RuntimeException("Error while creating new part", e);
+ }
+ }
+ private void closeCurrentPart() {
+ if (currentPartFile != null) {
+ LOG.debug("Closing current part, correlation ID = {}", correlationId);
+ try {
+ try (FileChannel fc = currentPartFile) {
+ // Flush the remaining data
+ flushIoBuffer(EndDirective.END);
+ } finally {
+ currentPartFile = null;
+ }
+ Util.renameFileToExtension(currentPartPath, "");
+ } catch (Exception e) {
+ LOG.error("Error while closing current part, correlation ID = {}", correlationId, e);
+ throw new RuntimeException("Error while closing current part", e);
+ }
+ }
+ }
+ private void flushIoBuffer(EndDirective directive) {
+ // Uncompressed data goes to bufInput, compressed data goes to bufOutput.
+ // Data is compressed and written to currentPartStream when it is full.
+ if (currentPartFile == null) {
+ throw new IllegalStateException("Current part is not open");
+ }
+// if (bufInput.position() == 0) {
+// return;
+// }
+ // Compress the data
+ bufInput.flip();
+ bufOutput.clear();
+ if (directive == EndDirective.END) {
+ while (!compressCtx.compressDirectByteBufferStream(bufOutput, bufInput, directive)) {
+ // Write compressed data to the file
+ flushOutputBufToFile();
+ }
+ } else {
+ // Return value of `compressDirectByteBufferStream` is not interesting here
+ // It is OK to return as soon as we've drained the input buffer
+ while (bufInput.hasRemaining()) {
+ compressCtx.compressDirectByteBufferStream(bufOutput, bufInput, directive);
+ flushOutputBufToFile();
+ }
+ }
+ bufInput.clear();
+ }
+ private void flushOutputBufToFile() {
+ bufOutput.flip();
+ try {
+ Util.writeFileExact(currentPartFile, bufOutput);
+ } catch (Exception e) {
+ LOG.error("Error while writing compressed data, correlation ID = {}", correlationId, e);
+ throw e;
+ }
+ bufOutput.clear();
+ }
+ private void rewriteMetadata() {
+ LOG.debug("Rewriting metadata, correlation ID = {}", correlationId);
+ String metadataPath = Util.pathCombine(streamBaseDir, "metadata.json.part");
+ try {
+ try (FileChannel fc = FileChannel.open(Paths.get(metadataPath), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
+ // Write metadata
+ String json = StreamMetadata.GSON.toJson(metadata);
+ ByteBuffer buf = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
+ Util.writeFileExact(fc, buf);
+ }
+ Util.renameFileToExtension(metadataPath, "");
+ } catch (Exception e) {
+ LOG.error("Error while rewriting metadata, correlation ID = {}", correlationId, e);
+ throw new RuntimeException("Error while rewriting metadata", e);
+ }
+ }
+ private boolean isClosed() {
+ return compressCtx == null;
+ }
+ private void checkClosed() {
+ if (isClosed()) {
+ throw new IllegalStateException("Stream is closed");
+ }
+ }
+ private static class StreamMetadata {
+ public final String metricName;
+ public final String name;
+ public final long timestampOffset;
+ public final long interval;
+ @SerializedName("points_per_part")
+ public final long partSize;
+ @SerializedName("parts_count")
+ public long currentPartIndex = 0;
+ @SerializedName("total_points")
+ public long currentPointsCount = 0;
+ public static final Gson GSON = new GsonBuilder()
+ .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
+ .create();
+ public StreamMetadata(String metricName, String name, long timestampOffset, long interval, long partSize) {
+ this.metricName = metricName;
+ this.name = name;
+ this.timestampOffset = timestampOffset;
+ this.interval = interval;
+ this.partSize = partSize;
+ }
+ }