Saturday, 6 October 2012

On List Comprehension in C++

Many programming languages take inspiration from the language of mathematics and emulate Set-builder Notation. Take for instance the following set, specified by the set-builder notation:





Haskell, for example, has List Comprehension syntax which allows for expressing the above set as:

[x^2 | x <- [0..], x > 5]

C# supports LINQ, which borrows SQL notation to express the same idea. C++ does not have syntactic support for expressing set-builder notation but numerous attempts by library authors have tried to fill the gap. Take a look at this StackOverflow thread for links to projects emulating LINQ in C++. Others have noted that Boost.Range can already be used for this task. The following code expresses the aforementioned set (at least for a subset of natural numbers):

using namespace boost;

auto sq = counting_range(0, 1000)
        | filtered([](int x) { return x > 5; })
        | transformed([](int x) { return x*x; });

There is one place though, where Boost.Range approach falls short. Set-builder notation can be used with more than one variable:



In this case, a Cartesian product of the sets is taken and the elements of that product are "filtered" and "transformed". The Range library, however, does not provide a way to generate the Cartesian product. So let's look at what it would take to offer such functionality. This will give us a chance to not only experiment with Boost.Range but also play around with variadic templates.

Since we'll be making ample use of C++11's variadic templates, it's important to understand them. If you need a good primer, the best thing is to watch Andrei's Alexandrescu's Variadic Templates are Funadic (or at least the first 30 mins of it).

The Goal

Let's begin by looking at what we are after:
std::vector<int> xx = { 1, 2, 3 };
std::vector<char> yy = { 'a', 'b', 'c' };
std::vector<double> zz = { 0.1, 0.2, 0.3 };

auto r = cartesian(xx, yy, zz)
       | xfiltered([](int x, char y, double z) { return x > 1 && y < 'c'; })
       | xtransformed([](int x, char y, double z) { return x + int(y) + z; });
cartesian() function takes any number of ranges and returns a range of std::tuple's. In the example above, cartesian(xx, yy, zz) returns a range of std::tuple<int&, char&, double&> and will have nine elements: (1, 'a', 0.1), (1, 'a', 0.2), (1, 'a', 0.3), (1, 'b', 0.1), and so on. xfiltered() and xtransformed() are analogous to filtered() and transformed() except that they allow the lambda to accept multiple arguments instead of a single tuple. Without them, the code would look like this:
auto r = cartesian(xx, yy, zz)
       | filtered([](std::tuple<int&, char&, double&> x) { return std::get<0>(x) > 1 && std::get<1>(x) < 'c'; })
       | transformed([](std::tuple<int&, char&, double&> x) { return std::get<0>(x) + int(std::get<1>(x)) + std::get<2>(x); });

Boost Ranges

Boost Ranges aim to raise the level of abstraction when dealing with sequences by providing a single object representing an interval. However, unlike ranges proposed by Alexandrescu in Iterators Must Go keynote, Boost Ranges are a leaky abstraction. They leak the underlying iterators by requiring the range to expose them via boost::begin(rng) and boost::end(rng). This requirement will force us to define a cartesian_iterator and demonstrate the downside of the leak. The principal advantage of leaking the iterators though is the ability to inter-operate with existing algorithms designed to work with iterators.

Getting Started

Before getting into the details of the cartesian_iterator that will do all of the heavy-lifting, let's look at the big picture:
using namespace boost;

// To be filled in later
template <typename... Rs>
class cartesian_iterator;

// Boost.Range provides iterator_range class which constructs a range
// from a begin and end iterator pair
template <typename... Rs>
using cartesian_range = iterator_range<cartesian_iterator<Rs...>>;

// Our "public" function that takes any number of ranges and
// constructs a cartesian_range
template <typename... Rs>
typename cartesian_range<Rs...>::type cartesian(Rs&... rs) {
    typedef cartesian_iterator<Rs...> iter_t;
    return cartesian_range<Rs...>(iter_t(rs...), iter_t(rs..., 0));
}

The first step in defining the cartesian_iterator is to decide what will be its value_type. As I mentioned above, it will be the std::tuple of references to the types of the ranges. The code below defines a meta-function to extract the reference type of a range and then uses it to define the value_type of cartesian_iterator:
template <typename R>
struct range_reference {
    typedef typename boost::range_iterator<R>::type iter;
    typedef typename iter::reference type;
};

template <typename... Rs>
struct value_type {
    typedef std::tuple<
        typename range_reference<Rs>::type...
    > type;
};
To ease the task of writing an iterator, we'll be using Boost.Iterators, in particular iterator_facade class. We just have to derive from it and implement 3 functions. Here's what the derivation looks like:
template <typename... Rs>
class cartesian_iterator : public boost::iterator_facade<
    cartesian_iterator<Rs...>,  // pass self per CRTP
    typename value_type<Rs...>::type,  // value_type
    boost::forward_traversal_tag,  // iterator category
    typename value_type<Rs...>::type  // reference -- same as value_type!
>
Note that the reference type will be std::tuple<...> and not std::tuple<...>&. The reason being is that when dereferencing, we'll return a temporary std::tuple<...> since the cartesian_range is not stored in memory per se. Obviously, returning a temporary from a function with a reference return type is a bad idea.

Next stop -- deciding what data members will be needed in the iterator. Consider what happens when asked to generate a cartesian_range over two ranges -- {1, 2} and {'a', 'b'}. We are going to use two iterators, one pointing into each sequence and emulating a nested for-loop. On each iteration we advance the iterator over the {'a', 'b'}. Once the iterator advances past 'b', we reset it back to the beginning to point to 'a' again and advance the iterator over {1, 2}.

