Package io.helidon.common.reactive
Class OriginThreadPublisher<T,U>
- java.lang.Object
-
- io.helidon.common.reactive.OriginThreadPublisher<T,U>
-
- Type Parameters:
T
- type of published items
U
- type of submitted items
- All Implemented Interfaces:
Flow.Publisher<T>
public abstract class OriginThreadPublisher<T,U> extends Object implements Flow.Publisher<T>
An OriginThreadPublisher always runs Flow.Subscriber.onNext(Object) on the very same thread as submit(Object). In other words, whenever the source of chunks sends data, that same thread is used to deliver the data to the subscriber.
Standard publisher implementations (such as SubmissionPublisher or Reactor Flux) would use the thread on which Flow.Subscription.request(long) was called to deliver the chunk when the data are already available; this implementation, however, strictly uses the originating thread.
To achieve this behavior, the publisher provides hooks on the subscription methods: hookOnCancel() and hookOnRequested(long, long).
This publisher allows only a single subscriber.
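The origin-thread guarantee can be illustrated with a minimal sketch built only on java.util.concurrent.Flow. The classes SameThreadSketch and SameThreadDemo are hypothetical stand-ins, not the Helidon implementation; demand tracking is deliberately left out so that only the threading behavior is visible.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in, NOT the Helidon class: it only mimics the documented
// guarantee that onNext runs on the thread that calls submit.
class SameThreadSketch<T> implements Flow.Publisher<T> {
    private volatile Flow.Subscriber<? super T> subscriber;

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        subscriber = s;
        s.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* demand tracking omitted in this sketch */ }
            @Override public void cancel() { subscriber = null; }
        });
    }

    // Delivers synchronously: no executor and no hand-off to another thread.
    void submit(T item) {
        Flow.Subscriber<? super T> s = subscriber;
        if (s != null) {
            s.onNext(item);
        }
    }
}

class SameThreadDemo {
    // Returns true if onNext was observed on the thread that called submit.
    static boolean deliveredOnSubmittingThread() {
        SameThreadSketch<String> publisher = new SameThreadSketch<>();
        AtomicReference<Thread> deliveryThread = new AtomicReference<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { deliveryThread.set(Thread.currentThread()); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        // The producer thread plays the role of the "source of chunks".
        Thread producer = new Thread(() -> publisher.submit("chunk"), "producer");
        producer.start();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return deliveryThread.get() == producer;
    }

    public static void main(String[] args) {
        System.out.println(deliveredOnSubmittingThread());
    }
}
```

Because submit calls onNext directly, a slow subscriber blocks the producing thread; that is the trade-off of strict origin-thread delivery.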
-
-
Constructor Summary
Constructors
Modifier Constructor Description
protected OriginThreadPublisher()
Create a same-thread publisher.
protected OriginThreadPublisher(UnboundedSemaphore semaphore)
Create a same-thread publisher.
-
Method Summary
Modifier and Type Method Description
void complete()
Synchronously trigger Flow.Subscriber.onComplete().
void drain()
If not subscribed to, consume all the items from this publisher.
protected void drain(T item)
Process a drained item.
void error(Throwable throwable)
Synchronously trigger Flow.Subscriber.onError(Throwable).
protected void hookOnCancel()
Hook invoked after calls to Flow.Subscription.cancel().
protected void hookOnRequested(long n, long result)
Hook invoked after calls to Flow.Subscription.request(long).
boolean isCompleted()
Indicates that the single associated subscriber has been completed.
boolean requiresMoreItems()
Indicate that more items should be published in order to meet the current demand of the subscriber.
void submit(U data)
Submit the data to the subscriber.
void subscribe(Flow.Subscriber<? super T> originalSubscriber)
Adds the given Subscriber if possible.
long tryAcquire()
In a non-blocking manner, try to acquire an allowance to publish the next item.
protected T wrap(U data)
Wrap the submitted data into an item that can be published.
-
-
-
Constructor Detail
-
OriginThreadPublisher
protected OriginThreadPublisher(UnboundedSemaphore semaphore)
Create a same-thread publisher.
- Parameters:
semaphore
- the semaphore that indicates the amount of requested data. The owner of this publisher is responsible for sending data only as the semaphore allows (that is, for properly acquiring a permit before sending data, and for not sending when the number of permits is zero).
-
OriginThreadPublisher
protected OriginThreadPublisher()
Create a same-thread publisher.
-
-
Method Detail
-
subscribe
public void subscribe(Flow.Subscriber<? super T> originalSubscriber)
Description copied from interface: Flow.Publisher
Adds the given Subscriber if possible. If already subscribed, or the attempt to subscribe fails due to policy violations or errors, the Subscriber's onError method is invoked with an IllegalStateException. Otherwise, the Subscriber's onSubscribe method is invoked with a new Flow.Subscription. Subscribers may enable receiving items by invoking the request method of this Subscription, and may unsubscribe by invoking its cancel method.
- Specified by:
subscribe in interface Flow.Publisher<T>
- Parameters:
originalSubscriber
- the subscriber
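The single-subscriber rule described above can be sketched as follows. SingleSubscriberSketch and RecordingSubscriber are hypothetical names used only for illustration, and the compare-and-set approach is an assumption about one reasonable way to enforce the rule, not a description of the Helidon source.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in, not the Helidon source: accepts the first subscriber
// and rejects any later one with an IllegalStateException.
class SingleSubscriberSketch<T> implements Flow.Publisher<T> {
    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    private final AtomicReference<Flow.Subscriber<? super T>> current = new AtomicReference<>();

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        if (current.compareAndSet(null, s)) {
            s.onSubscribe(NOOP);
        } else {
            // Reactive Streams expects onSubscribe to be signalled before onError.
            s.onSubscribe(NOOP);
            s.onError(new IllegalStateException("This publisher allows only a single subscriber"));
        }
    }
}

// Records the signals it receives so they can be inspected afterwards.
class RecordingSubscriber implements Flow.Subscriber<Object> {
    boolean subscribed;
    Throwable error;

    @Override public void onSubscribe(Flow.Subscription s) { subscribed = true; }
    @Override public void onNext(Object item) { }
    @Override public void onError(Throwable t) { error = t; }
    @Override public void onComplete() { }
}
```

Subscribing two RecordingSubscriber instances to the same sketch leaves the first without an error and hands the second an IllegalStateException through onError.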
-
wrap
protected T wrap(U data)
Wrap the submitted data into an item that can be published. This implementation casts U to T.
- Parameters:
data
- submitted data
- Returns:
- item to publish
- Throws:
ClassCastException
- if U cannot be cast to T
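A subclass whose submitted type differs from its published type would override wrap. The sketch below is a hedged illustration of that contract: WrappingSketch, Utf8BytesSketch, IdentitySketch, and the publishable helper are hypothetical and stand in for the real class.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the wrap contract, not the Helidon class.
abstract class WrappingSketch<T, U> {
    // Default mirrors the documented behavior: an unchecked cast from U to T.
    @SuppressWarnings("unchecked")
    protected T wrap(U data) {
        return (T) data;
    }

    // Illustrative helper that exposes the wrapped item.
    T publishable(U data) {
        return wrap(data);
    }
}

// Submits String values but publishes their UTF-8 bytes, so the cast is replaced.
class Utf8BytesSketch extends WrappingSketch<byte[], String> {
    @Override
    protected byte[] wrap(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Relies on the default cast, which is safe because String is a CharSequence.
class IdentitySketch extends WrappingSketch<CharSequence, String> {
}
```

Note that in this sketch the cast is unchecked: because of type erasure, an incompatible pair of types fails with ClassCastException where the item is first used as T, not inside wrap itself.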
-
submit
public void submit(U data)
Submit the data to the subscriber. The same thread is used to call Flow.Subscriber.onNext(Object); that is, the data are passed to the subscriber synchronously.
To keep this publisher consistent, this method must be called only once for each permit, and each permit must be acquired by tryAcquire().
- Parameters:
data
- the chunk of data to send to the subscriber
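The one-submit-per-permit rule can be sketched as a producer loop. PermitSketch and PermitDemo are hypothetical; the permit counter is an assumption that follows the documented tryAcquire() return values (0 for no permit, Long.MAX_VALUE for unbounded demand), not the Helidon implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the permit protocol, not the Helidon class.
class PermitSketch {
    private final AtomicLong permits;
    final List<String> delivered = new ArrayList<>();

    // initialPermits plays the role of the demand already requested by the subscriber.
    PermitSketch(long initialPermits) {
        permits = new AtomicLong(initialPermits);
    }

    // Returns the permit count seen before acquiring; 0 means no permit was obtained.
    // Long.MAX_VALUE is treated as unbounded demand and is never decremented.
    long tryAcquire() {
        return permits.getAndUpdate(current ->
                current == 0 || current == Long.MAX_VALUE ? current : current - 1);
    }

    // Must be called only once per acquired permit.
    void submit(String chunk) {
        delivered.add(chunk);
    }
}

class PermitDemo {
    // Sends chunks only while permits are available; returns the number of chunks sent.
    static int sendWhilePermitted(PermitSketch publisher, List<String> chunks) {
        int sent = 0;
        for (String chunk : chunks) {
            if (publisher.tryAcquire() == 0) {
                break; // no demand: hold the remaining chunks back
            }
            publisher.submit(chunk);
            sent++;
        }
        return sent;
    }
}
```

With two permits and three chunks, the loop submits the first two chunks and holds the third back until more demand arrives.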
-
drain
public void drain()
If not subscribed to, consume all the items from this publisher.
-
drain
protected void drain(T item)
Process a drained item. The default implementation of this method is a no-op; it is meant to be overridden by subclasses to customize the draining process.
- Parameters:
item
- drained item
-
error
public void error(Throwable throwable)
Synchronously trigger Flow.Subscriber.onError(Throwable).
- Parameters:
throwable
- the exception to send
-
complete
public void complete()
Synchronously trigger Flow.Subscriber.onComplete().
-
tryAcquire
public long tryAcquire()
In a non-blocking manner, try to acquire an allowance to publish the next item.
- Returns:
- the original number of requests on the subscription of the single associated subscriber. If 0 is returned, the requester did not obtain a permit to publish the next item. If Long.MAX_VALUE is returned, the requester is informed that an unlimited number of items can be published.
-
isCompleted
public boolean isCompleted()
Indicates that the single associated subscriber has been completed.
- Returns:
- whether this publisher has successfully finished
-
requiresMoreItems
public boolean requiresMoreItems()
Indicate that more items should be published in order to meet the current demand of the subscriber.
- Returns:
- whether more items are needed to satisfy the subscriber's current demand
-
hookOnRequested
protected void hookOnRequested(long n, long result)
Hook invoked after calls to Flow.Subscription.request(long).
- Parameters:
n
- the requested count
result
- the current total cumulative requested count; ranges over [0, Long.MAX_VALUE], where the maximum indicates that this publisher is unbounded
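The relationship between n and result can be sketched with a saturating counter. RequestHookSketch is hypothetical, and it assumes that result is simply the running sum of requested counts capped at Long.MAX_VALUE; whether the real class also subtracts delivered items before calling the hook is not stated in this document.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in, not the Helidon source.
class RequestHookSketch {
    private final AtomicLong requested = new AtomicLong();
    long lastN;
    long lastResult;

    // Stands in for Flow.Subscription.request(n) with n > 0.
    void request(long n) {
        long result = requested.updateAndGet(current -> saturatingAdd(current, n));
        hookOnRequested(n, result);
    }

    // Adds two non-negative longs, capping at Long.MAX_VALUE instead of overflowing.
    static long saturatingAdd(long a, long b) {
        long sum = a + b;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    // Subclasses would override this; the sketch just records the arguments.
    protected void hookOnRequested(long n, long result) {
        lastN = n;
        lastResult = result;
    }
}
```

Capping the sum keeps result inside [0, Long.MAX_VALUE], so a request of Long.MAX_VALUE marks the demand as unbounded instead of wrapping to a negative value.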
-
hookOnCancel
protected void hookOnCancel()
Hook invoked after calls to Flow.Subscription.cancel().
-
-