apkipa преди 2 седмици
родител
ревизия
1931e4acea
променени са 2 файла, в които са добавени 108 реда и са изтрити 31 реда
  1. 10 7
      include/tsdb_hf.h
  2. 98 24
      src/tsdb_hf.cpp

+ 10 - 7
include/tsdb_hf.h

@@ -43,15 +43,16 @@ struct tsdb_entry_stream {
 
     /// @brief Closes the stream. Will also commit the stream if it has not been committed yet.
     /// It is recommended to always call this method before dropping the object.
-    void close();
+    bool close();
 
-    void insert_point(double value);
-    void insert_points(const double* values, size_t count);
+    bool insert_point(double value);
+    bool insert_points(const double* values, size_t count);
 
 private:
     friend tsdb_entry;
 
     tsdb_entry_stream(std::unique_ptr<impl> impl);
+    bool ensure_open() const;
 
     std::unique_ptr<impl> impl_;
 };
@@ -74,14 +75,14 @@ struct tsdb_entry {
 
     /// @brief Closes the database. Will fail if there are open streams. It is recommended to always
     /// call this method before dropping the object.
-    void close();
+    bool close();
 
-    void set_data_dir(string_view data_dir);
+    bool set_data_dir(string_view data_dir);
 
     /// @brief Sets how many points are stored in a single part.
-    void set_part_size(size_t size);
+    bool set_part_size(size_t size);
 
-    void set_metric_name(string_view metric_name);
+    bool set_metric_name(string_view metric_name);
 
     /// @brief Creates a new stream of points. The stream will be automatically committed when
     /// the object is dropped.
@@ -92,6 +93,8 @@ struct tsdb_entry {
     tsdb_entry_stream new_stream(std::string name, uint64_t timestampOffset, uint64_t interval);
 
 private:
+    bool ensure_open() const;
+
     std::unique_ptr<impl> impl_;
 };
 

+ 98 - 24
src/tsdb_hf.cpp

@@ -15,6 +15,7 @@
 #include <string_view>
 
 #include "internal_shared.hpp"
+#include "shared.h"
 
 using namespace tsdb_preludes;
 
@@ -246,16 +247,18 @@ struct tsdb_entry_stream::impl {
     }
 
     // WARN: You must call this method exactly once before dropping the object.
-    void close() {
+    bool close() {
         // Flush the remaining data
         close_current_part();
 
         // Emit the metadata about the stream and the data parts
         rewrite_metadata();
+
+        return true;
     }
 
-    void insert_point(double value) { insert_points(&value, 1); }
-    void insert_points(const double* values, size_t count) {
+    bool insert_point(double value) { return insert_points(&value, 1); }
+    bool insert_points(const double* values, size_t count) {
         const auto count_copy = count;
         logger().debug("TSDB-hf stream inserting {} points, correlation ID: {}", count_copy, correlation_id_);
         while (count > 0) {
@@ -273,6 +276,8 @@ struct tsdb_entry_stream::impl {
             count -= to_write;
         }
         logger().debug("TSDB-hf stream inserted {} points, correlation ID: {}", count_copy, correlation_id_);
+
+        return true;
     }
 
     void err_log_env() {
@@ -370,16 +375,42 @@ tsdb_entry_stream::~tsdb_entry_stream() {
 }
 tsdb_entry_stream::tsdb_entry_stream(tsdb_entry_stream&& other) : impl_(std::move(other.impl_)) {
 }
-void tsdb_entry_stream::close() {
+bool tsdb_entry_stream::close() {
     if (auto impl = std::move(impl_)) {
-        ex_log_boundary([&] { impl->close(); }, [&] { impl->err_log_env(); });
+        return ex_log_boundary([&] { return impl->close(); },
+                               [&] {
+                                   impl->err_log_env();
+                                   return false;
+                               });
+    }
+    return true;
+}
+bool tsdb_entry_stream::insert_point(double value) {
+    if (!ensure_open()) {
+        return false;
     }
+    return ex_log_boundary([&] { return impl_->insert_point(value); },
+                           [&] {
+                               impl_->err_log_env();
+                               return false;
+                           });
 }
-void tsdb_entry_stream::insert_point(double value) {
-    ex_log_boundary([&] { impl_->insert_point(value); }, [&] { impl_->err_log_env(); });
+bool tsdb_entry_stream::insert_points(const double* values, size_t count) {
+    if (!ensure_open()) {
+        return false;
+    }
+    return ex_log_boundary([&] { return impl_->insert_points(values, count); },
+                           [&] {
+                               impl_->err_log_env();
+                               return false;
+                           });
 }
-void tsdb_entry_stream::insert_points(const double* values, size_t count) {
-    ex_log_boundary([&] { impl_->insert_points(values, count); }, [&] { impl_->err_log_env(); });
+bool tsdb_entry_stream::ensure_open() const {
+    if (!impl_) {
+        logger().error("TSDB-hf stream is not open");
+        return false;
+    }
+    return true;
 }
 
 struct tsdb_entry::impl {
@@ -393,31 +424,38 @@ struct tsdb_entry::impl {
     ~impl() { logger().debug("TSDB-hf entry closed, correlation ID: {}", correlation_id_); }
 
     // WARN: You must call this method exactly once before dropping the object.
-    void close() {
+    bool close() {
         if (open_streams_count_ > 0) {
-            // TODO: Maybe fail-fast?
-            // Fail-fast
-            throw std::runtime_error("Cannot close tsdb_entry while there are open streams");
+            // // Fail-fast
+            // throw std::runtime_error("Cannot close tsdb_entry while there are open streams");
+            logger().error("Cannot close tsdb_entry while there are open streams, correlation ID: {}", correlation_id_);
+            logger().error("!!! This is a BUG in the client code, please fix it !!!");
+            err_log_env();
+            return false;
         }
+        return true;
     }
 
-    void set_data_dir(string_view data_dir) {
+    bool set_data_dir(string_view data_dir) {
         // TODO: Maybe sanitize the path?
         logger().debug("TSDB-hf entry setting data dir to `{}`, correlation ID: {}", data_dir, correlation_id_);
         data_dir_ = data_dir;
+        return true;
     }
 
-    void set_part_size(size_t size) {
+    bool set_part_size(size_t size) {
         logger().debug("TSDB-hf entry setting part size to {}, correlation ID: {}", size, correlation_id_);
         if (size < 1) {
             throw std::invalid_argument("Part size must be positive");
         }
         part_size_ = size;
+        return true;
     }
 
-    void set_metric_name(string_view metric_name) {
+    bool set_metric_name(string_view metric_name) {
         logger().debug("TSDB-hf entry setting metric name to `{}`, correlation ID: {}", metric_name, correlation_id_);
         metric_name_ = metric_name;
+        return true;
     }
 
     tsdb_entry_stream new_stream(std::string name, uint64_t timestampOffset, uint64_t interval) {
@@ -470,26 +508,62 @@ tsdb_entry::~tsdb_entry() {
 }
 tsdb_entry::tsdb_entry(tsdb_entry&& other) : impl_(std::move(other.impl_)) {
 }
-void tsdb_entry::close() {
+bool tsdb_entry::close() {
     if (auto impl = std::move(impl_)) {
-        ex_log_boundary([&] { impl->close(); }, [&] { impl->err_log_env(); });
+        return ex_log_boundary([&] { return impl->close(); },
+                               [&] {
+                                   impl->err_log_env();
+                                   return false;
+                               });
     }
+    return true;
 }
-void tsdb_entry::set_data_dir(string_view data_dir) {
-    ex_log_boundary([&] { impl_->set_data_dir(data_dir); }, [&] { impl_->err_log_env(); });
+bool tsdb_entry::set_data_dir(string_view data_dir) {
+    if (!ensure_open()) {
+        return false;
+    }
+    return ex_log_boundary([&] { return impl_->set_data_dir(data_dir); },
+                           [&] {
+                               impl_->err_log_env();
+                               return false;
+                           });
 }
-void tsdb_entry::set_part_size(size_t size) {
-    ex_log_boundary([&] { impl_->set_part_size(size); }, [&] { impl_->err_log_env(); });
+bool tsdb_entry::set_part_size(size_t size) {
+    if (!ensure_open()) {
+        return false;
+    }
+    return ex_log_boundary([&] { return impl_->set_part_size(size); },
+                           [&] {
+                               impl_->err_log_env();
+                               return false;
+                           });
 }
-void tsdb_entry::set_metric_name(string_view metric_name) {
-    ex_log_boundary([&] { impl_->set_metric_name(metric_name); }, [&] { impl_->err_log_env(); });
+bool tsdb_entry::set_metric_name(string_view metric_name) {
+    if (!ensure_open()) {
+        return false;
+    }
+    return ex_log_boundary([&] { return impl_->set_metric_name(metric_name); },
+                           [&] {
+                               impl_->err_log_env();
+                               return false;
+                           });
 }
 tsdb_entry_stream tsdb_entry::new_stream(std::string name, uint64_t timestampOffset, uint64_t interval) {
+    if (!ensure_open()) {
+        return nullptr;
+    }
     return ex_log_boundary([&] { return impl_->new_stream(name, timestampOffset, interval); },
                            [&] {
                                impl_->err_log_env();
                                return nullptr;
                            });
 }
+bool tsdb_entry::ensure_open() const {
+    if (!impl_) {
+        logger().error("TSDB-hf entry is not open");
+        return false;
+    }
+    return true;
+}
 
 }  // namespace tsdb_hf_cpp