The analysis leads us to conclude that we will (a) need a tuple of iterators into the underlying ranges and (b) a tuple of references to the underlying ranges for resetting the iterators back to the beginning and comparing with the range ends. This demonstrates a weakness of Boost Ranges: our iterator is forced to keep references to the underlying ranges. Since the cartesian_range has two iterators (begin and end), this approach wastes memory. If the a range was a primitive (as in Alexandrescu's ranges), we could save on the extra references. Back to the code -- data members, constructors and equality check:

std::tuple<typename boost::range_iterator<Rs>::type... > iters;
std::tuple<Rs&...> ranges;

cartesian_iterator() {}

// used to construct the begin iterator
cartesian_iterator(Rs&... rs) :
    ranges(rs...), iters(boost::begin(rs)...) {}

// used to construct the end iterator
cartesian_iterator(Rs&... rs, int) :
    ranges(rs...), iters(boost::end(rs)...) {}

// called by iterator_facade's impl of oprerator==
bool equal(cartesian_iterator const& other ) const {
    return iters == other.iters;
}

With the easy parts are out of the way, let's tackle the iterator's increment functionality. Remember, we need to simulate the nested for loops but in a functional manner:
template <size_t N>
using const_int = std::integral_constant<size_t, N>;

// called by iterator_facade's impl of operator++
void increment() {
    increment(const_int<sizeof...(Rs) - 1>());
}

// helpers
template <size_t N>
bool increment(const_int<N>) {
    if( ++(std::get<N>(iters)) == boost::end(std::get<N>(ranges)) ) {
        if( !increment(const_int<N-1>()) )
            return false;
        std::get<N>(iters) = boost::begin(std::get<N>(ranges));
    }
    return true;
}

// base case
bool increment(const_int<0>) {
    return ++(std::get<0>(iters)) != boost::end(std::get<0>(ranges));
}
For any given iterator (starting with the last one), we increment it and if it reached the end, we recursively call ourselves to increment the previous iterator. If we are not at the very end, we reset the given iterator back to the beginning of the range.

Function application with a tuple

The last thing to do is to take care of the dereferencing. Before we proceed though, we need to sidestep and look at a problem that I think will often come up with variadics. I speak of calling a function with the arguments stored in a std::tuple. For example, if I have args of type std::tuple<int, char, double> and I want to call "void foo(int, char, double)" with args' elements. It's trivial to do so in this case -- foo(std::get<0>(args), std::get<1>(args), std::get<2>(args)) -- but less so generically with the number of arguments not fixed. This StackOverflow thread's accepted answer provides the best way of doing so.

The main idea is for a tuple of size N to generate a sequence of type seq<0, 1, 2, ..., N-1>. Then call a helper function that will capture the integral sequence in its parameter pack and expand it into many calls to std::get<>:

// the sequence type
template<size_t...>
struct seq { };

// meta-function to generate seq<0, 1, ..., N-1>
template<size_t N, size_t ...S>
struct gens : gens<N-1, N-1, S...> { };

template<size_t ...S>
struct gens<0, S...> {
    typedef seq<S...> type;
};

// accepts a tuple and returns seq<0, 1, ..., N-1>
template <typename... Ts>
typename gens<sizeof...(Ts)>::type tuple_indices(std::tuple<Ts...> const&) {
    return typename gens<sizeof...(Ts)>::type();
};

// helper that captures indices into a parameter pack and invokes f
template<typename F, typename Args, size_t ...Indices>
auto call_func(F&& f, Args&& args, seq<Indices...>) -> decltype(f(std::get<Indices>(args)...)) {
    return f(std::get<Indices>(args)...);
}

// takes function f and tuple args and invokes f with args
template <typename F, typename Args>
auto invoke(F&& f, Args&& args) -> decltype(call_func(std::forward<F>(f), std::forward<Args>(args), tuple_indices(args))) {
    return call_func(std::forward<F>(f), std::forward<Args>(args), tuple_indices(args));
}

I think this problem will come up so often that invoke() should become part of the Standard Library. But as you'll see in a moment, it is important to understand the technique, since invoke() will not always work for all problems of this type.

Back to Dereference

The dereference() function needs to construct a std::tuple out of the references returned by dereferencing each of the iterators. The problem here is subtly different from one solved by invoke() above. First, instead of a function (or callable), we are invoking a constructor (albeit this can by solved by using std::make_tuple). Second, our tuple does not contain values to be invoked with. Rather, each of those values (iters tuple contains iterators) needs to be dereferenced first. Fortunately, the technique still applies:

// invoked by iterator_facade's impl of operator*()
typename value_type<Rs...>::type dereference() const {
    return dereference(tuple_indices(iters));
}

// helper
template <size_t... Indices>
typename value_type<Rs...>::type dereference(seq<Indices...>) const {
    typedef typename value_type<Rs...>::type result_t;
    return result_t(*std::get<Indices>(iters)...);
}

We have now implemented all the necessary bits for a Forward Iterator.

Defining Boost.Range Adaptors

All that is left is to define xfiltered and xtransformed adaptors. It's a pretty simple process described here. First, we define a polymorphic functor (expander) that will just call invoke() defined earlier. The rest of the code wraps the user passed function inside the expander and passes it on to filter and transform adaptors. Everything else is just boilerplate specified by the Boost.Range's documentation:

template <typename F>
struct expander {
    F f;

    template <typename... Args>
    typename std::result_of<F(Args...)>::type operator()(std::tuple<Args...> tup) const {
        return invoke(f, tup);
    }
};

// --- xfiltered ---
template< class T >
struct xfilter_holder : boost::range_detail::holder<T> {
    xfilter_holder( T r ) : boost::range_detail::holder<T>(r) { }
};

template< class InputRng, class Pred >
boost::filtered_range<expander<Pred>, const InputRng>
operator|( const InputRng& r, const xfilter_holder<Pred>& f ) {
    return boost::filtered_range<expander<Pred>, const InputRng>( expander<Pred>{f.val}, r ); 
}

const boost::range_detail::forwarder<xfilter_holder> xfiltered = boost::range_detail::forwarder<xfilter_holder>();

// --- xtransformed---
template< class T >
struct xtransform_holder : boost::range_detail::holder<T> {
    xtransform_holder( T r ) : boost::range_detail::holder<T>(r) { }
};

template< class InputRng, class F >
boost::transformed_range<expander<F>, const InputRng>
operator|( const InputRng& r, const xtransform_holder<F>& f ) {
    return boost::transformed_range<expander<F>, const InputRng>( expander<F>{f.val}, r );
}

const boost::range_detail::forwarder<xtransform_holder> xtransformed = boost::range_detail::forwarder<xtransform_holder>();

Conclusion

Boost.Range provides useful blocks that can be put together to express set-builder notation of one variable. Defining a cartesian_range allows for generalizing the solution to any number of variables. We also saw that the leaky nature of Boost Ranges make it awkward to define some types of ranges/iterators. Variadic templates and std::tuple are great additions to C++ but the Standard Library could benefit from an invoke() function.

Friday, 7 September 2012

Iterating over a text file

It's a simple task -- you want iterate over all the lines in a file and perform an action on each one. In Python it's as simple as:
for line in open("somefile.txt"):
    print line
How about C++11. How hard can it be? This stackoverflow post gives us a starting point. We first define an iterator:
struct line_t : std::string {
     friend std::istream & operator>>(std::istream& is, line_t& line) {
         return std::getline(is, line);
     }
};
typedef std::istream_iterator<line_t> line_iterator;
Boost.Range packages a pair of begin/end iterators into a range which makes the code more elegant. It defines all those algorithms found in std namespace in terms of ranges. So we'll give ranges a try and hope it improves our style. Since std::pair of iterators is a valid Boost.Range range, let's define a line_range in terms of that and also a helper function to construct the range:
typedef std::pair<line_iterator, line_iterator> line_range;

line_range lines(std::ifstream& is) {
    return line_range(line_iterator(is), line_iterator());
}
Now we can use Boost.Range's for_each to iterate over a file:
std::ifstream file("somefile.txt");
boost::for_each(lines(file), [](std::string const& line) {
    std::cout << line << std::endl;
});
Not bad. Not counting the very last line, it's 3 lines of code. One more than Python. Let's see if we can get it down to two lines by constructing the ifstream object as a temporary:
boost::for_each(lines(std::ifstream("somefile.txt")), [](std::string const& line) {
    std::cout << line << std::endl;
});
g++ -std=c++0x iter.cpp
iter.cpp: In function ‘int main()’:
iter.cpp:97:54: error: invalid initialization of non-const reference of type ‘std::ifstream& {aka std::basic_ifstream<char>&}’ from an rvalue of type ‘std::ifstream {aka std::basic_ifstream<char>}’
iter.cpp:82:12: error: in passing argument 1 of ‘line_range lines(std::ifstream&)’
That doesn't work. Of course -- passing a temporary into a function that takes a non-const reference will not work. Unfortunately we can't change it to a const reference since istream_iterator's constructor takes a non-const reference. Fortunately we can deal with this situation by taking it as an rvalue reference:
line_range lines(std::ifstream&& is) {
    return line_range(line_iterator(is), line_iterator());
}
This will happily compile and work but we just broke the previous usage. That is because an rvalue reference won't bind to an lvalue. We can do two things to solve this problem. First is to provide two overloads:
line_range lines(std::ifstream& is) {
    return line_range(line_iterator(is), line_iterator());
}

line_range lines(std::ifstream&& is) {
    return lines(is);
}
The second overload ends up calling the first one because an rvalue reference is itself an lvalue! See the tutorial by Thomas Becker to learn everything you'll ever need about rvalue references. The other way is to use the mechanism employed by std::forward:
template <typename S>
line_range lines(S&& is) {
    return line_range(line_iterator(is), line_iterator());
}
A call with lvalue will bind S to std::ifstream& and with && will collapse to just std::ifstream&. A call with rvalue will bind S to std::ifstream and the whole thing will be std::ifstream&&. Again, see Becker's Section 8 for more detail.
Presto! We got two perfectly good ways to make it work with temporaries or not.

Room for Improvement: Using range-based-for

As much as I love lambdas and functional programming, there is a lot to be said for a good old for loop. And with C++11 we have the new range-based-for (aka foreach). I'd rather write this:
for( auto& line: lines(std::ifstream("somefile.txt")) )
    std::cout << line << std::endl;
In order for the range-based-for to work we must expose begin/end iterators via one of two ways. Either our line_range needs to have begin() and end() methods (just like STL containers) or we need to have free functions with same names such that ADL can pick them up. Since Boost.Range defines such free functions (boost::begin() and boost::end()), all we need to do is dump them into our namespace:
using boost::begin;
using boost::end;

std::ifstream file("somefile.txt");
for( auto& line: lines(file) )
    std::cout << line << std::endl;
This works great but unfortunately this does not (although it compiles):
for( auto& line: lines(std::ifstream("somefile.txt")) )
    std::cout << line << std::endl;
The reason being is that the compiler transforms this loop [stmt.ranged/1] into:
auto&& __range = lines(std::ifstream("somefile.txt"));
// regular for loop using iterators returned by begin(__range) and end(__range)
lines() returns the instance of line_range by value but its lifetime is extended to that of a the __range reference. However the lifetime of std::ifstream temporary is not extended! It is destroyed before the loop is even started. It's a problem that didn't exist when we were using boost::for_each since the whole iteration was all one statement. See [class.temporary/5] for details.
How should we fix it? Since the line_range temporary lives on for the duration of the loop, we need to move the instance of std::ifstream into it. For the cases where std::ifstream is an lvalue rather than a temporary, we can just store a reference to it. Let's get started.

Start off by redefining line_range as a class, ditching the std::pair of iterators:
template <typename T>
class line_range {
    T istr;
public:
    line_range(T&& is) : istr(std::forward<T>(is)) {}
    line_iterator begin() { return line_iterator(istr); }
    line_iterator end() { return line_iterator(); }
};

We need to rig this up so that T=std::ifstream if a temporary is used and T=std::ifstream& otherwise. Well, we know how to this! You did read Becker's Section 8, didn't you?!
It's as simple as (albeit syntax is not simple):
template <typename S>
auto lines(S&& is) -> decltype(line_range<S>(std::forward<S>(is))) {
    return line_range<S>(std::forward<S>(is));
}
It's an example of the Perfect Forwarding problem -- we just forward rvalueness/lvalueness through to the line_range and end up either moving the object or storing the reference to it. As a very last step, we have to slightly modify the line_range to once again accommodate Boost.Range. Complete code is shown below:
#include <iostream>
#include <fstream>
#include <boost/range/algorithm/for_each.hpp>

struct line_t : std::string {
     friend std::istream & operator>>(std::istream& is, line_t& line) {
         return std::getline(is, line);
     }
};

typedef std::istream_iterator<line_t> line_iterator;

template <typename T>
class line_range {
    T istr;
    line_iterator b;

public:
    typedef line_iterator iterator;
    typedef line_iterator const_iterator;

    line_range(T&& is) :
        istr(std::forward<T>(is)),
        b(istr)
    {}

    line_iterator begin() const { return b; }
    line_iterator end() const { return line_iterator(); }
};

template <typename S>
auto lines(S&& is) -> decltype(line_range<S>(std::forward<S>(is))) {
    return line_range<S>(std::forward<S>(is));
}

int main()
{
    std::ifstream file("somefile.txt");
    for( auto& line: lines(file) )
        std::cout << line << std::endl;

    for( auto& line: lines(std::ifstream("somefile.txt")) )
        std::cout << line << std::endl;

    std::ifstream file2("somefile.txt");
    boost::for_each(lines(file2), [](std::string const& line) {
        std::cout << line << std::endl;
    });

    boost::for_each(lines(std::ifstream("somefile.txt")), [](std::string const& line) {
        std::cout << line << std::endl;
    });

    return 0;
}

Saturday, 4 August 2012

Thoughts on Steve Jobs Biography

I recently finished reading biography of Steve Jobs by Walter Isaacson. It's a great book that shows Jobs not only from his side but also as perceived by his friends and enemies. After reading the book, I can certainly say that Steve Jobs was a complicated person. I can't say that I can admire him but there are definitely traits in him that appeal to me.

I consider myself a geek. I like technology. I like taking it apart, whether it's upgrading a part in my computer or studying the source of an open source project. By this account, Apple products should not appeal to me. After all, you can't even replace the battery in most of its devices. And yet I own an iPhone, an iPod and MacBook Air (I do also have a Linux desktop). I have these Apple devices because I love their design. I love how they look -- the body, the interface, the little click the magnetic plug of the charger makes as it latches onto my MacBook. When I talk to other geeks about Apple, some mock me for indulging in this "trivial" stuff. They think that people should not care about this fluff and should only care about how many GHz or GB the machine has. Or at least geeks shouldn't. They say that Apple is not a technology company, it's an industrial design company, making it sound like an insult. And yet many of these geeks admire cars like Mercedes, BMW or Ferrari. And while they appreciate the horsepower of the engine or quality workmanship, they love to talk and adore the car's design. A car is a piece of technology. Why is it ok for geeks to admire its design but not the design of a notebook or a phone?

And so I admire Steve Jobs for getting it that hi-tech should be beautiful and not just functional. And I also admire his desire to have everything work by having everything be integrated. Who doesn't like it when "it just works". But what I don't agree with is his assertion that it can only be achieved with a closed system. In fact he demonstrated that it doesn't have to be. Because Jobs would insist that Apple focus on only a few things at a time, they avoided making anything that would not be "absolutely great". For example, under his watch Apple was not in the printer business. And yet printers work pretty well with Macs. Third party printers connect via open interfaces (USB, LPD, IPP) and work just fine with PCs and Macs (unfortunately not Linux). Another example is iPhone. When it launched, Apple did not the capacity do its own maps. So while it developed the app, it used Google's data to power it. That was only possible because Google's maps were open.

So I hope many companies will copy Apple and pay more attention to design, simplicity and focus, but not embrace its close-system philosophy. I hope companies will embrace standards, open formats and open source. But I hope they will do so without sacrificing the ease of use and the "just works" philosophy. I truly believe they are not mutually exclusive.

Sunday, 22 July 2012

More fun with Intel TBB and Boost.Proto (and Boost.Fusion)

In my last post, we developed an EDSL for specifying TBB Flow Graph structure with the help of Boost.Proto. Instead of writing a series of tbb::flow::make_edge calls, we can use a little language to help us be more declarative. In the process, however, we lost some of the runtime efficiency. Instead of just invoking make_edge() a number of times, our solution uses loops and temporary data structures (e.g. std::set) which introduce runtime overhead. In this post, we will look at how to modify what we did last time to create a meta-program that will enable us to remove the unnecessary runtime overhead.

Boost.Fusion

Fusion library sits somewhere between Boost.MPL and STL. Like MPL, it provides heterogeneous data structures but unlike MPL it is able to operate at runtime. It also is a great tool for certain meta-programming tasks. For example, suppose we wanted to generate code that would invoke foo() multiple times, each with a different value (and maybe type). With Fusion, it is as easy as putting those values into a fusion::vector and then calling fusion::for_each() with the vector and a polymorphic functor:
void foo(int);
void foo(double);

struct call_foo {
    template <typename T>
    void operator()(T x) const {
        foo(x);
    }
};

int main() {
    boost::fusion::vector<int, double, int> v(1, 2.3, 4);
    boost::fusion::for_each(v, call_foo());
    return 0;
}

Once the program is compiled with optimizations enabled, all that remains are just foo() invocations (I ran the output generated via -s flag through c++filt to demangle the names):
main:                          # @main
    .cfi_startproc
# BB#0:
    pushq %rax
.Ltmp1:
    .cfi_def_cfa_offset 16
    movl $1, %edi
    callq foo(int)
    movsd .LCPI0_0(%rip), %xmm0
    callq foo(double)
    movl $4, %edi
    callq foo(int)
    xorl %eax, %eax
    popq %rdx
    ret

Since what we are trying to do is generate a bunch of make_edge() calls, this seems like exactly what we need.

Putting it all together

The basic idea is very simple. Replace the use of std::set with boost::fusion::vector when keeping track of senders and receivers in a compound node. We begin by re-defining compound_node to bundle up two fusion vectors. I replaced the use of std::tuple with a struct to make the code more readable and also added a utility function to make the construction easier:
template <typename Senders, typename Receivers>
struct compound_node {
    typedef Senders senders_t;
    typedef Receivers receivers_t;

    senders_t senders;
    receivers_t receivers;
};

template <typename S, typename R>
compound_node<S, R> mk_compound_node(S s, R r) {
    return compound_node<S, R>{s, r};
}

Now it's time to modify the make_compound_node transform that is invoked by Proto everytime it meets a terminal:
struct make_compound_node : proto::callable {
    typedef compound_node<
        fusion::vector<flow::sender<flow::continue_msg> *>,
        fusion::vector<flow::receiver<flow::continue_msg> *>
    > result_type;

    result_type operator()(flow::sender<flow::continue_msg>& s, flow::receiver<flow::continue_msg>& r) {
        return mk_compound_node(
            fusion::vector<flow::sender<flow::continue_msg> *>(&s),
            fusion::vector<flow::receiver<flow::continue_msg> *>(&r)
        );
    }
};

Next up is the join transform that is applied to process the + operator. Recall that all it has to do is create a new compound node by joining the senders and receivers of the left and right hand sides. To join two Fusion vectors, we use fusion::join function. Unfortunately, it returns not a new vector but a view which contains references to the original sequences it was asked to join. Because we pass everything by value and have tons of temporaries, this is a recipe for disaster. So we force Fusion to create a new vector by passing the result of join to fusion::as_vector. The messy part turns out to be the return type. Remember, Proto transforms have to comply with (TR1/C++11) result_of protocol. While previously we got away by using the simple method of typedef'ing a result_type, this time our return type depends on the types of arguments. We have to define a meta-function by specializing the result<Sig> struct. Luckily, Fusion library also supports a uniform way of computing result types of its functions (albeit slightly different from the standard approach). All of its functions have a meta-function with the same name in the fusion::result_of namespace.
struct join : proto::callable {
    template <typename Sig>
    struct result;

    template <typename This, typename LeftNode, typename RightNode>
    struct result<This(LeftNode, RightNode)> {
        typedef typename LeftNode::senders_t left_senders_t;
        typedef typename LeftNode::receivers_t left_receivers_t;
        typedef typename RightNode::senders_t right_senders_t;
        typedef typename RightNode::receivers_t right_receivers_t;

        typedef compound_node<
            typename fusion::result_of::as_vector<
                typename fusion::result_of::join<left_senders_t const, right_senders_t const>::type
            >::type,
            typename fusion::result_of::as_vector<
                typename fusion::result_of::join<left_receivers_t const, right_receivers_t const>::type
            >::type
        > type;
    };

    template <typename LeftNode, typename RightNode>
    typename result<join(LeftNode, RightNode)>::type operator()(LeftNode left, RightNode right) const {
        return mk_compound_node(
                fusion::as_vector(fusion::join(left.senders, right.senders)),
                fusion::as_vector(fusion::join(left.receivers, right.receivers))
            );
    };
};

Finally we get to the meat of the problem -- implementing splice transform where actual make_edge calls are made. Just like in the join transform above, we need to adhere to result_of protocol using struct specialization. We also need to execute the double for loop: the outer loop iterates over the left-hand's senders and the inner one over the right-hand receivers. But of course we'll use fusion::for_each() twice to do the looping. We are lucky that the fusion::vectors contain homogeneous types -- they are all either flow::sender<continue_msg> or flow::receiver<continue_msg>. This makes it possible to use C++11 lambdas and keep everything in one place.
struct splice : proto::callable {
    template <typename Sig>
    struct result;

    template <typename This, typename LeftNode, typename RightNode>
    struct result<This(LeftNode, RightNode)> {
        typedef compound_node<
            typename RightNode::senders_t,
            typename LeftNode::receivers_t
        > type;
    };

    template <typename LeftNode, typename RightNode>
    typename result<splice(LeftNode, RightNode)>::type operator()(LeftNode left, RightNode right) const {
        fusion::for_each(left.senders, [&](flow::sender<flow::continue_msg>* s) {
            fusion::for_each(right.receivers, [=](flow::receiver<flow::continue_msg>* r) {
                flow::make_edge(*s, *r);
            });
        });

        return mk_compound_node(right.senders, left.receivers);
    }
};

Note that no changes were necessary to the grammar nor, of course, main(). To verify that we reached our goal, let's check out the disassembly. Since flow::make_edge is inline function, I temporarily declared a make_edge() in the global namespace with the matching signature and used that instead. And voilĂ :
    leaq    560(%rsp), %rsi
    leaq    344(%rsp), %rdi
.LEHB53:
    call    make_edge(tbb::flow::interface6::sender<tbb::flow::interface6::continue_msg>&, tbb::flow::interface6::receiver<tbb::flow::interface6::continue_msg>&)
    leaq    816(%rsp), %rsi
    leaq    600(%rsp), %rdi
    call    make_edge(tbb::flow::interface6::sender<tbb::flow::interface6::continue_msg>&, tbb::flow::interface6::receiver<tbb::flow::interface6::continue_msg>&)
    leaq    1072(%rsp), %rsi
    leaq    88(%rsp), %rdi
    call    make_edge(tbb::flow::interface6::sender<tbb::flow::interface6::continue_msg>&, tbb::flow::interface6::receiver<tbb::flow::interface6::continue_msg>&)
    leaq    560(%rsp), %rsi
    leaq    88(%rsp), %rdi
    call    make_edge(tbb::flow::interface6::sender<tbb::flow::interface6::continue_msg>&, tbb::flow::interface6::receiver<tbb::flow::interface6::continue_msg>&)
    leaq    48(%rsp), %rsi
    leaq    1336(%rsp), %rdi
    call    make_edge(tbb::flow::interface6::sender<tbb::flow::interface6::continue_msg>&, tbb::flow::interface6::receiver<tbb::flow::interface6::continue_msg>&)
    leaq    304(%rsp), %rsi
    leaq    1336(%rsp), %rdi
    call    make_edge(tbb::flow::interface6::sender<tbb::flow::interface6::continue_msg>&, tbb::flow::interface6::receiver<tbb::flow::interface6::continue_msg>&)

Saturday, 14 July 2012

Fun with Intel TBB and Boost.Proto

Intel Threading Building Blocks

As the hardware engineers cram more and more cores into the processors, software engineers are left to ponder about how to best exploit these new capabilities. Intel offers an open source C++ library they call Threading Building Blocks (TBB). Akin to competing solutions (OpenMP, GCD, PPL), the library allows the developer to break up the computation into bite size pieces called tasks. A task also has a successor (or parent) task which makes it possible to express a relationship describing who should run before whom. The library runtime then takes care of scheduling the tasks onto the physical threads. It is, however, error prone and inconvenient to program using the tasks. Thankfully, much of the programming can be expressed via higher level abstractions (especially those involving fork-join parallelism). For example, parallel_for function allows for using multiple threads to execute the iterations of the loop. Another example is parallel_pipeline function which allows for expressing pipes-and-filters kind of parallelism.

TBB also has a facility called a Flow Graph. Designed for those situations when it is best to express the concurrency as a directed graph of dependencies between message emitting nodes, Flow Graph stands almost like a separate library within the library. Let's take a look at an example from TBB's online reference. In this example, every node will print its name after it receives a completion message from the node that it depends on. Here's the dependency graph:

And the corresponding code:

#include <cstdio>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

struct body {
    std::string my_name;
    body( const char *name ) : my_name(name) {}
    void operator()( continue_msg ) const {
        std::printf("%s\n", my_name.c_str());
    }
};

int main() {
    graph g;

    broadcast_node< continue_msg > start;
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i ) {
        start.try_put( continue_msg() );
        g.wait_for_all();
    }

    return 0;
}

