Combine Publishers, Subscriptions, and Subscribers — Under the Hood

With implementation examples

Pedro Alvarez
Better Programming


Image from http://www.maestro.ind.br/a-importancia-das-engrenagens-em-sistemas-industriais/

Maybe you have already studied Combine, the native framework Apple released at WWDC19, and have memorized a huge list of publishers, subscribers, and operators.

Perhaps you only know that the first one generates new asynchronous values that should update the second. Okay, that's what it consists of! But many of us haven't dug into how these protocols communicate with each other, and some might not even know they were protocols (aha!).

So in this article, we will talk about how to create your own custom publishers, subscriptions, and subscribers by implementing those protocols, so that when a publisher emits a new value, each of its subscriptions tells its subscriber about it and the value can be handled properly (and no, sink and assign are not the only available subscribers).

Understanding each of them

To start off, it's essential to understand the true relationship between those three entities: Publisher, Subscription, and Subscriber, and mainly what their interface contracts consist of.

Apple doesn't recommend implementing your own Publisher type, because the native, predefined ones already do what is needed to keep publishers and their subscribers in sync. But this article aims at understanding how that synchronization actually works, so let's move on!

1. Publisher

A publisher is a type that conforms to the Publisher protocol and, as the name says, is meant to publish values. It has two associated types: Output, the type of value it emits to its subscribers, and Failure, which conforms to Swift's Error and describes the error that can end the stream of values sent to the subscriber.
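For reference, the comment in the sketch below reproduces the requirement as Combine declares it. The `Just<Int>` lookup underneath is only an illustrative stand-in, not part of this article's custom publisher, showing what the two associated types resolve to on a built-in publisher.

```swift
import Combine

// The requirement declared by Combine, reproduced as a comment:
//
// protocol Publisher {
//     associatedtype Output
//     associatedtype Failure: Error
//     func receive<S>(subscriber: S)
//         where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
// }

// Illustrative only: the associated types of the built-in Just<Int> publisher.
let outputName = String(describing: Just<Int>.Output.self)
let failureName = String(describing: Just<Int>.Failure.self)
print(outputName, failureName)
```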

The Publisher protocol consists of two associated types and a single method, receive(subscriber:), that receives a subscriber as a parameter. When this method is called, the custom Publisher must create a Subscription object and send it to the Subscriber.

Briefly, the Publisher's lifecycle starts when this receive method is called and the publisher sends a subscription to the subscriber. The publisher then keeps sending values asynchronously until it stops in one of two mutually exclusive ways: by finishing normally or by failing with an error of its Failure type.

2. Subscription

The Subscription is not as well known among Combine developers as it should be. The important thing about this protocol is that the object implementing it is responsible for linking a subscriber to the publisher. As long as it's in memory, the subscriber keeps receiving values. It requires just one method of its own, request:
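The comment in the sketch below reproduces the protocol as Combine declares it. `IdleSubscription` is a hypothetical, do-nothing conformance added only to show what a conforming class has to provide; it is not the subscription built later in this article.

```swift
import Combine

// The requirement declared by Combine, reproduced as a comment:
//
// protocol Subscription: Cancellable, CustomCombineIdentifierConvertible {
//     func request(_ demand: Subscribers.Demand)
// }

// Hypothetical conformance: it only remembers the last demand it was given.
final class IdleSubscription: Subscription {
    private(set) var lastDemand: Subscribers.Demand = .none

    func request(_ demand: Subscribers.Demand) {
        lastDemand = demand
    }

    // Required because Subscription inherits from Cancellable.
    func cancel() {
        lastDemand = .none
    }
}

let idle = IdleSubscription()
idle.request(.max(2))
```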

The request method is called by the subscriber once it receives the subscription from the publisher. It establishes how many values the subscriber is requesting from the publisher, expressed as a Subscribers.Demand value.

The demand may be none, which means no values are requested; max(value), which means the subscriber requests at most value values; or unlimited, which means it keeps receiving values for as long as the subscription is alive (that is, until it receives a completion from the publisher).
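A short sketch of how these three kinds of demand behave, using only `Subscribers.Demand` itself. The constant names are made up for illustration.

```swift
import Combine

let nothing = Subscribers.Demand.none
let three = Subscribers.Demand.max(3)
let endless = Subscribers.Demand.unlimited

// none is the same demand as max(0).
let noneIsMaxZero = (nothing == .max(0))

// The max property exposes the bound, or nil when the demand is unlimited.
let threeBound = three.max
let endlessBound = endless.max

// Demands are additive; adding anything to unlimited stays unlimited.
let five = three + .max(2)
let stillEndless = endless + three
```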

It's important to remember that what really links publisher and subscriber is the Subscription object, which holds a reference to the subscriber in order to keep it up to date and a reference to an object related to the publisher so that it is notified when a new output value is sent.

Although Subscription is not always mentioned when talking about Combine, I would say it is the most important entity in this reactive programming model.

As the Subscription protocol inherits from Cancellable, it must also provide a cancel method, which is responsible for cutting the link to the subscriber. Usually, it just sets the subscriber property to nil.

Another important point is that the way values are received from the publisher depends entirely on the custom subscription, but we will go deeper into this when implementing one later.

3. Subscriber

At last, we have the Subscriber, the object that is kept up to date with the publisher's lifecycle and expresses its demand for values through the Subscription. It's the subscriber that really handles the events from the publisher, and it has three methods:
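The comment in the sketch below reproduces the three requirements as Combine declares them. `RecordingSubscriber` is a hypothetical conformance used only for illustration, attached here to the built-in sequence publisher of an array rather than to this article's publisher.

```swift
import Combine

// The requirement declared by Combine, reproduced as a comment:
//
// protocol Subscriber: CustomCombineIdentifierConvertible {
//     associatedtype Input
//     associatedtype Failure: Error
//     func receive(subscription: Subscription)
//     func receive(_ input: Input) -> Subscribers.Demand
//     func receive(completion: Subscribers.Completion<Failure>)
// }

// Hypothetical subscriber that records everything it is told.
final class RecordingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []
    private(set) var finished = false

    func receive(subscription: Subscription) {
        subscription.request(.unlimited)   // ask for every value
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        return .none                       // no additional demand
    }

    func receive(completion: Subscribers.Completion<Never>) {
        finished = true
    }
}

let recorder = RecordingSubscriber()
[1, 2].publisher.subscribe(recorder)
```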

The first receive method is called by the Publisher itself and takes a Subscription as a parameter. The idea behind the scenes is for the Subscriber to call the request method on the Subscription in order to tell it how many values it's prepared to receive from the Publisher. Inside the implementation, the subscriber may also perform any extra work it needs.

The second receive method is meant to receive the values that come from the Publisher and handle them. It returns a Demand to the subscription that called it, which lets the subscriber adjust the number of values it requires. It's important to mention that the returned value doesn't replace the demand kept within the Subscription; it is added to the existing one.

The last receive method is meant to receive a completion event from the publisher and handle the finish event or maybe an incoming failure.

Note that for this process to work, the Subscriber's Input type must be the same as the Publisher's Output type, since what is published is what is received. The Failure types of both must match as well.

Subscription lifecycle

Having met these protocols, we are now ready to understand how the relationship between publisher and subscriber is established:

  1. A new subscriber asks the publisher for a subscription: the receive(Subscriber) method is called on the publisher with the subscriber as a parameter.
  2. The publisher creates a custom subscription, an object responsible for keeping the subscriber up to date with the publisher, and sends it to the subscriber through the receive(Subscription) method.
  3. Within the receive(Subscription) method, the subscriber calls the request method on the Subscription object it just received, establishing the demand it requires from the publisher.
  4. Once its request method has been called, the Subscription object knows the Demand the subscriber requires, that is, how many values it may deliver from the publisher.
  5. As the Subscription has some mechanism for keeping track of the publisher's emitted values, it just needs to send them to the subscriber via the receive(Input) method.
  6. When some event requires the subscription to complete, the Subscription calls the receive(Completion) method on the Subscriber and the process finishes.
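The order of these steps can be observed on a built-in publisher with the `handleEvents` operator. This is only a sketch: it uses an array's sequence publisher and `sink` instead of the custom types from this article, and the `events` log is a made-up name.

```swift
import Combine

var events: [String] = []

let cancellable = [10, 20].publisher
    .handleEvents(
        receiveSubscription: { _ in events.append("subscription") },
        receiveOutput: { events.append("output \($0)") },
        receiveCompletion: { _ in events.append("completion") },
        receiveRequest: { _ in events.append("request") }
    )
    .sink { _ in }

// Each entry marks one step of the lifecycle, in the order it happened.
print(events)
```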

Creating our own Publisher, Subscription and Subscriber

In order to understand how this powerful framework works under the hood, we are going to implement a publisher for a class that just holds an integer value. When that value changes, we tell its subscribers about it so they can handle the change. Enough talking, let's get started.

Our HoldValue custom class is only meant to hold a single integer value that may change. Now we want a publisher for it so that a subscriber can keep track of an instance's value changes and handle them. Simple, no? It's similar to the publisher generated by the Published property wrapper when declaring some ObservableObject class. We are implementing a very similar case.
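As a starting point, the class could be as small as the sketch below. The property name `value` and its initial value of 0 are assumptions.

```swift
// Minimal sketch of the class we want to observe: a single mutable integer.
final class HoldValue {
    var value: Int = 0
}

let holder = HoldValue()
holder.value = 7
```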

Implementing our publisher

We want a publisher in order to receive values when our HoldValue instance changes its internal property, so let's have a new class implementing Combine's Publisher protocol:
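A possible skeleton is sketched below. The `receive(subscriber:)` body is a loudly labeled placeholder: it hands out Combine's built-in `Subscriptions.empty` only so that the class compiles before the custom subscription exists.

```swift
import Combine

final class HoldValue {
    var value: Int = 0
}

final class HoldValuePublisher: Publisher {
    typealias Output = Int      // what we emit
    typealias Failure = Never   // we never fail

    let holdValue: HoldValue

    init(holdValue: HoldValue) {
        self.holdValue = holdValue
    }

    // PLACEHOLDER: replaced later by a real, custom subscription.
    func receive<S: Subscriber>(subscriber: S)
        where S.Input == Int, S.Failure == Never {
        subscriber.receive(subscription: Subscriptions.empty)
    }
}

let skeleton = HoldValuePublisher(holdValue: HoldValue())
```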

Our new publisher class defines the two associated types we need: Int as Output, which our publisher emits to the subscriber, and Never as Failure, since our publisher will never end with an error. It also declares a HoldValue instance as a property, because we need that instance in order to keep track of its changes and send them to our subscriber.

As we need some mechanism to listen to our HoldValue object and we want our publisher to allow multiple subscribers to attach, we are going to define an array of handler closures that can update multiple listeners:

Now we have an array of closures that deal with the value's change; each one will later correspond to a subscription. Note that each time our value changes, a property observer iterates through all the handlers and executes them with the new value. That will be very important soon.
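One way to sketch this, assuming the array is called `handlers` and the notification happens in a `didSet` observer:

```swift
final class HoldValue {
    // One closure per listener; each subscription will add its own later.
    var handlers: [(Int) -> Void] = []

    var value: Int = 0 {
        didSet {
            // Notify every registered handler with the new value.
            handlers.forEach { $0(value) }
        }
    }
}

// Usage: two independent listeners observe the same change.
var seen: [Int] = []
let holder = HoldValue()
holder.handlers.append { seen.append($0) }
holder.handlers.append { seen.append($0 * 10) }
holder.value = 4
```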

Creating our subscription

As we talked about before, the Subscription is the most important entity regarding Combine, since it's what keeps a link between our publisher and a subscriber.

The subscriber keeps being updated while the Subscription is there as a mediator. We are going to create a custom Subscription type that implements the logic to satisfy the subscriber's demand. For that, it needs a way of listening to the publisher and forwarding each new output value to the subscriber it's responsible for, keeping the two in sync.

Our Subscription type for HoldValuePublisher receives the subscriber it will send values to and the instance of our HoldValue class that it must listen to.

As it's a Cancellable type, it implements the cancel method, which cuts the link between the subscription and the subscriber (and therefore with the publisher as well). It just sets the subscriber to nil.
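A first skeleton of such a type might look like the sketch below. The generic parameter `S` and the empty `request` body are assumptions at this stage; demand handling is added further on.

```swift
import Combine

final class HoldValue {
    var value: Int = 0
}

final class HoldValueSubscription<S: Subscriber>: Subscription
    where S.Input == Int, S.Failure == Never {

    // Optional so that cancel() can drop the link.
    var subscriber: S?
    let holdValue: HoldValue

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        self.holdValue = holdValue
    }

    // Demand handling is intentionally left out of this skeleton.
    func request(_ demand: Subscribers.Demand) {}

    // Cutting the link: without a subscriber, nothing can be delivered.
    func cancel() {
        subscriber = nil
    }
}
```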

As we want to keep track of the HoldValue value, what we must do for now is append a new closure to it that will handle our change of value as we saw before:
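The sketch below shows one way to do that in the initializer. The weak capture of `self` is a design assumption: it means the handler stops forwarding once nothing else keeps the subscription alive. `Collector` is a hypothetical subscriber used only to observe the result.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

final class HoldValueSubscription<S: Subscriber>: Subscription
    where S.Input == Int, S.Failure == Never {
    private var subscriber: S?
    private let holdValue: HoldValue

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        self.holdValue = holdValue
        // Forward every change to the subscriber; demand is ignored for now.
        holdValue.handlers.append { [weak self] value in
            _ = self?.subscriber?.receive(value)
        }
    }

    func request(_ demand: Subscribers.Demand) {}
    func cancel() { subscriber = nil }
}

// Hypothetical subscriber that just collects its inputs.
final class Collector: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private(set) var received: [Int] = []
    func receive(subscription: Subscription) {}
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none
    }
    func receive(completion: Subscribers.Completion<Never>) {}
}

let holder = HoldValue()
let collector = Collector()
let subscription = HoldValueSubscription(subscriber: collector, holdValue: holder)
holder.value = 1
holder.value = 2
subscription.cancel()
holder.value = 3   // not delivered: the link was cut
```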

So, our logic works in the following way: we set our subscriber reference within our subscription and a reference to HoldValue as well. Then we append a new closure to our HoldValue instance that will send a new value to the subscriber via the receive(Input) method. Easy, right?

Now the subscription acts as a mediator between our publisher and subscriber. But it doesn't stop here. We still need to show how the publisher establishes the subscription when a new subscriber attaches to it.

Remember the receive(Subscriber) method in the Publisher protocol? It is the first step in our Combine pipeline and is responsible for instantiating the Subscription and sending it to the Subscriber, thereby creating the link between the two entities:

When the Publisher receives a new subscriber, it creates a new subscription and sends it to the subscriber to handle. The Subscription is given the Subscriber itself and a reference to the object whose values the Publisher publishes. Now it's time for the subscriber to request a demand from the subscription.
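A sketch of that method in context follows. The subscription here still ignores demand, and `Collector` is a hypothetical subscriber that retains the subscription it is handed so the link stays alive.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

final class HoldValueSubscription<S: Subscriber>: Subscription
    where S.Input == Int, S.Failure == Never {
    private var subscriber: S?
    private let holdValue: HoldValue

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        self.holdValue = holdValue
        holdValue.handlers.append { [weak self] value in
            _ = self?.subscriber?.receive(value)
        }
    }

    func request(_ demand: Subscribers.Demand) {}
    func cancel() { subscriber = nil }
}

final class HoldValuePublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    let holdValue: HoldValue

    init(holdValue: HoldValue) { self.holdValue = holdValue }

    // Create the subscription and hand it to the new subscriber.
    func receive<S: Subscriber>(subscriber: S)
        where S.Input == Int, S.Failure == Never {
        let subscription = HoldValueSubscription(subscriber: subscriber,
                                                 holdValue: holdValue)
        subscriber.receive(subscription: subscription)
    }
}

// Hypothetical subscriber; it keeps the subscription alive by storing it.
final class Collector: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private(set) var received: [Int] = []
    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.unlimited)
    }
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none
    }
    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }
}

let holder = HoldValue()
let collector = Collector()
HoldValuePublisher(holdValue: holder).subscribe(collector)
holder.value = 5
```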

Dealing with Demand

As we saw before, the Subscription has a method called request that receives a Subscribers.Demand, and we haven't implemented it yet. This method receives the Demand value, which contains the number of values our subscriber wants to receive; with that, our subscription can apply the logic needed to deliver exactly what our subscriber expects.

Before implementing that, there is one thing you must know about demand and the publisher's lifecycle. Demand belongs to the subscriber while the lifecycle belongs to the publisher, which means that if our subscriber stops receiving new inputs because its demand is exhausted, it will not necessarily receive a completion event from the subscription. We will come back to that; for now, just follow the code:

Add two new properties to our subscription class: a counter, which keeps track of how many values our subscriber has received so far, and a maximum, which corresponds to our demand.

In our request method, just assign the max property of the incoming demand to the maximum property of the HoldValue subscription.

Implementing demand logic

Now that we have saved the maximum we want to send to the subscriber, we need to implement some logic inside our subscription in order to fulfill this demand. Inside the initializer, append a new handler to the HoldValue instance.

Now we implement the magic: inside our new holdValue closure, check whether the maximum value exists (is not nil), and if it does, check whether our counter is less than maximum. If it is, our subscriber is still able to receive new inputs, so we just call the receive(Input) method with the new integer from HoldValue and, naturally, increment the counter. If the counter has reached our maximum demand, we send a finished completion event to our subscriber and cancel the subscription.

If we don't have a maximum at all, because the demand is unlimited, we just keep our subscriber receiving values indefinitely. Note that we trigger a completion event here simply because the subscriber's demand has been met; other publishers, like the one created with the Published property wrapper, don't complete in that situation at all.

Brilliant, now our subscription is able to receive a demand and sends values to our subscriber only while they are needed. So far we have implemented a publisher that creates a subscription and sends it to the subscriber, and the subscriber, on its own, requests new values according to its demand.
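Putting the counter, the maximum, the request method, and the handler together, the subscription could be sketched as follows. Two simplifications are assumptions of this sketch: the demand returned by `receive(_:)` is discarded, and a nil `maximum` is treated as unlimited even before `request` has been called. `Collector` is again a hypothetical subscriber.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

final class HoldValueSubscription<S: Subscriber>: Subscription
    where S.Input == Int, S.Failure == Never {
    private var subscriber: S?
    private let holdValue: HoldValue
    private var counter = 0       // values delivered so far
    private var maximum: Int?     // nil stands for unlimited demand

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        self.holdValue = holdValue
        holdValue.handlers.append { [weak self] value in
            guard let self = self, let subscriber = self.subscriber else { return }
            guard let maximum = self.maximum else {
                _ = subscriber.receive(value)          // unlimited demand
                return
            }
            if self.counter < maximum {
                self.counter += 1
                _ = subscriber.receive(value)          // still within demand
            } else {
                subscriber.receive(completion: .finished)
                self.cancel()                          // demand exhausted
            }
        }
    }

    func request(_ demand: Subscribers.Demand) {
        maximum = demand.max   // nil when the demand is .unlimited
    }

    func cancel() { subscriber = nil }
}

// Hypothetical subscriber that records values and the completion.
final class Collector: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private(set) var received: [Int] = []
    private(set) var completed = false
    func receive(subscription: Subscription) {}
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none
    }
    func receive(completion: Subscribers.Completion<Never>) { completed = true }
}

let holder = HoldValue()
let collector = Collector()
let subscription = HoldValueSubscription(subscriber: collector, holdValue: holder)
subscription.request(.max(2))
(1...4).forEach { holder.value = $0 }
```

With a demand of two, the first two changes are delivered and the third change triggers the completion instead of a value.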

Implementing our subscriber

Last but not least, we'll implement our subscriber class. As we discussed before, the subscriber is only responsible for handling new input values, handling the completion event, and requesting how many values it wants to receive.

Let's create a new custom class:
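A sketch of such a class is shown below. Wrapping `cancel()` in an `AnyCancellable` is one assumed way of storing the subscription in a `Set<AnyCancellable>`, since the closure retains the subscription.

```swift
import Combine

final class HoldValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var cancellables = Set<AnyCancellable>()

    func receive(subscription: Subscription) {
        // We only want three values.
        subscription.request(.max(3))
        // The closure retains the subscription, which keeps it alive.
        let token = AnyCancellable { subscription.cancel() }
        token.store(in: &cancellables)
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Received value:", input)
        return .none   // do not increase the demand
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Received completion:", completion)
    }
}
```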

What have we done?

We created a new HoldValueSubscriber type whose associated types match the publisher's: Int as Input and Never as Failure, since it doesn't expect any error.

Let's take a look at the interface methods:

  1. receive(Subscription): Called from the publisher's side, as we saw before. It sends a new request to the Subscription by passing a demand. As it only needs three values, we set a maximum of three in our demand. At the end, in order to keep our subscription alive, it's saved in the cancellables set.
  2. receive(Input): Called from the subscription's side to deliver a new input value to the subscriber. We handle it by just printing the value. The returned demand is added to the demand held by the subscription. As we don't want it to increase, we return none, which is the same as max(0).
  3. receive(Completion): Also called from the subscription's side, it delivers a completion that tells our subscriber the subscription is over, either with finished or with a failure. It just prints the completion event.

Make the publisher accessible by its origin class

We created a publisher that serves the HoldValue class. This is the class that we actually want to listen to.

In order to access it from the HoldValue origin class as most Combine publishers do, implement this Swift extension:
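The extension itself can be a single computed property, here assumed to be named `publisher`. To keep this sketch short, the publisher's `receive(subscriber:)` is abbreviated to a placeholder that hands out `Subscriptions.empty`; in the real pipeline it creates the custom subscription.

```swift
import Combine

final class HoldValue {
    var value: Int = 0
}

final class HoldValuePublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    let holdValue: HoldValue

    init(holdValue: HoldValue) { self.holdValue = holdValue }

    // ABBREVIATED placeholder; the real version creates a custom subscription.
    func receive<S: Subscriber>(subscriber: S)
        where S.Input == Int, S.Failure == Never {
        subscriber.receive(subscription: Subscriptions.empty)
    }
}

extension HoldValue {
    // Each access wraps this instance in a fresh publisher.
    var publisher: HoldValuePublisher {
        HoldValuePublisher(holdValue: self)
    }
}

let holder = HoldValue()
let holderPublisher = holder.publisher
```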

Now, if you want to make a subscription, just access the publisher from the HoldValue instance.

Testing our subscription

Now that we created our three Combine custom types, let's test this subscription:
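The whole pipeline, condensed into one self-contained sketch. The `received` and `didFinish` properties on the subscriber are additions for inspection only, and a nil `maximum` is treated as unlimited.

```swift
import Combine

final class HoldValue {
    var handlers: [(Int) -> Void] = []
    var value: Int = 0 {
        didSet { handlers.forEach { $0(value) } }
    }
}

final class HoldValueSubscription<S: Subscriber>: Subscription
    where S.Input == Int, S.Failure == Never {
    private var subscriber: S?
    private var counter = 0
    private var maximum: Int?

    init(subscriber: S, holdValue: HoldValue) {
        self.subscriber = subscriber
        holdValue.handlers.append { [weak self] value in
            guard let self = self, let subscriber = self.subscriber else { return }
            if let maximum = self.maximum, self.counter >= maximum {
                subscriber.receive(completion: .finished)   // demand exhausted
                self.cancel()
            } else {
                self.counter += 1
                _ = subscriber.receive(value)
            }
        }
    }

    func request(_ demand: Subscribers.Demand) { maximum = demand.max }
    func cancel() { subscriber = nil }
}

final class HoldValuePublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    let holdValue: HoldValue
    init(holdValue: HoldValue) { self.holdValue = holdValue }

    func receive<S: Subscriber>(subscriber: S)
        where S.Input == Int, S.Failure == Never {
        let subscription = HoldValueSubscription(subscriber: subscriber,
                                                 holdValue: holdValue)
        subscriber.receive(subscription: subscription)
    }
}

extension HoldValue {
    var publisher: HoldValuePublisher { HoldValuePublisher(holdValue: self) }
}

final class HoldValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private(set) var received: [Int] = []   // for inspection only
    private(set) var didFinish = false      // for inspection only
    private var cancellables = Set<AnyCancellable>()

    func receive(subscription: Subscription) {
        subscription.request(.max(3))
        let token = AnyCancellable { subscription.cancel() }
        token.store(in: &cancellables)
    }
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        print("Received value:", input)
        return .none
    }
    func receive(completion: Subscribers.Completion<Never>) {
        didFinish = true
        print("Received completion:", completion)
    }
}

let holdValue = HoldValue()
let subscriber = HoldValueSubscriber()
holdValue.publisher.subscribe(subscriber)

for number in 0..<10 {
    holdValue.value = number
}
```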

What are we doing? We created a new HoldValue instance and a new HoldValueSubscriber, and made the subscriber subscribe to our instance's publisher. This establishes the pipeline we saw before: the publisher internally creates a new subscription and sends it to the subscriber, the subscriber requests a demand from the subscription, and now it's ready to receive new inputs (outputs from the publisher).

We now iterate through the integers from 0 to 9 and set our holdValue's value to each number, while the subscriber prints each input value it receives to the console.

It receives three values, as demanded, and after that, the subscription sends a completion event to the subscriber, which is also printed.

Conclusion

Now you understand the pipeline of a Combine subscription. You know that the publisher is responsible for sending a new subscription to a subscriber when one is requested, that the subscriber defines its own demand, and that the subscription implements the logic that delivers values and, eventually, a completion to the subscriber.

As we discussed before, Apple doesn't recommend implementing your own publishers and subscriptions, since a logic mistake may compromise the whole connection between publisher and subscriber and lead to unexpected results.

But it's essential to understand how the Combine framework works under the hood and I really hope you enjoyed this article ;)


iOS | Android Developer - WWDC19 scholarship winner- Blockchain enthusiast