FutureYu 1 year ago
parent
commit
f7687df913
2 changed files with 69 additions and 0 deletions
  1. 6 0
      insert-app/pom.xml
  2. 63 0
      insert-app/src/main/java/db/aeron/BasicSubscriber.java

+ 6 - 0
insert-app/pom.xml

@@ -62,6 +62,12 @@
             <artifactId>netty-transport-native-epoll</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.aeron</groupId>
+            <artifactId>aeron-all</artifactId>
+            <version>1.40.0</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 63 - 0
insert-app/src/main/java/db/aeron/BasicSubscriber.java

@@ -0,0 +1,63 @@
+package db.aeron;
+
+
+import io.aeron.Aeron;
+import io.aeron.Subscription;
+import io.aeron.driver.MediaDriver;
+import io.aeron.logbuffer.FragmentHandler;
+import io.aeron.samples.SampleConfiguration;
+import io.aeron.samples.SamplesUtil;
+import org.agrona.CloseHelper;
+import org.agrona.concurrent.SigInt;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class BasicSubscriber
+{
+    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
+    private static final String CHANNEL = SampleConfiguration.CHANNEL;
+    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
+    private static final boolean EMBEDDED_MEDIA_DRIVER = SampleConfiguration.EMBEDDED_MEDIA_DRIVER;
+
+    /**
+     * Main method for launching the process.
+     *
+     * @param args passed to the process.
+     */
+    public static void main(final String[] args)
+    {
+        System.out.println("Subscribing to " + CHANNEL + " on stream id " + STREAM_ID);
+
+        final MediaDriver driver = EMBEDDED_MEDIA_DRIVER ? MediaDriver.launchEmbedded() : null;
+        final Aeron.Context ctx = new Aeron.Context()
+                .availableImageHandler(SamplesUtil::printAvailableImage)
+                .unavailableImageHandler(SamplesUtil::printUnavailableImage);
+
+        if (EMBEDDED_MEDIA_DRIVER)
+        {
+            ctx.aeronDirectoryName(driver.aeronDirectoryName());
+        }
+
+        final FragmentHandler fragmentHandler = SamplesUtil.printAsciiMessage(STREAM_ID);
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        // Register a SIGINT handler for graceful shutdown.
+        SigInt.register(() -> running.set(false));
+
+        // Create an Aeron instance using the configured Context and create a
+        // Subscription on that instance that subscribes to the configured
+        // channel and stream ID.
+        // The Aeron and Subscription classes implement "AutoCloseable" and will automatically
+        // clean up resources when this try block is finished
+        try (Aeron aeron = Aeron.connect(ctx);
+             Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID))
+        {
+            SamplesUtil.subscriberLoop(fragmentHandler, FRAGMENT_COUNT_LIMIT, running).accept(subscription);
+
+            System.out.println("Shutting down...");
+        }
+
+        CloseHelper.close(driver);
+    }
+}