The trouble with the code above is that the graph definition is not declarative. Looking at the series of make_edge calls, it is very hard to visualize the graph. I wanted to see if the situation could be improved...

Boost.Proto

I first encountered Boost.Proto at the Extraordinary C++ conference in 2007. Eric Niebler, the author of the library, gave a presentation to an awestruck audience where he took C++ metaprogramming to a level not seen before. In 2010, Eric wrote a series (1, 2, 3, 4, 5, 6, 7, 8) of articles on the subject that were published on C++Next blog. Through this wonderful series, I was finally able to start wrapping my head around Boost.Proto. The library is actually designed for library writers, more specifically those that are interested in developing an embeddable DSL inside C++ programs. I am not going try to give a tutorial of Boost.Proto in this post. If you have not yet done so, I highly recommend reading the aforementioned articles. What I hope to do is document my experience of using Boost.Proto for this exercise.

Intel TBB meets Boost.Proto

To make the TBB's flow graph construction more declarative, let's create an EDSL for this task. Ideally the syntax would be as close to the picture of a graph as possible but that turned out to be difficult. Let's start with a simple case of few nodes in a row:


In the EDSL we will write this as a > b > c. Next, let's look at graphs where a node has more than one edge coming out of it or going into it:


 



We will write the left graph as a > (b + c) and the right one as (a + b) > c. Since when I see graphs like this, I tell to myself "a goes to b and c" and "a and b go to c", this syntax seems reasonable. And since in C++ the addition has higher precedence than greater operator, the parenthesis can be dropped: a > b + c and a + b > c. Let's use this syntax to express the graph in the original example: start > (a > e + c) + (b > c > d).

