Class BufferedEmittingPublisher<T>

java.lang.Object
io.helidon.common.reactive.BufferedEmittingPublisher<T>
Type Parameters:
T - type of emitted item
All Implemented Interfaces:
Flow.Publisher<T>

public class BufferedEmittingPublisher<T> extends Object implements Flow.Publisher<T>
Emitting publisher for manual publishing with built-in buffer for handling backpressure.

This publisher allows only a single subscriber.
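The buffering behaviour described above can be illustrated with a small model. This is a single-threaded sketch that uses only JDK types; the class `BufferUntilDemandSketch` and its methods are hypothetical and do not reproduce Helidon's implementation, which must also handle concurrency and terminal signals.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical model of "buffer until demand"; not the library's code. */
class BufferUntilDemandSketch<T> {
    private final Queue<T> buffer = new ConcurrentLinkedQueue<>();
    private final Consumer<? super T> downstream; // stands in for Subscriber.onNext
    private long demand;                          // outstanding requested count

    BufferUntilDemandSketch(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    /** Enqueue the item, then deliver whatever the current demand allows. */
    void emit(T item) {
        buffer.add(item);
        drain();
    }

    /** Downstream asks for n more items; buffered items are delivered first. */
    void request(long n) {
        long sum = demand + n;
        demand = sum < 0 ? Long.MAX_VALUE : sum; // saturate instead of overflowing
        drain();
    }

    int bufferSize() {
        return buffer.size();
    }

    private void drain() {
        while (demand > 0 && !buffer.isEmpty()) {
            T item = buffer.poll();
            if (demand != Long.MAX_VALUE) {
                demand--; // unbounded demand is never decremented
            }
            downstream.accept(item);
        }
    }
}
```

In this model, items emitted while demand is zero stay in the buffer and are delivered in order once `request` is called.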

  • Constructor Details

    • BufferedEmittingPublisher

      protected BufferedEmittingPublisher(Queue<T> queue)
      A new buffered emitting publisher with a custom queue to be used as a buffer.
      Parameters:
      queue - buffer to use
    • BufferedEmittingPublisher

      protected BufferedEmittingPublisher()
      A new buffered emitting publisher using ConcurrentLinkedQueue as the buffer.
  • Method Details

    • subscribe

      public void subscribe(Flow.Subscriber<? super T> sub)
      Specified by:
      subscribe in interface Flow.Publisher<T>
    • onRequest

      public void onRequest(BiConsumer<Long,Long> requestCallback)
      Callback executed when a request signal from downstream arrives. The callback receives two arguments:
      • n - the requested count.
      • result - the current total cumulative requested count, in the range [0, Long.MAX_VALUE], where the maximum indicates that this publisher is unbounded.
      Parameters:
      requestCallback - to be executed
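A cumulative count capped at Long.MAX_VALUE is commonly maintained with saturating addition. The sketch below shows that technique with a `BiConsumer<Long, Long>` callback of the same shape as `requestCallback`. The class `CumulativeDemandSketch` and the helper `addCap` are hypothetical. The sketch assumes the total only grows; whether the real publisher also subtracts delivered items is not stated in this description.

```java
import java.util.function.BiConsumer;

/** Hypothetical model of cumulative demand tracking; not the library's code. */
class CumulativeDemandSketch {
    private long requested; // cumulative total, capped at Long.MAX_VALUE
    private final BiConsumer<Long, Long> requestCallback;

    CumulativeDemandSketch(BiConsumer<Long, Long> requestCallback) {
        this.requestCallback = requestCallback;
    }

    /** Adds two non-negative counts, returning Long.MAX_VALUE on overflow. */
    static long addCap(long current, long n) {
        long sum = current + n;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    /** Records a request for n items and reports (n, cumulative total). */
    void request(long n) {
        requested = addCap(requested, n);
        requestCallback.accept(n, requested);
    }
}
```

Once the total reaches Long.MAX_VALUE it stays there, which is how a callback of this shape could recognise unbounded demand.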
    • onEmit

      public void onEmit(Consumer<T> onEmitCallback)
      Callback executed right after onNext is actually sent. The callback receives one argument:
      • i - the sent item.
      Parameters:
      onEmitCallback - to be executed
    • onCleanup

      public void onCleanup(Consumer<? super T> onCleanup)
      Callback executed to clean up the buffer, when the Publisher terminates without passing ownership of buffered items to anyone (fail, completeNow, or the Subscription is cancelled).

      Use case: buffered items need lifecycle handling, such as releasing resources or returning them to a pool.

      Calling onCleanup multiple times ensures that each of the provided Consumers gets a chance to look at the items in the buffer. Releasing the same resource to a pool more than once is usually undesirable, so in most cases pass exactly one callback to onCleanup. For the same reason, do not use onCleanup together with clearBuffer unless your resource lifecycle management is idempotent.

      Parameters:
      onCleanup - callback executed to clean up the buffer
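One way to make resource release idempotent, so that a second cleanup callback seeing the same item does no harm, is to guard the release with an atomic flag. The class `PooledChunkSketch` and the counter standing in for a pool are hypothetical; this is a sketch of the general technique, not of any Helidon type.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical pooled resource whose release is safe to call repeatedly. */
class PooledChunkSketch {
    private final AtomicBoolean released = new AtomicBoolean();
    private final AtomicInteger poolReturns; // stands in for a real pool

    PooledChunkSketch(AtomicInteger poolReturns) {
        this.poolReturns = poolReturns;
    }

    /**
     * Returns the chunk to the pool at most once.
     * Only the first caller wins the compare-and-set; later calls are no-ops.
     */
    boolean release() {
        if (released.compareAndSet(false, true)) {
            poolReturns.incrementAndGet();
            return true;
        }
        return false;
    }
}
```

With items like this, a cleanup callback can call `release()` without checking whether another callback has already handled the item.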
    • onAbort

      public void onAbort(Consumer<? super Throwable> onAbort)
      Callback executed when this Publisher fails or is cancelled in a way that the entity performing emit() may be unaware of.

      Use case: emit() is issued only when onRequest is received; such requests cease after a failed request or when downstream cancels. onAbort lets the entity issuing emit() know that no more onRequest calls are forthcoming (they may still happen, but items emitted after onAbort will likely be discarded, and items that are not emitted will not be missed).

      In essence, the pair of onRequest and onAbort forms an interface similar to the request and cancel of a Processor's Subscription. The only differences are the API and the promise: emit() is allowed to not heed backpressure (for example, when upstream is unable to heed backpressure without introducing a buffer of its own, as is the case with many transformations of the form Publisher<T>->Publisher<Publisher<T>>).

      In the same vein, there is no restriction on when the onAbort callback can be called: this Publisher is not required to establish whether the entity performing emit() is aware of the abort (say, a fail) or not. It is only required to ensure that the failures it generates (and not merely forwards to downstream) and the cancellations it receives are propagated to the callback.

      Parameters:
      onAbort - callback executed when this Publisher fails or is cancelled
    • emit

      public void emit(T item)
      Emit an item to the stream. If there is no immediate demand from downstream, the item is buffered and sent when demand is signaled. No-op after downstream enters a terminal state (cancelled subscription, or onError/onComplete received).
      Parameters:
      item - to be emitted
    • fail

      public void fail(Throwable throwable)
      Send an onError signal downstream, regardless of the buffer content. Nothing else can be sent downstream after calling fail. No-op after downstream enters a terminal state (cancelled subscription, or onError/onComplete received).

      If fail is invoked several times in quick succession or concurrently, there is no guarantee which of the throwables ends up being sent downstream.

      Parameters:
      throwable - Throwable to be sent downstream as onError signal.
    • complete

      public void complete()
      Send onComplete downstream after it consumes the entire buffer. Intervening fail invocations can end up sending onError instead of onComplete. No-op after downstream enters a terminal state (cancelled subscription, or onError/onComplete received).
    • completeNow

      public void completeNow()
      Send an onComplete signal downstream immediately, regardless of the buffer content. Nothing else can be sent downstream after calling completeNow(). No-op after downstream enters a terminal state (cancelled subscription, or onError/onComplete received).
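The difference between complete() and completeNow() can be seen in a small single-threaded model. The class `CompletionSketch` below is hypothetical and uses only JDK types. It assumes that completeNow() simply drops buffered items, and it ignores concurrency, fail, and the cleanup callbacks that the real publisher provides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of deferred vs. immediate completion; not the library's code. */
class CompletionSketch {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final List<String> signals = new ArrayList<>(); // what downstream observed
    private long demand;
    private boolean completeRequested;
    private boolean terminated;

    void emit(String item) {
        if (terminated) return; // no-op after a terminal state
        buffer.add(item);
        drain();
    }

    void request(long n) {
        if (terminated) return;
        long sum = demand + n;
        demand = sum < 0 ? Long.MAX_VALUE : sum;
        drain();
    }

    /** Deferred completion: onComplete follows the last buffered item. */
    void complete() {
        completeRequested = true;
        drain();
    }

    /** Immediate completion: buffered items are dropped (an assumption of this sketch). */
    void completeNow() {
        if (terminated) return;
        buffer.clear();
        terminated = true;
        signals.add("onComplete");
    }

    boolean isCompleted() {
        return terminated;
    }

    List<String> signals() {
        return signals;
    }

    private void drain() {
        if (terminated) return;
        while (demand > 0 && !buffer.isEmpty()) {
            demand--;
            signals.add("onNext:" + buffer.poll());
        }
        if (completeRequested && buffer.isEmpty()) {
            terminated = true;
            signals.add("onComplete");
        }
    }
}
```

In this model, calling complete() with two buffered items and no demand leaves isCompleted() false until downstream requests both items, while completeNow() records onComplete at once and the buffered items are never delivered.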
    • clearBuffer

      public void clearBuffer(Consumer<T> consumer)
      Clear the whole buffer, invoking the consumer for each item before discarding it. Use case: items in the buffer need to be discarded properly, for example by freeing resources or returning them to a pool.

      It is the caller's responsibility to ensure there are no concurrent invocations of clearBuffer, and that there will be no emit calls in the future, as the items processed by those invocations may not be consumed properly.

      It is recommended that onCleanup is set up instead of using clearBuffer. Do not use together with onCleanup.

      Parameters:
      consumer - to be invoked for each item
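Draining a queue while handing each item to a consumer is typically written as a poll loop. The helper `drain` and the class `BufferDrainSketch` are hypothetical; the sketch shows the general pattern and, like clearBuffer itself, it relies on the caller to prevent concurrent drains and later additions to the queue.

```java
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical helper showing a poll-based buffer drain; not the library's code. */
class BufferDrainSketch {

    /**
     * Removes every item from the queue, passing each one to the consumer
     * before it is discarded. Returns the number of items processed.
     */
    static <T> int drain(Queue<T> buffer, Consumer<? super T> consumer) {
        int count = 0;
        T item;
        while ((item = buffer.poll()) != null) {
            consumer.accept(item); // e.g. release a resource or return it to a pool
            count++;
        }
        return count;
    }
}
```

Items added to the queue by another thread after the loop sees it empty would not be handed to the consumer, which is why no emit calls should follow.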
    • isUnbounded

      public boolean isUnbounded()
      Check if downstream requested an unbounded number of items, i.e. Long.MAX_VALUE.
      Returns:
      true if so
    • hasRequests

      public boolean hasRequests()
      Check if demand is higher than 0. The returned value is informational only and can change asynchronously.
      Returns:
      true if demand is higher than 0
    • isCompleted

      public boolean isCompleted()
      Check if the publisher sent an onComplete signal downstream. Returns true right after calling completeNow() (with a caveat), but after calling complete() returns false until the whole buffer has been drained.

      The caveat is that completeNow() does not guarantee that the onComplete signal is sent before returning from completeNow() - it is only guaranteed to be sent as soon as it can be done.

      Returns:
      true if so
    • isCancelled

      public boolean isCancelled()
      Check if publisher is in terminal state CANCELLED.

      It is for information only. It is not guaranteed to tell what happened to downstream if a cancellation and a completion occurred concurrently.

      Returns:
      true if so
    • bufferSize

      public int bufferSize()
      Estimated size of the buffer. The returned value is informational only and can change asynchronously.
      Returns:
      estimated size of the buffer
    • cleanup

      protected void cleanup()
      Override if you prefer to do cleanup in a uniform way, instead of requiring everyone to register an onCleanup callback.

      Use case: a subclass that offers an implementation of BufferedEmittingPublisher<T> for a certain type of resource T.

    • create

      public static <T> BufferedEmittingPublisher<T> create()
      Type Parameters:
      T - type of emitted item
      Returns:
      new instance of BufferedEmittingPublisher
    • builder

      public static <T> BufferedEmittingPublisher.Builder<T> builder()
      Create new builder for BufferedEmittingPublisher.
      Type Parameters:
      T - type of the expected payload
      Returns:
      new builder