ariel_os_sensors/
signal.rs

1//! This module contains a custom [`Signal`] struct meant to be used in the [`ariel-os-sensors`][crate] ecosystem.
2
3use core::{
4    cell::Cell,
5    pin::Pin,
6    task::{Context, Poll, Waker},
7};
8
9use embassy_sync::blocking_mutex::{Mutex, raw::CriticalSectionRawMutex};
10
/// Internal state stored inside a [`Signal`].
#[derive(Debug, Default)]
enum SignalState<T> {
    /// No value is stored and no waker is registered.
    #[default]
    None,
    /// A task polled the signal before a value was available; its waker is kept so that it can
    /// be woken when a value is sent.
    Waiting(Waker),
    /// A value has been sent and has not been consumed yet.
    Ready(T),
}
18
/// Custom signal struct inspired by [`embassy_sync::signal::Signal`] and [`embassy_sync::channel::Channel`].
///
/// This is meant for single-producer and single-consumer signaling.
///
/// The signal holds at most one value: sending a new value replaces a previous one that has not
/// been read yet, and receiving a value removes it from the signal.
///
/// This struct has been created for the [`ariel-os-sensors`][ariel-os-sensors] ecosystem.
///
/// [ariel-os-sensors]: crate
// This struct exists for multiple reasons:
// - Get a lightweight [`Future`] that can be easily stored in a struct like [`ReadingWaiter`][crate::sensor::ReadingWaiter]
// - Keep a stable API that doesn't change with [`embassy_sync`] versions
pub struct Signal<T> {
    /// Current state of the signal (empty, a registered waker, or a pending value), only ever
    /// accessed through the mutex's `lock` closure.
    inner: Mutex<CriticalSectionRawMutex, Cell<SignalState<T>>>,
}
32
33impl<T> Default for Signal<T> {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl<T> Signal<T> {
40    /// Create a new empty [`Signal`].
41    #[must_use]
42    pub const fn new() -> Self {
43        Self {
44            inner: Mutex::new(Cell::new(SignalState::<T>::None)),
45        }
46    }
47
48    /// Signal that a new value is available and will replace the previous value if it wasn't read.
49    pub fn signal(&self, new: T) {
50        self.inner.lock(|cell| {
51            let state = cell.take();
52            match state {
53                SignalState::None => {
54                    cell.set(SignalState::Ready(new));
55                }
56                SignalState::Ready(_prev) => {
57                    cell.set(SignalState::Ready(new));
58                }
59                SignalState::Waiting(read_waker) => {
60                    cell.set(SignalState::Ready(new));
61                    read_waker.wake();
62                }
63            }
64        });
65    }
66
67    /// Returns a future that will return once a value is available.
68    ///
69    /// This is not meant to have multiple tasks waiting for a signal. If multiple tasks are waiting
70    /// then a signal sent with [`Self::signal`] will reach only one task at random.
71    pub fn wait(&'static self) -> ReceiveFuture<'static, T> {
72        ReceiveFuture { signaling: self }
73    }
74
75    fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
76        self.inner.lock(|cell| {
77            let state = cell.take();
78            match state {
79                SignalState::None => {
80                    cell.set(SignalState::Waiting(cx.waker().clone()));
81                    Poll::Pending
82                }
83
84                // Multiple tasks waiting for a reading, this shouldn't happen
85                SignalState::Waiting(prev_waker) => {
86                    if prev_waker.will_wake(cx.waker()) {
87                        cell.set(SignalState::Waiting(prev_waker));
88                    } else {
89                        cell.set(SignalState::Waiting(cx.waker().clone()));
90
91                        // We can't store multiple wakers, they will fight eachother until some data
92                        // is sent.
93                        // This should happen only if multiple tasks are waiting for a measurement.
94                        prev_waker.wake();
95                    }
96                    Poll::Pending
97                }
98                SignalState::Ready(res) => Poll::Ready(res),
99            }
100        })
101    }
102
103    /// Removes any pending value from the signal.
104    pub fn clear(&self) {
105        self.inner.lock(|cell| {
106            let state = cell.take();
107            match state {
108                // Do nothing, `cell.take()` already set the state to `SignalState::None`
109                SignalState::None | SignalState::Ready(_) => {}
110                SignalState::Waiting(waker) => cell.set(SignalState::Waiting(waker)),
111            }
112        });
113    }
114}
115
/// A future that will resolve once a signal is sent.
///
/// It is created by [`Signal::wait`] and resolves to the value passed to [`Signal::signal`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReceiveFuture<'ch, T> {
    /// The signal that is polled for a value.
    signaling: &'ch Signal<T>,
}
121
122impl<T> Future for ReceiveFuture<'_, T> {
123    type Output = T;
124
125    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
126        self.signaling.poll_wait(cx)
127    }
128}
129
#[cfg(test)]
mod tests {
    use static_cell::StaticCell;

    use super::*;

    /// Initializes `cell` with an empty signal and returns a shared `'static` reference to it.
    fn fresh_signal(cell: &'static StaticCell<Signal<u8>>) -> &'static Signal<u8> {
        cell.init(Signal::new())
    }

    /// Polls `future` `polls` times and checks that it reports `Poll::Pending` every time.
    fn assert_stays_pending(future: &mut ReceiveFuture<'static, u8>, polls: usize) {
        for _ in 0..polls {
            assert_eq!(embassy_futures::poll_once(&mut *future), Poll::Pending);
        }
    }

    /// A value sent by one branch of a `join` is received by the other branch.
    #[test]
    fn future_returns() {
        static SIGNAL: StaticCell<Signal<u8>> = StaticCell::new();
        let signal = fresh_signal(&SIGNAL);
        let wanted = 42u8;
        let receiver = signal.wait();

        let sender = async {
            signal.signal(wanted);
        };
        let checker = async {
            assert_eq!(receiver.await, wanted);
        };

        embassy_futures::block_on(embassy_futures::join::join(sender, checker));
    }

    /// The future stays pending until a value is sent, then yields that value.
    #[test]
    fn manual_poll() {
        static SIGNAL: StaticCell<Signal<u8>> = StaticCell::new();
        let signal = fresh_signal(&SIGNAL);
        let wanted = 31;
        let mut receiver = signal.wait();

        // Nothing has been sent yet: an arbitrary amount of polling must return `Poll::Pending`.
        assert_stays_pending(&mut receiver, 2);

        signal.signal(wanted);

        assert_eq!(embassy_futures::poll_once(receiver), Poll::Ready(wanted));
    }

    /// A value sent before the previous one was read replaces it.
    #[test]
    fn override_value() {
        static SIGNAL: StaticCell<Signal<u8>> = StaticCell::new();
        let signal = fresh_signal(&SIGNAL);
        let wanted = 42u8;
        let receiver = signal.wait();

        signal.signal(2);
        signal.signal(wanted);

        assert_eq!(embassy_futures::block_on(receiver), wanted);
    }

    /// Clearing the signal discards a value that was sent but not read yet.
    #[test]
    fn clear_value() {
        static SIGNAL: StaticCell<Signal<u8>> = StaticCell::new();
        let signal = fresh_signal(&SIGNAL);
        let mut receiver = signal.wait();

        // Nothing has been sent yet: an arbitrary amount of polling must return `Poll::Pending`.
        assert_stays_pending(&mut receiver, 10);

        signal.signal(3);
        signal.clear();

        // The value was cleared: an arbitrary amount of polling must still return
        // `Poll::Pending`.
        assert_stays_pending(&mut receiver, 10);
    }
}