The Grammar

As I started working on defining a grammar for this mini language, I first defined it on paper using EBNF notation much like I would do if I was going to use yacc:
expr ::= expr ">" group | group
group ::= group "+" node | "(" expr ")" | node
node ::= broadcast_node | continue_node

After a little fumbling however, I realized  that Proto grammars can be much simpler. Since Proto expressions are C++ expressions, one does not have to worry about introducing parenthesis into the grammar nor trying to build operator precedence and associativity into it. Thus the grammar actually becomes:

expr ::= expr ">" expr
       | expr "+" expr
       | broadcast_node
       | continue_node

A grammar like this would usually be ambiguous as it would be unclear how parse a > b > c, for example. But since C++ dictates the operator rules for us, there is no such ambiguity with Proto grammars! Written in Proto (transforms replaced with ellipses for now), it becomes:

namespace flow = tbb::flow;
namespace proto = boost::proto;
typedef flow::broadcast_node<flow::continue_msg> bcast_node;
typedef flow::continue_node<flow::continue_msg> cont_node;
typedef proto::literal<bcast_node> bcast;
typedef proto::literal<cont_node> cont;

struct grammar
    : proto::or_<
        proto::when<proto::plus<grammar, grammar>, ...>,
        proto::when<proto::greater<grammar, grammar>, ...>,
        proto::when<bcast, ...>,
        proto::when<cont, ...>
    >
{};

