Combine Publishers, Subscriptions, and Subscribers — Under the Hood
With implementation examples
Maybe you have already studied Combine, the native framework Apple introduced at WWDC19, and have memorized its huge list of publishers, subscribers, and operators.
Perhaps you only know that the first is responsible for generating new asynchronous values that should update the second. Okay, that's what it consists of! But many of us haven't dug into how these protocols communicate with each other, and some might not know they were protocols at all (aha!).
So in this article, we will talk about how to create your own publishers, subscriptions, and subscribers by implementing those protocols, so that when a publisher emits a new value, its subscriptions tell the subscribers about it and they can handle it properly (and no, `sink` and `assign` are not the only available subscribers).
Understanding each of them
To start off, it's essential to understand the true relationship between those three entities, `Publisher`, `Subscription`, and `Subscriber`, and mainly what their interface contracts consist of.
Apple doesn't recommend implementing your own `Publisher` type, because the native, predefined ones already do the right work to keep publishers and their subscribers in sync. But this article aims at understanding how that synchrony actually works, so let's move on!
1. Publisher
The publisher is a type that conforms to the `Publisher` protocol and, as the name says, it's meant to publish values. It has two associated types: an `Output`, which is the type of value it emits to its subscribers, and a `Failure`, which must conform to Swift's `Error` and is the type of error that may be delivered to the subscriber if something goes wrong during the subscription.
The `Publisher` protocol consists of these two associated types and a single method, `receive(subscriber:)`, which takes a subscriber as a parameter. When this method is called, the custom `Publisher` must create a `Subscription` object and send it to the `Subscriber`.
Briefly, the `Publisher`'s lifecycle starts when this `receive` method is called and the publisher sends a subscription to the subscriber. The publisher then keeps sending values asynchronously until it stops in one of two mutually exclusive ways: by finishing, or by failing with an error of the `Failure` type.
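To make the contract concrete, here is a minimal sketch of a type that satisfies `Publisher`. The name `EmptyIntPublisher` is made up for illustration; it hands the subscriber Combine's built-in no-op subscription (`Subscriptions.empty`) and finishes right away without emitting anything.

```swift
import Combine

// Hypothetical publisher, used only to show what the protocol requires.
struct EmptyIntPublisher: Publisher {
    typealias Output = Int      // type of the values sent to subscribers
    typealias Failure = Never   // this publisher can never fail

    // The single required method: called once for each attaching subscriber.
    func receive<S: Subscriber>(subscriber: S)
    where S.Input == Output, S.Failure == Failure {
        // Step 1: hand the subscriber a subscription (a no-op one here).
        subscriber.receive(subscription: Subscriptions.empty)
        // Step 2: end the stream; no values are ever sent.
        subscriber.receive(completion: .finished)
    }
}

var didFinish = false
let cancellable = EmptyIntPublisher().sink(
    receiveCompletion: { _ in didFinish = true },
    receiveValue: { value in print("unexpected value: \(value)") }
)
print(didFinish) // true
```

A real publisher would return its own subscription type instead of `Subscriptions.empty`, which is what the rest of the article builds.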
2. Subscription
The subscription object is not as well known among Combine developers as it should be. The important thing about this protocol is that the object implementing it is responsible for linking a subscriber to the publisher. As long as it's alive and not cancelled, the subscriber can keep receiving values. Besides cancellation, it requires just one method, `request(_:)`.
The `request` method is called by the subscriber once it receives the subscription from the publisher, and it establishes how many values the subscriber is requesting from the publisher, expressed as a `Subscribers.Demand` value (a struct, not an enum, even though it is used like one).
It may be `none`, meaning no values are requested; `max(value)`, meaning the subscriber requests at most `value` values; or `unlimited`, meaning it keeps receiving values for as long as the subscription is alive (that is, until it is cancelled or the publisher sends a completion).
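The three kinds of demand can be inspected directly. This small sketch only uses `Subscribers.Demand` itself; the constant names are arbitrary.

```swift
import Combine

let none = Subscribers.Demand.none
let three = Subscribers.Demand.max(3)
let unlimited = Subscribers.Demand.unlimited

// `.none` is just another spelling of `.max(0)`.
print(none == .max(0))                                   // true

// The `max` property exposes the finite limit, or nil when unlimited.
print(three.max == 3)                                    // true
print(unlimited.max == nil)                              // true

// Demands are additive; adding anything to unlimited stays unlimited.
print(three + Subscribers.Demand.max(2) == .max(5))      // true
print(unlimited + three == .unlimited)                   // true
```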
It's important to remember that what really links publisher and subscriber is the `Subscription` object, which holds a reference to the subscriber in order to keep it up to date, and a reference to some object related to the publisher so it can be notified when a new output value is produced.
Although `Subscription` is not always mentioned when talking about Combine, I would say it is the most important entity in this corner of reactive programming.
As the `Subscription` protocol inherits from `Cancellable`, it must also provide a `cancel` method, which is responsible for cutting the link to the subscriber. Usually, it just sets the subscriber property to `nil`.
Another important point is that how the values are obtained from the publisher depends entirely on the custom subscription, but we will go deeper into this when implementing one later.
3. Subscriber
At last, we have the `Subscriber`, which is the object kept up to date with the publisher's lifecycle and which requests values through the `Subscription`. It's the subscriber that actually handles the events from the publisher, and it has three methods, all named `receive` and told apart by their parameter.
The first, `receive(subscription:)`, is called by the `Publisher` itself and takes a `Subscription` as a parameter. The idea behind the curtain is for the `Subscriber` to call the `request` method on the `Subscription` to tell it how many values it's prepared to receive from the `Publisher`. Inside the implementation, the subscriber may also do any extra work it needs.
The second, `receive(_:)`, is meant to receive the values that come from the `Publisher` and handle them. It returns a `Demand` to the subscription that called it, which adjusts the number of values the subscriber wants. It's important to mention that the returned value doesn't replace the demand kept within the `Subscription`; it is added to the existing one.
The last, `receive(completion:)`, is meant to receive a completion event from the publisher: either a normal finish or an incoming failure.
Note that for this to work, the subscriber's `Input` type must be the same as the publisher's `Output` type, since what is published is exactly what is received. Their `Failure` types must match as well.
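The additive nature of the returned demand is easiest to see with a subscriber that asks for one value at a time. The sketch below uses a made-up `OneAtATimeSubscriber` attached to the built-in sequence publisher; it is an illustration of the three methods, not the subscriber built later in this article.

```swift
import Combine

// Hypothetical subscriber: requests a single value, then one more after
// each delivery, until it has collected `limit` values.
final class OneAtATimeSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private let limit: Int

    init(limit: Int) { self.limit = limit }

    // 1. Called by the publisher with the subscription; state the initial demand.
    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }

    // 2. Called for every value; the returned demand is added to whatever
    //    demand is still outstanding, it does not replace it.
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return received.count < limit ? .max(1) : .none
    }

    // 3. Called at most once, when the stream finishes or fails.
    func receive(completion: Subscribers.Completion<Never>) {
        print("completion: \(completion)")
    }
}

let subscriber = OneAtATimeSubscriber(limit: 3)
[10, 20, 30, 40, 50].publisher.subscribe(subscriber)
print(subscriber.received) // [10, 20, 30]
```

Because the subscriber stops returning extra demand after the third value, the remaining elements are never delivered.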
Subscription lifecycle
Having met these protocols, we can now safely walk through how the relationship between publisher and subscriber is established:
- A new subscriber asks the publisher for a subscription by calling the `receive` method on the publisher, passing itself as a parameter.
- The publisher creates a custom subscription, an object that is responsible for keeping the subscriber up to date with the publisher, and sends it to the subscriber through the `receive(Subscription)` method.
- Within the `receive(Subscription)` method, the subscriber calls the `request` method on the `Subscription` object it just received, establishing the demand it requires from the publisher.
- On receiving the `request` call, the `Subscription` object has the `Demand` the subscriber requires and knows how many values it must deliver from the publisher.
- As the `Subscription` has some mechanism for keeping track of the publisher's emitted values, it just needs to send them to the subscriber via the `receive(Input)` method.
- When some event requires the subscription to complete, the `Subscription` calls the `receive(Completion)` method on the `Subscriber` and the process finishes.
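The same sequence can be observed on a built-in publisher. The following sketch records each step with the `handleEvents` operator on `Just`; the `events` array and its labels are only there for illustration.

```swift
import Combine

var events: [String] = []

let cancellable = Just(42)
    .handleEvents(
        receiveSubscription: { _ in events.append("subscription") },
        receiveOutput: { value in events.append("value \(value)") },
        receiveCompletion: { _ in events.append("completion") },
        receiveCancel: { events.append("cancel") },
        receiveRequest: { _ in events.append("request") }
    )
    .sink { _ in }   // sink requests unlimited demand as soon as it subscribes

print(events) // ["subscription", "request", "value 42", "completion"]
```

The subscription arrives first, the subscriber answers with a request, and only then do the value and the completion flow downstream.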
Creating our own Publisher, Subscription and Subscriber
In order to understand how this powerful framework works under the hood, we are going to implement a publisher for a class that just holds an integer value; when the value changes, we tell its subscribers about it so they can handle it. Enough talking, let's get started.
Our custom `HoldValue` class is only meant to hold a single integer value that may change. Now we want a publisher for it, so that we can keep track of an instance's value changes and handle them via a subscriber. Simple, no? It's quite similar to the publisher generated by the `Published` property wrapper when declaring an `ObservableObject` class.
Implementing our publisher
We want a publisher that delivers values whenever our `HoldValue` instance changes its internal property, so let's create a new class, `HoldValuePublisher`, that implements Combine's `Publisher` protocol.
Our new `Publisher` class defines the two associated types we need: `Int` as `Output`, which is what our publisher emits to the subscriber, and `Never` as `Failure`, since our publisher will never end with an error. It also holds a `HoldValue` instance as a property. We need this instance in order to keep track of its events and send them to our subscriber.
As we need some mechanism to listen to our `HoldValue` object, and we want our publisher to accept multiple subscribers, we are going to give `HoldValue` a collection of handler closures that update multiple listeners.
Now we have an array of closures that deal with the value's change; each one will later correspond to a subscription. Note that each time our value changes, a property observer iterates through all handlers and executes them with the new value. That will be very important soon.
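A minimal sketch of such a class could look like this. The property name `handlers` is an assumption, since only the behavior (an array of closures driven by a property observer) is described above.

```swift
// Sketch of the observed class; `handlers` is an assumed name.
final class HoldValue {
    // One closure per listener; each subscription registers one later.
    var handlers: [(Int) -> Void] = []

    var value: Int = 0 {
        didSet {
            // Notify every registered listener about the new value.
            handlers.forEach { $0(value) }
        }
    }
}

let holder = HoldValue()
var seen: [Int] = []
holder.handlers.append { seen.append($0) }
holder.handlers.append { print("second listener got \($0)") }

holder.value = 7
holder.value = 8
print(seen) // [7, 8]
```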
Creating our subscription
As we discussed before, the `Subscription` is the most important entity in Combine, since it's what keeps the link between our publisher and a subscriber.
The subscriber keeps being updated as long as the `Subscription` is there as a mediator. We are going to create a custom `Subscription` type, which will implement the logic that fulfills the subscriber's demand. For that, it must have a way of listening to the publisher and of sending each new output value to the subscriber it's responsible for, keeping the two in sync.
Our `Subscription` type for `HoldValuePublisher` receives the subscriber it will send values to, and an instance of our `HoldValue` class that it must listen to.
As it's a `Cancellable` type, it implements the `cancel` method, which cuts the link between the subscription and the subscriber (and thus with the publisher as well). It just sets the subscriber to `nil`.
As we want to keep track of the `HoldValue` value, what we must do for now is append a new closure to it that handles each change of value, as we saw before.
So, our logic works in the following way: we store a reference to our subscriber within our subscription, and a reference to `HoldValue` as well. Then we append a new closure to our `HoldValue` instance that sends each new value to the subscriber via the `receive(Input)` method. Easy, right?
Now the subscription acts as a mediator between our publisher and subscriber. But it doesn't stop here. We still need to show how the publisher establishes the subscription when a new subscriber attaches to it.
Remember that `receive(Subscriber)` method in the `Publisher` protocol? Well, this is the first step in our Combine pipeline, and it is actually responsible for instantiating the `Subscription` and sending it to the `Subscriber`, thereby creating the link between the two entities.
When the `Publisher` receives a new subscriber, it creates a new subscription and sends it to the subscriber to handle. The `Subscription` receives the `Subscriber` itself and a reference to the object whose values the `Publisher` publishes (sorry for the redundancy!). Now it's time for the subscriber to request a demand from the subscription.
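Putting the pieces so far together, a hedged sketch of the publisher and a first, demand-agnostic subscription might look like this. The type name `HoldValueSubscription`, the `handlers` property, and the weak capture of `self` are assumptions made for this illustration; the subscriber used to try it out is the built-in `sink`.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

// Forwards every change of `holdValue` to one subscriber.
// Demand is deliberately ignored at this stage.
final class HoldValueSubscription<S: Subscriber>: Subscription
where S.Input == Int, S.Failure == Never {
    private var subscriber: S?

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        // Weak capture (an assumption): the subscriber owns the subscription.
        holdValue.handlers.append { [weak self] newValue in
            _ = self?.subscriber?.receive(newValue)
        }
    }

    func request(_ demand: Subscribers.Demand) {
        // Demand handling comes later.
    }

    func cancel() {
        subscriber = nil   // cut the link; later changes are dropped
    }
}

final class HoldValuePublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never

    private let holdValue: HoldValue

    init(holdValue: HoldValue) { self.holdValue = holdValue }

    func receive<S: Subscriber>(subscriber: S)
    where S.Input == Int, S.Failure == Never {
        let subscription = HoldValueSubscription(subscriber: subscriber,
                                                 holdValue: holdValue)
        subscriber.receive(subscription: subscription)
    }
}

let holder = HoldValue()
var received: [Int] = []
let cancellable = HoldValuePublisher(holdValue: holder).sink { received.append($0) }

holder.value = 1
holder.value = 2
cancellable.cancel()
holder.value = 3     // ignored: the subscription was cancelled
print(received)      // [1, 2]
```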
Dealing with Demand
As we saw before, the `Subscription` has a method called `request` that receives a `Subscribers.Demand`, and we haven't implemented it yet. This method is responsible for receiving the `Demand` value, which carries the number of values our subscriber actually wants to receive, so that our subscription can deliver exactly what the subscriber expects.
Before implementing that, there is just one thing you must know regarding demand and our publisher's lifecycle. The demand belongs to the subscriber while the lifecycle belongs to the publisher, which means that if our subscriber stops receiving new inputs because its demand ran out, it will not necessarily receive a completion event from the subscription. We will come back to that later; for now, just follow along.
Add two new properties to our subscription class: a `counter`, which keeps track of how many values our subscriber has received so far, and a `maximum`, which corresponds to our demand.
In our `request` method, just assign the `max` property of the incoming demand to the `maximum` property of the `HoldValue` subscription. For an `unlimited` demand, `max` is `nil`.
Implementing demand logic
Now that we have saved the maximum we want to send to the subscriber, we need to implement some logic inside our subscription in order to fulfill this demand. Inside the initializer, append a new handler to the `HoldValue` instance.
Now we implement the magic: inside our new `holdValue` closure, check whether the `maximum` value exists (is not `nil`), and if it does, check whether our `counter` is less than `maximum`. If it is, our subscriber is still able to receive new inputs, so we just call the `receive(Input)` method with the new integer from `HoldValue` and, naturally, increment the `counter`. If the `counter` has reached our `maximum` demand, we send a `finished` completion event to our subscriber and cancel the subscription.
If we don't have a `maximum` at all, due to an `unlimited` demand, we just keep our subscriber receiving values indefinitely. Note that we are triggering a completion event simply because this subscriber will receive no more values; other publishers, like the one created with the `Published` property wrapper, don't do that at all.
Brilliant, now our subscription honors the demand and only sends values to its subscriber while they are still wanted. So far we have implemented a publisher that creates a subscription and sends it to the subscriber, and the subscriber in turn requests new values according to its demand.
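The demand logic described above can be sketched as follows. Names such as `deliver` and `handlers` are assumptions, and two simplifications are worth flagging: the demand returned by `receive(_:)` is ignored, and a missing `request` call is treated like unlimited demand. The subscription is exercised directly with Combine's closure-based `AnySubscriber`, so no publisher is needed here.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

final class HoldValueSubscription<S: Subscriber>: Subscription
where S.Input == Int, S.Failure == Never {
    private var subscriber: S?
    private var counter = 0       // values delivered so far
    private var maximum: Int?     // nil means unlimited demand

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        holdValue.handlers.append { [weak self] newValue in
            self?.deliver(newValue)
        }
    }

    func request(_ demand: Subscribers.Demand) {
        maximum = demand.max      // `max` is nil for `.unlimited`
    }

    func cancel() {
        subscriber = nil
    }

    private func deliver(_ newValue: Int) {
        guard let subscriber = subscriber else { return }   // cancelled
        guard let maximum = maximum else {
            _ = subscriber.receive(newValue)                // unlimited demand
            return
        }
        if counter < maximum {
            _ = subscriber.receive(newValue)
            counter += 1
        } else {
            // Demand fulfilled: complete and cut the link.
            subscriber.receive(completion: .finished)
            cancel()
        }
    }
}

let holder = HoldValue()
var received: [Int] = []
var finished = false

let subscriber = AnySubscriber<Int, Never>(
    receiveSubscription: { $0.request(.max(2)) },
    receiveValue: { received.append($0); return .none },
    receiveCompletion: { _ in finished = true }
)
let subscription = HoldValueSubscription(subscriber: subscriber, holdValue: holder)
subscriber.receive(subscription: subscription)

for number in 0..<5 { holder.value = number }
print(received, finished) // [0, 1] true
```

Note that the completion is only sent on the first change after the demand has been met, which is how the text above describes it.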
Implementing our subscriber
Last but not least, we'll implement our subscriber class. As we discussed before, the subscriber is only responsible for handling new input values, handling the completion event, and requesting how many values it wants to receive.
Let's create a new custom class, `HoldValueSubscriber`.
What have we done?
We created a new `HoldValueSubscriber` type whose associated types match the publisher's: `Int` as the input and `Never` as the failure, since it doesn't expect any error.
Let's take a look at the interface methods:
- `receive(Subscription)`: It's called from the publisher's side, as we saw before, and sends a new request to the `Subscription` by passing a demand value. As it only needs three values, we set a maximum of three in our demand. At the end, in order to keep our subscription alive, it's saved in the `cancellables` set.
- `receive(Input)`: This method is called from the subscription's side and delivers a new input value to the subscriber. We handle that by just printing the value. The returned demand is a value that increments the demand in the subscription class. As we don't want it to increase, we return `none`, which is the same as `max(0)`.
- `receive(Completion)`: This method is also called from the subscription's side and delivers a completion object that notifies our subscriber that the subscription is over, with either a `finished` completion or a `failure` one. It just prints the completion event.
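Here is one way such a subscriber could be written. The `received` array is an addition so the result can be inspected, and wrapping the subscription in an `AnyCancellable` closure is just one assumed way of storing it in a `Set<AnyCancellable>`. To keep the sketch self-contained it is attached to a built-in range publisher rather than to our own.

```swift
import Combine

final class HoldValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var cancellables = Set<AnyCancellable>()
    private(set) var received: [Int] = []   // added for inspection only

    func receive(subscription: Subscription) {
        // We only want three values.
        subscription.request(.max(3))
        // Keep the subscription alive for as long as the subscriber lives.
        cancellables.insert(AnyCancellable { subscription.cancel() })
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("value: \(input)")
        received.append(input)
        return .none   // same as .max(0): do not increase the demand
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("completion: \(completion)")
    }
}

let subscriber = HoldValueSubscriber()
(0..<10).publisher.subscribe(subscriber)
print(subscriber.received) // [0, 1, 2]
```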
Make the publisher accessible by its origin class
We created a publisher that serves the `HoldValue` class. This is the class that we actually want to listen to.
In order to access it from the `HoldValue` class itself, as most Combine publishers do, add a Swift extension to `HoldValue` that exposes the publisher as a property.
Now, if you want to make a subscription, just access the `publisher` from the `HoldValue` instance.
Testing our subscription
Now that we have created our three custom Combine types, let's test this subscription.
What are we doing? We create a new `HoldValue` instance and a new `HoldValue` subscriber, and make the subscriber subscribe to our instance's publisher. This establishes the pipeline we saw before: the publisher internally creates a new subscription and sends it to the subscriber, the subscriber requests a demand from the subscription, and now it's ready to receive new inputs (the outputs from the publisher).
We now iterate through the integers from 0 to 9 and set our `holdValue` value to each number. Since we print each of the input values in our subscriber, the result shows up in the console.
It receives three values, as demanded, and after that, the subscription sends a completion event to the subscriber, which is also printed.
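For reference, the whole pipeline can be assembled into one self-contained sketch. It follows the behavior described in this article, but names such as `handlers` and `log`, the weak capture in the subscription, and the way the subscription is stored are assumptions.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

final class HoldValueSubscription<S: Subscriber>: Subscription
where S.Input == Int, S.Failure == Never {
    private var subscriber: S?
    private var counter = 0
    private var maximum: Int?   // nil means unlimited demand

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        holdValue.handlers.append { [weak self] newValue in
            guard let self = self, let subscriber = self.subscriber else { return }
            if let maximum = self.maximum, self.counter >= maximum {
                // Demand fulfilled: complete and cut the link.
                subscriber.receive(completion: .finished)
                self.cancel()
            } else {
                _ = subscriber.receive(newValue)
                self.counter += 1
            }
        }
    }

    func request(_ demand: Subscribers.Demand) { maximum = demand.max }
    func cancel() { subscriber = nil }
}

final class HoldValuePublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    private let holdValue: HoldValue

    init(holdValue: HoldValue) { self.holdValue = holdValue }

    func receive<S: Subscriber>(subscriber: S)
    where S.Input == Int, S.Failure == Never {
        let subscription = HoldValueSubscription(subscriber: subscriber,
                                                 holdValue: holdValue)
        subscriber.receive(subscription: subscription)
    }
}

extension HoldValue {
    // Exposes the publisher from the observed class itself.
    var publisher: HoldValuePublisher { HoldValuePublisher(holdValue: self) }
}

final class HoldValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private var cancellables = Set<AnyCancellable>()
    private(set) var log: [String] = []   // mirrors what would be printed

    func receive(subscription: Subscription) {
        subscription.request(.max(3))
        cancellables.insert(AnyCancellable { subscription.cancel() })
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        log.append("value \(input)")
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        log.append("completion")
    }
}

let holdValue = HoldValue()
let subscriber = HoldValueSubscriber()
holdValue.publisher.subscribe(subscriber)

for number in 0..<10 { holdValue.value = number }
subscriber.log.forEach { print($0) }
// value 0
// value 1
// value 2
// completion
```

Changes 4 through 9 are dropped silently because the subscription released its subscriber when it completed.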
Conclusion
Now you understand the whole pipeline of a Combine subscription. You know that our publisher is responsible for sending a new subscription to a subscriber when one is requested, that our subscriber is the one that defines its own demand, and that the subscription is the one that implements the logic that delivers values and, eventually, a completion to the subscriber.
As we discussed before, Apple doesn't recommend implementing custom publishers and subscriptions, since a logical mistake may compromise the whole connection between publisher and subscriber and lead to unexpected results.
But it's essential to understand how the Combine framework works under the hood, and I really hope you enjoyed this article ;)