stream.hpp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#pragma once


#include <felspar/coro/allocator.hpp>
#include <felspar/coro/coroutine.hpp>
#include <felspar/memory/holding_pen.hpp>


namespace felspar::coro {


    template<typename Y, typename H>
    class stream_awaitable;
    template<typename Y, typename Allocator>
    struct stream_promise;


    template<typename Y, typename Allocator = void>
    class stream final {
        friend struct stream_promise<Y, Allocator>;
        using handle_type = typename stream_promise<Y, Allocator>::handle_type;
        handle_type yielding_coro;

        stream(handle_type h) : yielding_coro{std::move(h)} {}

      public:
        using value_type = Y;
        using optional_type = memory::holding_pen<value_type>;
        using promise_type = stream_promise<value_type, Allocator>;

Not copyable

32
33
        stream(stream const &) = delete;
        stream &operator=(stream const &) = delete;

Movable

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
        stream(stream &&t) noexcept = default;
        stream &operator=(stream &&t) noexcept = default;
        ~stream() = default;

        stream_awaitable<Y, handle_type> next();
    };


    template<typename Y, typename Allocator>
    struct stream_promise : private promise_allocator_impl<Allocator> {
        using promise_allocator_impl<Allocator>::operator new;
        using promise_allocator_impl<Allocator>::operator delete;

        coroutine_handle<> continuation = {};
        bool completed = false;
        memory::holding_pen<Y> value = {};
        std::exception_ptr eptr = {};

        using handle_type = unique_handle<stream_promise>;

        auto yield_value(Y y) {
            value.assign(std::move(y));
            return symmetric_continuation{std::exchange(continuation, {})};
        }

        void unhandled_exception() {
            eptr = std::current_exception();
            completed = true;
            value.reset();
        }
        void return_void() {
            completed = true;
            value.reset();
        }

        auto get_return_object() {
            return stream<Y, Allocator>{handle_type::from_promise(*this)};
        }

        auto initial_suspend() const noexcept { return suspend_always{}; }
        auto final_suspend() const noexcept {
            return symmetric_continuation{continuation};
        }
    };

The awaitable type

 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
    template<typename Y, typename H>
    class stream_awaitable {
        friend class cancellable;

      public:
        stream_awaitable(H &c) : continuation{c} {}

        bool await_ready() const noexcept {
            return continuation.promise().completed;
        }
        auto await_suspend(coroutine_handle<> awaiting) noexcept {
            continuation.promise().continuation = awaiting;
            return continuation.get();
        }
        memory::holding_pen<Y> await_resume() {
            if (auto eptr = continuation.promise().eptr) {
                std::rethrow_exception(eptr);
            } else {
                return std::move(continuation.promise().value).transfer_out();
            }
        }

      private:
        H &continuation;
    };
    template<typename Y, typename A>
    inline stream_awaitable<Y, typename stream<Y, A>::handle_type>
            stream<Y, A>::next() {
        return {yielding_coro};
    }

Can be used to pipe streams into other streams

115
116
117
118
119
120
121
    template<typename Y, typename YA, typename X, typename XA>
    inline X operator|(stream<Y, YA> &&s, X (*c)(stream<Y, XA>)) {
        return c(std::move(s));
    }


}