The Transforms

With the grammar in place, it's time to make it actually do stuff. We need to put in semantic actions, or transforms as Proto calls it, next to each grammar rule. In TBB, each node is both a sender and receiver of messages (and is derived from sender<T> and receiver<T>). As we combine the nodes together using the two operators, we produce a compound node which has some of the nodes inside of it acting as its senders and some as receivers. For example, suppose we have an expression (a > b) > (c > d > e) and we have independently build up two compound nodes, (a > b) and (c > d > e). We now join the two using the > operator. The > operator splices together the left-hand's senders with right-hand's receivers. In the diagrams below, the box  represents the compound node, the receivers are colored blue, the senders are colored red and nodes acting as both receivers and senders are colored purple. For the example above, the following diagram illustrates the before and after the splice. Note that while 'a' is sending messages to 'b', it is actually the receiver of the compound node.




The + operator joins the two compound nodes by combining the left-hand's receivers with right-hand's receivers and left-hand's senders with right-hand's senders. Again, suppose for the expression (a > b) + (c + d) we are joining (a > b) with (c + d). The following diagram shows the before and after:



Finally, let's go back to the > operator for the case when there are multiple senders being spliced to multiple receivers. In that case we splice each of the lefthand's senders to each of the right-hand's receivers. Thus, executing > operator on expressions (a + b) and (c + d + e) results in the following graph:


