Class BufferedEmittingPublisher<T>
- java.lang.Object
-
- io.helidon.common.reactive.BufferedEmittingPublisher<T>
-
- Type Parameters:
T
- type of emitted item
- All Implemented Interfaces:
Flow.Publisher<T>
public class BufferedEmittingPublisher<T> extends Object implements Flow.Publisher<T>
Emitting publisher for manual publishing with built-in buffer for handling backpressure.This publisher allows only a single subscriber.
-
-
Constructor Summary
Constructors Modifier Constructor Description protected
BufferedEmittingPublisher()
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description int
bufferSize()
Estimated size of the buffer.protected void
cleanup()
Override, if you prefer to do cleanup in a uniform way, instead of requiring everyone to register a onCleanup.void
clearBuffer(Consumer<T> consumer)
Clear whole buffer, invoke consumer for each item before discarding it.void
complete()
Send onComplete to downstream after it consumes the entire buffer.void
completeNow()
SendonComplete
signal downstream immediately, regardless of the buffer content.static <T> BufferedEmittingPublisher<T>
create()
Create newBufferedEmittingPublisher
.void
emit(T item)
Emit item to the stream, if there is no immediate demand from downstream, buffer item for sending when demand is signaled.void
fail(Throwable throwable)
SendonError
signal downstream, regardless of the buffer content.boolean
hasRequests()
Check if demand is higher than 0.boolean
isCancelled()
Check if publisher is in terminal state CANCELLED.boolean
isCompleted()
Check if publisher sentonComplete
signal downstream.boolean
isUnbounded()
Check if downstream requested unbounded number of items eg.void
onAbort(Consumer<? super Throwable> onAbort)
Callback executed when this Publisher fails or is cancelled in a way that the entity performing emit() may be unaware of.void
onCleanup(Consumer<? super T> onCleanup)
Callback executed to clean up the buffer, when the Publisher terminates without passing ownership of buffered items to anyone (fail, completeNow, or the Subscription is cancelled).void
onEmit(Consumer<T> onEmitCallback)
Callback executed right afteronNext
is actually sent.void
onRequest(BiConsumer<Long,Long> requestCallback)
Callback executed when request signal from downstream arrive.void
subscribe(Flow.Subscriber<? super T> sub)
-
-
-
Method Detail
-
create
public static <T> BufferedEmittingPublisher<T> create()
Create newBufferedEmittingPublisher
.- Type Parameters:
T
- type of emitted item- Returns:
- new instance of BufferedEmittingPublisher
-
subscribe
public void subscribe(Flow.Subscriber<? super T> sub)
- Specified by:
subscribe
in interfaceFlow.Publisher<T>
-
onRequest
public void onRequest(BiConsumer<Long,Long> requestCallback)
Callback executed when request signal from downstream arrive.- param
n
the requested count. - param
result
the current total cumulative requested count, ranges between [0,Long.MAX_VALUE
] where the max indicates that this publisher is unbounded.
- Parameters:
requestCallback
- to be executed
- param
-
onEmit
public void onEmit(Consumer<T> onEmitCallback)
Callback executed right afteronNext
is actually sent.- param
i
sent item
- Parameters:
onEmitCallback
- to be executed
- param
-
onCleanup
public void onCleanup(Consumer<? super T> onCleanup)
Callback executed to clean up the buffer, when the Publisher terminates without passing ownership of buffered items to anyone (fail, completeNow, or the Subscription is cancelled).Use case: items buffered require handling their lifecycle, like releasing resources, or returning to a pool.
Calling onCleanup multiple times will ensure that each of the provided Consumers gets a chance to look at the items in the buffer. Usually you do not want to release the same resource to a pool more than once, so you should usually want to ensure you pass one and only one callback to onCleanup. For this reason, do not use together with clearBuffer, unless you know how to have idempotent resource lifecycle management.
- Parameters:
onCleanup
- callback executed to clean up the buffer
-
onAbort
public void onAbort(Consumer<? super Throwable> onAbort)
Callback executed when this Publisher fails or is cancelled in a way that the entity performing emit() may be unaware of.Use case: emit() is issued only if onRequest is received; these will cease upon a failed request or when downstream requests cancellation. onAbort is going to let the entity issuing emit() know that no more onRequest are forthcoming (albeit they may still happen, the items emitted after onAbort will likely be discarded, and not emitted items will not be missed).
In essence the pair of onRequest and onAbort make up the interface like that of a Processor's Subscription's request and cancel. The difference is only the API and the promise: we allow emit() to not heed backpressure (for example, when upstream is really unable to heed backpressure without introducing a buffer of its own, like is the case with many transformations of the form Publisher<T>->Publisher<Publisher<T>>).
In the same vein there really is no restriction as to when onAbort callback can be called - there is no requirement for this Publisher to establish exactly whether the entity performing emit() is aware of the abort (say, a fail), or not. It is only required to ensure that the failures it generates (and not merely forwards to downstream) and cancellations it received, get propagated to the callback.
- Parameters:
onAbort
- callback executed when this Publisher fails or is cancelled
-
emit
public void emit(T item)
Emit item to the stream, if there is no immediate demand from downstream, buffer item for sending when demand is signaled. No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete)- Parameters:
item
- to be emitted
-
fail
public void fail(Throwable throwable)
SendonError
signal downstream, regardless of the buffer content. Nothing else can be sent downstream after calling fail. No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete)If several fail are invoked in quick succession or concurrently, no guarantee which of them ends up sent to downstream.
- Parameters:
throwable
- Throwable to be sent downstream as onError signal.
-
complete
public void complete()
Send onComplete to downstream after it consumes the entire buffer. Intervening fail invocations can end up sending onError instead of onComplete. No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete)
-
completeNow
public void completeNow()
SendonComplete
signal downstream immediately, regardless of the buffer content. Nothing else can be sent downstream after callingcompleteNow()
. No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete)
-
clearBuffer
public void clearBuffer(Consumer<T> consumer)
Clear whole buffer, invoke consumer for each item before discarding it. Use case: items in the buffer require discarding properly, freeing up some resources, or returning them to a pool.It is the caller's responsibility to ensure there are no concurrent invocations of clearBuffer, and that there will be no emit calls in the future, as the items processed by those invocations may not be consumed properly.
It is recommended that onCleanup is set up instead of using clearBuffer. Do not use together with onCleanup.
- Parameters:
consumer
- to be invoked for each item
-
isUnbounded
public boolean isUnbounded()
Check if downstream requested unbounded number of items eg.Long.MAX_VALUE
.- Returns:
- true if so
-
hasRequests
public boolean hasRequests()
Check if demand is higher than 0. Returned value should be used as informative and can change asynchronously.- Returns:
- true if demand is higher than 0
-
isCompleted
public boolean isCompleted()
Check if publisher sentonComplete
signal downstream. Returnstrue
right after callingcompleteNow()
(with a caveat) but after callingcomplete()
returnsfalse
until whole buffer has been drained.The caveat is that completeNow() does not guarantee that the onComplete signal is sent before returning from completeNow() - it is only guaranteed to be sent as soon as it can be done.
- Returns:
- true if so
-
isCancelled
public boolean isCancelled()
Check if publisher is in terminal state CANCELLED.It is for information only. It is not guaranteed to tell what happened to downstream, if there were a concurrent cancellation and a completion.
- Returns:
- true if so
-
bufferSize
public int bufferSize()
Estimated size of the buffer. Returned value should be used as informative and can change asynchronously.- Returns:
- estimated size of the buffer
-
cleanup
protected void cleanup()
Override, if you prefer to do cleanup in a uniform way, instead of requiring everyone to register a onCleanup.Use case: a subclass that offers an implementation of BufferedEmittingPublisher<T> for a certain type of resource T.
-
-