1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use crate::{BoxFuture, Subscription};

use futures::{channel::mpsc, sink::Sink};
use std::{collections::HashMap, marker::PhantomData};

/// A registry of subscription streams.
///
/// If you have an application that continuously returns a [`Subscription`],
/// you can use a [`Tracker`] to keep track of the different recipes and keep
/// their executions alive.
///
/// The `Hasher` type parameter is the hasher used to compute the identifier
/// of each recipe, and `Event` is the type of the events that can be
/// broadcast to the running subscription streams.
///
/// [`Subscription`]: struct.Subscription.html
/// [`Tracker`]: struct.Tracker.html
#[derive(Debug)]
pub struct Tracker<Hasher, Event> {
    /// The executions currently tracked, keyed by the hash of their recipe.
    subscriptions: HashMap<u64, Execution<Event>>,
    /// Marker for the `Hasher` type; no hasher value is stored in the tracker.
    _hasher: PhantomData<Hasher>,
}

/// The bookkeeping a [`Tracker`] keeps for a single running subscription
/// stream.
///
/// [`Tracker`]: struct.Tracker.html
#[derive(Debug)]
pub struct Execution<Event> {
    /// Sending half of the cancellation channel. Nothing is ever sent through
    /// it; it is stored so that it gets dropped together with the
    /// `Execution`. The future created in `Tracker::update` selects over the
    /// receiving half, so dropping this sender is expected to finish that
    /// future and close the stream.
    _cancel: futures::channel::oneshot::Sender<()>,
    /// Sender used by `Tracker::broadcast` to push events into the stream.
    /// It is `None` when the channel was already closed right after the
    /// stream was created in `Tracker::update`.
    listener: Option<futures::channel::mpsc::Sender<Event>>,
}

impl<Hasher, Event> Tracker<Hasher, Event>
where
    Hasher: std::hash::Hasher + Default,
    Event: 'static + Send + Clone,
{
    /// Creates a new empty [`Tracker`].
    ///
    /// [`Tracker`]: struct.Tracker.html
    pub fn new() -> Self {
        Self {
            subscriptions: HashMap::new(),
            _hasher: PhantomData,
        }
    }

    /// Updates the [`Tracker`] with the given [`Subscription`].
    ///
    /// A [`Subscription`] can cause new streams to be spawned or old streams
    /// to be closed.
    ///
    /// The [`Tracker`] keeps track of these streams between calls to this
    /// method:
    ///
    /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
    ///   currently not being run, it will spawn a new stream and keep it
    ///   alive.
    /// - On the other hand, if a [`Recipe`] is currently in execution and the
    ///   provided [`Subscription`] does not contain it anymore, then the
    ///   [`Tracker`] will close and drop the relevant stream.
    ///
    /// Recipes are identified by the hash produced by a fresh `Hasher`, so a
    /// recipe whose hash is already tracked is left untouched.
    ///
    /// Every message produced by a spawned stream is forwarded to a clone of
    /// `receiver`.
    ///
    /// It returns a list of futures that need to be spawned to materialize
    /// the [`Tracker`] changes. There is one future per newly started recipe;
    /// the list is empty when nothing new has to be started.
    ///
    /// [`Tracker`]: struct.Tracker.html
    /// [`Subscription`]: struct.Subscription.html
    /// [`Recipe`]: trait.Recipe.html
    pub fn update<Message, Receiver>(
        &mut self,
        subscription: Subscription<Hasher, Event, Message>,
        receiver: Receiver,
    ) -> Vec<BoxFuture<()>>
    where
        Message: 'static + Send,
        Receiver: 'static
            + Sink<Message, Error = mpsc::SendError>
            + Unpin
            + Send
            + Clone,
    {
        use futures::{future::FutureExt, stream::StreamExt};

        let mut futures: Vec<BoxFuture<()>> = Vec::new();

        let recipes = subscription.recipes();

        // Identifiers of every recipe present in the new subscription; used
        // at the end to drop the executions that are no longer requested.
        let mut alive = std::collections::HashSet::new();

        for recipe in recipes {
            let id = {
                let mut hasher = Hasher::default();
                recipe.hash(&mut hasher);

                hasher.finish()
            };

            let _ = alive.insert(id);

            // Already running: keep the existing execution as it is.
            if self.subscriptions.contains_key(&id) {
                continue;
            }

            let (cancel, cancelled) = futures::channel::oneshot::channel();

            // TODO: Use bus if/when it supports async
            let (event_sender, event_receiver) =
                futures::channel::mpsc::channel(100);

            let stream = recipe.stream(event_receiver.boxed());

            // The future finishes as soon as either the cancellation channel
            // resolves or the stream has been fully forwarded to the receiver.
            let future = futures::future::select(
                cancelled,
                stream.map(Ok).forward(receiver.clone()),
            )
            .map(|_| ());

            let _ = self.subscriptions.insert(
                id,
                Execution {
                    _cancel: cancel,
                    // A sender that is already closed at this point has no
                    // receiving end left, so there is nothing to listen.
                    listener: if event_sender.is_closed() {
                        None
                    } else {
                        Some(event_sender)
                    },
                },
            );

            futures.push(Box::pin(future));
        }

        // Dropping an execution drops its cancellation sender as well.
        self.subscriptions.retain(|id, _| alive.contains(id));

        futures
    }

    /// Broadcasts an event to the subscriptions currently alive.
    ///
    /// A subscription's [`Recipe::stream`] always receives a stream of events
    /// as input. This stream can be used by some subscription to listen to
    /// shell events.
    ///
    /// This method publishes the given event to all the subscription streams
    /// currently open.
    ///
    /// Delivery never blocks: an event that cannot be sent is logged and
    /// dropped for that subscription. When the failure is caused by the
    /// subscription having disconnected its end of the channel, its listener
    /// is discarded so that later broadcasts skip it instead of failing (and
    /// logging) again on every event.
    ///
    /// [`Recipe::stream`]: trait.Recipe.html#tymethod.stream
    pub fn broadcast(&mut self, event: Event) {
        for execution in self.subscriptions.values_mut() {
            let disconnected = match execution.listener.as_mut() {
                Some(listener) => match listener.try_send(event.clone()) {
                    Ok(()) => false,
                    Err(error) => {
                        log::error!(
                            "Error sending event to subscription: {:?}",
                            error
                        );

                        error.is_disconnected()
                    }
                },
                None => false,
            };

            // A disconnected channel can never accept events again.
            if disconnected {
                execution.listener = None;
            }
        }
    }
}