We now have all the pieces to start looking at code. Here are some typedefs:

typedef std::set<flow::sender<flow::continue_msg>*> sender_set;
typedef std::set<flow::receiver<flow::continue_msg>*> receiver_set;
typedef std::tuple<sender_set, receiver_set> compound_node;

Next up is the easiest of the transforms. When we encounter the terminals, broadcast_node or continue_node, we make a compound_node that only has that node inside of it:

struct make_compound_node : proto::callable
    typedef compound_node result_type;

    compound_node operator()(flow::sender<flow::continue_msg>& s, flow::receiver<flow::continue_msg>& r) {
        return compound_node({&s}, {&r});
    }
};

struct grammar
    : proto::or_<
        proto::when<proto::plus<grammar, grammar>, ...>,
        proto::when<proto::greater<grammar, grammar>, ...>,
        proto::when<bcast, make_compound_node(proto::_value, proto::_value)>,
        proto::when<cont, make_compound_node(proto::_value, proto::_value)>
>
{};

Next, we take care of + operator. Remember, we just need to join the two senders' sets into one and do the same for the receivers' sets:

struct join : proto::callable {
    typedef compound_node result_type;
    compound_node operator()(compound_node left, compound_node right) const {
        sender_set& senders = std::get<0>(left);
        senders.insert(std::get<0>(right).begin(), std::get<0>(right).end());

        receiver_set& receivers = std::get<1>(left);
        receivers.insert(std::get<1>(right).begin(), std::get<1>(right).end());

        return compound_node(std::move(senders), std::move(receivers));
    };
};

struct grammar
    : proto::or_<
        proto::when<proto::plus<grammar, grammar>, join(grammar(proto::_left), grammar(proto::_right))>,
        proto::when<proto::greater<grammar, grammar>, ...>,
        proto::when<bcast, make_compound_node(proto::_value, proto::_value)>,
        proto::when<cont, make_compound_node(proto::_value, proto::_value)>
>
{};

And finally the > operator:

struct splice : proto::callable {
    typedef compound_node result_type;

    compound_node operator()(compound_node left, compound_node right) const {
        for( auto s: std::get<0>(left) )
            for( auto r: std::get<1>(right) )
                flow::make_edge(*s, *r);

        sender_set& senders = std::get<0>(right);
        receiver_set& receivers = std::get<1>(left);

        return compound_node(std::move(senders), std::move(receivers));
    }
};

struct grammar
    : proto::or_<
        proto::when<proto::plus<grammar, grammar>, join(grammar(proto::_left), grammar(proto::_right))>,
        proto::when<proto::greater<grammar, grammar>, splice(grammar(proto::_left), grammar(proto::_right))>,
        proto::when<bcast, make_compound_node(proto::_value, proto::_value)>,
        proto::when<cont, make_compound_node(proto::_value, proto::_value)>
>
{};

The only thing that is left is main() demonstrating the usage:

int main() {

    flow::graph g;

    bcast start(g);
    cont a = cont_node( g, body("A"));
    cont b = cont_node( g, body("B"));
    cont c = cont_node( g, body("C"));
    cont d = cont_node( g, body("D"));
    cont e = cont_node( g, body("E"));

    auto expr =
        start > (a > e + c)
              + (b > c > d);

    grammar()(expr);

    proto::value(start).try_put( flow::continue_msg() );
    g.wait_for_all();

    return 0;
}

A big thanks goes to Neil Groves for reviewing this article.

Friday, 16 March 2012

Deferring execution

Go language has defer statements which allow for postponing execution of a function or method call until the end of the current function. Similarly, D has scope guard statements which allow statement execution to be delayed until the end of the scope (optionally specified to only execute under successful or failed scenarios).

Go
D
lock(l)
// unlocking happens before
// surrounding function returns
defer unlock(l)

lock(l);
// unlock on leaving the scope
scope(exit) unlock(l);

This is similar to 'finally' statements that are used in languages like Java and Python. In C++, we have our trusty RAII technique to execute clean up code at the end of scope. But for the cases when that is not convenient, boost provides ScopeExit library to accomplish something akin to D:

lock(l);
BOOST_SCOPE_EXIT( (&l) )
{
    unlock(l);
} BOOST_SCOPE_EXIT_END

If you look at Boost.ScopeExit source, it is filled with enough macro magic to make your head spin. The upside is that it works with C++03. But with C++11 and lambdas, we can build the same functionality and more with easy to read, macro free code.

The basic idea is to put deferred code into a lambda function and then store the lambda in a class that will execute it when it destructs. Let's start with this:

template <typename F>
struct deferrer {
    F fun;
    ~deferrer() { fun(); }
};

template <typename F>
deferrer<F> defer(F const& f) {
    return deferrer<F>{ f };
}
With this simple class (struct for now) and helper function, we can defer code like this:
void foo() {
    lock_t l;
    lock(l);
    auto d = defer([&l] { unlock(l); });
    // ...
}

However, there are a number of problems with this code. First, if lambda function throws, we will get a throwing destructor and the program will go to terminate() in those cases when the stack was already being unwound due to an exception. We can either leave things as is and disallow deferred code to throw or surround fun() invokation in try/catch block. Since this problem plaques RAII as well, I'll just resort to the former.

Second, and more serious problem, is that deferrer<F> is copyable and since defer() returns the object by value, there will be temporaries that get constructed and destructed. Since the destructor executes our function, our deferred code will get executed multiple times and prematurely! Well, that's the worst case scenario. Most compilers now implement copy elision and so RVO will actually not make this scenario appear. But relying on a compiler optimization is a bad practice anyway.

You might be saying right now, "wait a minute, this is C++11, shouldn't the return value be moved instead of copied?". Yes, if the move constructor is available. But by defining the destructor, we opted out of move constructor (and move assignment operator) being implicitly defined for us. Remember, the C++ committee tightened the rules regarding when the implicit generation is OK. Dave Abrahams has a nice post on the topic. BTW, gcc 4.6 still generates the move constructor and I haven't tested the soon to be released 4.7.

The best thing to do here is to forbid copying and define a move constructor:
template <typename F>
class deferrer {
    F fun;
    bool enabled;

public:
    deferrer(F const& f) : fun(f), enabled(true) {}

    // move constructor
    deferrer(deferrer<F>&& rhs) : fun(rhs.fun), enabled(rhs.enabled) {
        rhs.enabled = false;
    }

    // move assignment
    deferrer<F>& operator=(deferrer<F>&& rhs) {
        if( this != &rhs ) {
            fun = rhs.fun;
            enabled = rhs.enabled;
            rhs.enabled = false;
        }
        return *this;
    }

    // no copying
    deferrer(deferrer<F> const& ) = delete;
    deferrer<F>& operator=(deferrer<F> const&) = delete;

    ~deferrer() {
        if( enabled )
            fun();
    }

    // add this as a bonus 
    void cancel() { enabled = false; }
};

Now let's turn our attention to the case when the deferred action should happen not at the end of lexical scope of where deferment was specified but at the end of the function scope or the end of some other lexical scope. For example:
void foo(const char* path, bool cleanup) {
    int fd = creat(path, S_IRUSR);
    if( cleanup ) {
        auto d = defer([path, fd]{
            close(fd);
            unlink(path);
        });
    }
    else {
        auto d = defer([fd]{ close(fd); });
    }
    // oops, closed and maybe unlinked too early!
    // ....
}

We can fix this by creating another class that we can instantiate in the lexical scope at the end of which we want the deferred action to be executed:
void foo(const char* path, bool cleanup) {
    deferred d; // deferred action will execute at the end of foo()
    int fd = creat(path, S_IRUSR);
    if( cleanup ) {
        d = defer([path, fd]{
            close(fd);
            unlink(path);
        });
    }
    else {
        d = defer([fd]{ close(fd); });
    }
    // ....
}
To implement such a beast, we'll need to use std::function to type erase the lambda into nullary function:
template <typename F>
class deferrer {
    friend class deferred;
    // remainder not changed
};

class deferred {
    std::function<void ()> fun;

public:
    deferred() = default;
    deferred(deferred const&) = delete;
    deferred& operator=(deferred const&) = delete;

    template <typename F>
    deferred(deferrer<F>&& d) {
        if( d.enabled ) {
            fun = d.fun;
            d.enabled = false;
        }
    }

    ~deferred() {
        if( fun )
            fun();
    }
};

Conclusion

While I think that the time it takes to define an RAII object is well spent, I admit that the occasional use of scope exit facility can be convenient. It is also great to see that lambda functions allow us to implement few simple utility classes that can do what other languages had to provide in the language itself.

Saturday, 3 March 2012

Recursing constexpr - Part II

In the previous post, we developed a logarithmic depth accumulate() function that can be used to emulate a for-loop. In this post, we'll look at how to construct a function that emulates a while-loop with the recursive depth of O(lg(n)) where n is the number of iterations of the while-loop. Recall that our initial motivation is to be able to determine if a large number is prime at compile time as was done in this post.

A while-loop can be generally written as follows:

state = initial state;
while( test(state) )
    mutate state;

Translating this to functional form, we can write the signature as:

template <typename T, typename Test, typename Op>
T while_loop(T init, Test test, Op op);

The main idea behind the implementation of while_loop() is to call a helper function that will execute up to n iterations of the loop. As we have seen in previous post, this can be done with recursive depth of O(lg(n)). If there are more iterations that are still needed (that is test(state) is still true), while_loop() recursively calls itself with 2n. Instead of n, the implementation uses depth argument to signify the maximum recursion depth. This is illustrated below:


The code looks like this:

template <typename T, typename Test, typename Op>
constexpr T exec(int first, int last, T init, Test test, Op op) {
    return
        (first >= last || !test(init)) ? init :
            (first + 1 == last) ?
                op(init) :
                exec((first + last) / 2, last,
                    exec(first, (first + last) / 2, init, test, op), test, op);
}

template <typename T, typename Test, typename Op>
constexpr T while_loop(T init, Test test, Op op, int depth = 1) {
    return !test(init) ? init :
        while_loop(exec(0, (1 << depth) - 1, init, test, op), test, op, depth+1);
}

There is one downside to this algorithm. The exec() function performs the test to determine if it should continue or bail at the point of entry. This performs more tests than necessary. For example, as exec() recurses along the left edge of the tree, it performs the test over and over without mutating the state. This can be fixed by introducing another helper and performing the test only on the right branch of the recursion:

template <typename T, typename Test, typename Op>
constexpr T guarded_exec(int first, int last, T init, Test test, Op op) {
    return test(init) ? exec(first, last, init, test, op) : init;
}

template <typename T, typename Test, typename Op>
constexpr T exec(int first, int last, T init, Test test, Op op) {
    return
        (first >= last) ? init :
            (first + 1 == last) ?
                op(init) :
                guarded_exec((first + last) / 2, last,
                    exec(first, (first + last) / 2, init, test, op), test, op);
}

Finally, we can use while_loop() to implement is_prime():

struct test_expr {
    int n;
    constexpr bool operator()(int x) const { return x*x <= n && n % x != 0; }
};

constexpr int inc(int x) {
    return x + 1;
}

constexpr bool gt_than_sqrt(int x, int n) {
    return x*x > n;
}

constexpr bool is_prime(int n) {
    return gt_than_sqrt(while_loop(2, test_expr{ n }, inc), n);
}

int main() {
    static_assert(is_prime(94418953), "should be prime");
    return 0;
}

Logarithmic Growth

To see that the recursion depth will be O(lg(n)) where n is the iteration count, observe that when the last iteration completes, while_loop() will have a recursive depth d and exec() will also produce depth d. Thus, the maximum recursion depth will be 2d. At the first level, exec() would have completed 1 iterations, 3 at the second, 7 at the third, and at level d. If , we need to show that



for all sufficiently large n. In that case, the recursion of depth can fit more than n iterations. By the use of geometric series formula, the sum becomes ; and since the first term is dominating, we just need to show that . Note that .

Practical Simplification

Since our initial goal was to not exceed the compiler recursion limit, we do not actually need truly logarithmic depth. It will suffice to map the loop iterations onto a tree of certain depth. For example, if we choose a depth of 30, we can have up to iterations, more than enough for most practical purposes. In this case, the while_loop() function simplifies to:

template <typename T, typename Test, typename Op>
constexpr T while_(T init, Test test, Op op) {
    return guarded_accumulate(0, (1 << 30) - 1, init, test, op);
}

Friday, 24 February 2012

Recursing a constexpr

While reading "Want speed? Use constexpr meta-programming!" blog post, I started thinking about how to reduce the recursion depth of such calculations. After all, using a compiler option to increase the recursion depth limits is not very friendly (gcc sets the limit at 512 by default). But before we get to reducing the depth of something like is_prime_func() from the aforementioned post, it's best to start with something slightly easier.

Consider a function to compute the following series:


Let's begin by writing it without constexpr to be executed at runtime but keeping in mind that we will later convert it for compile time computation. We can write it as a recursive accumulate function to gain some generality:

template <typename Int, typename T, typename BinOp>
T accumulate(Int first, Int last, T init, BinOp op) {
    return (first >= last) ?
        init :
        accumulate(first+1, last, op(init, first), op);
}

int sum_squares(int a, int n) {
    return accumulate(1, n+1, 0, [a](int prev, int i) {
        return prev + a*i*i;
    });
}

accumulate's signature is very similar to std::accumulate except that instead of dealing with iterators into a sequence, it operates directly on the integral sequence. The sum_squares function calls accumulate() with a lambda expression that captures 'a'.

All is good except that accumulate() will produce a recursion depth equal to the length of the input sequence. We can change it to a logarithmic growth by recursively dividing the sequence into two halves:

template <typename Int, typename T, typename BinOp>
T accumulate(Int first, Int last, T init, BinOp op) {
    return
        (first >= last ? init :
            (first+1 == last) ?
                op(init, first) :
                accumulate((first + last) / 2, last,
                    accumulate(first, (first + last) / 2, init, op), op));
}

At this point we should be able to just prefix function signatures with constexpr and get the magic of compile time computation. Let's give it a try:


template <typename Int, typename T, typename BinOp>
constexpr T accumulate(Int first, Int last, T init, BinOp op) {
    return
        (first >= last ? init :
            (first+1 == last) ?
                op(init, first) :
                accumulate((first + last) / 2, last,
                    accumulate(first, (first + last) / 2, init, op), op));
}

constexpr int sum_squares(int a, int n) {
    return accumulate(1, n+1, 0, [a](int prev, int i) {
        return prev + a*i*i;
    });
}

int main() {
    static_assert(sum_squares(3, 600) == 216540300, "");
    return 0;
}

$ g++ -std=c++0x test.cpp
test.cpp: In function ‘int sum_squares(int, int)’:
test.cpp:17:1: error: ‘constexpr T accumulate(Int, Int, T, BinOp) [with Int = int, T = int, BinOp = sum_squares(int, int)::<lambda(int, int)>]’ is not ‘constexpr’
test.cpp: In function ‘int main()’:
test.cpp:21:2: error: non-constant condition for static assertion
test.cpp:21:34: error: ‘int sum_squares(int, int)’ is not a constexpr function

For some mysterious reason, C++11 Standard explicitly forbids lambda expressions from being used in constexpr expressions. So we have to fall back to good old functors:

struct add_sq {
    int a;
    constexpr int operator()(int p, int i) const { return p + a*i*i; }
};

constexpr int sum_squares(int a, int n) {
    return accumulate(1, n+1, 0, add_sq{ a });
}

Generalization to for-loop

It is interesting to note that our accumulate() function can work for arbitrary for-loops of the form:
for( int i = start; i < end; i++ )
    body;
To see that, observe that a loop starts with some initial state and then mutates it on every iteration in the body of the loop. Functionally, we can view this mutation as applying an operation of two arguments, an old state and iteration index, to produce the new state. This is precisely what our accumulate function does. Of course, the state might encompass more than just a scalar and we would need a composite type to hold the individual elements. It would be great if std::tuple could work with constexpr but since it doesn't, it's easy enough to define a struct to hold the necessary state.


Conclusion

As we have seen, it is straightforward to convert a recursive function with linear depth into one with logarithmic depth if the number of recursions is known at the point of its invocation. This makes it easy to convert a classic for-loop into recursive form with manageable depth. Next time we'll look at how to convert a while-loop, where the number of iterations is not initially known, into logarithmic depth recursion and in the process define the promised is_prime(n) capable of dealing with very large numbers.