dc0da61df5ae952c90fc243128e825e32537f1ea
[rust-101.git] / src / part12.rs
1 // Rust-101, Part 12: Concurrency, Arc, Send
2 // =========================================
3
4 use std::io::prelude::*;
5 use std::{io, fs, thread};
6 use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
7 use std::sync::Arc;
8
9 //@ Our next stop are the concurrency features of Rust. We are going to write our own small version of "grep",
10 //@ called *rgrep*, and it is going to make use of concurrency: One thread reads the input files, one thread does
11 //@ the actual matching, and one thread writes the output. I already mentioned in the beginning of the course that
12 //@ Rust's type system (more precisely, the discipline of ownership and borrowing) will help us to avoid a common
13 //@ pitfall of concurrent programming: data races.
14
15 // Before we come to the actual code, we define a data-structure `Options` to store all the information we need
16 // to complete the job: Which files to work on, which pattern to look for, and how to output. <br/>
17 //@ Besides just printing all the matching lines, we will also offer to count them, or alternatively to sort them.
18 #[derive(Clone,Copy)]
19 pub enum OutputMode {
20     Print,
21     SortAndPrint,
22     Count,
23 }
24 use self::OutputMode::*;
25
26 pub struct Options {
27     pub files: Vec<String>,
28     pub pattern: String,
29     pub output_mode: OutputMode,
30 }
31
32 //@ Now we can write three functions to do the actual job of reading, matching, and printing, respectively.
33 //@ To get the data from one thread to the next, we will use *message passing*: We will establish communication
34 //@ channels between the threads, with one thread *sending* data, and the other one *receiving* it. `SyncSender<T>`
35 //@ is the type of the sending end of a synchronous channel transmitting data of type `T`. *Synchronous* here
36 //@ means that the `send` operation could block, waiting for the other side to make progress. We don't want to
37 //@ end up with the entire file being stored in the buffer of the channels, and the output not being fast enough
38 //@ to keep up with the speed of input.
39 //@
40 //@ We also need all the threads to have access to the options of the job they are supposed to do. Since it would
41 //@ be rather unnecessary to actually copy these options around, we will use reference-counting to share them between
42 //@ all threads. `Arc` is the thread-safe version of `Rc`, using atomic operations to keep the reference count up-to-date.
43
44 // The first function reads the files, and sends every line over the `out_channel`.
45 fn read_files(options: Arc<Options>, out_channel: SyncSender<String>) {
46     for file in options.files.iter() {
47         // First, we open the file, ignoring any errors.
48         let file = fs::File::open(file).unwrap();
49         // Then we obtain a `BufReader` for it, which provides the `lines` function.
50         let file = io::BufReader::new(file);
51         for line in file.lines() {
52             let line = line.unwrap();
53             // Now we send the line over the channel, ignoring the possibility of `send` failing.
54             out_channel.send(line).unwrap();
55         }
56     }
57     // When we drop the `out_channel`, it will be closed, which the other end can notice.
58 }
59
60 // The second function filters the lines it receives through `in_channel` with the pattern, and sends
61 // matches via `out_channel`.
62 fn filter_lines(options: Arc<Options>,
63                 in_channel: Receiver<String>,
64                 out_channel: SyncSender<String>) {
65     // We can simply iterate over the channel, which will stop when the channel is closed.
66     for line in in_channel.iter() {
67         // `contains` works on lots of types of patterns, but in particular, we can use it to test whether
68         // one string is contained in another. This is another example of Rust using traits as substitute for overloading.
69         if line.contains(&options.pattern) {
70             out_channel.send(line).unwrap();                        /*@*/
71         }
72     }
73 }
74
75 // The third function performs the output operations, receiving the relevant lines on its `in_channel`.
76 fn output_lines(options: Arc<Options>, in_channel: Receiver<String>) {
77     match options.output_mode {
78         Print => {
79             // Here, we just print every line we see.
80             for line in in_channel.iter() {
81                 println!("{}", line);                               /*@*/
82             }
83         },
84         Count => {
85             // We are supposed to count the number of matching lines. There's a convenient iterator adapter that
86             // we can use for this job.
87             let count = in_channel.iter().count();                  /*@*/
88             println!("{} hits for {}.", count, options.pattern);    /*@*/
89         },
90         SortAndPrint => {
91             // We are asked to sort the matching lines before printing. So let's collect them all in a local vector...
92             let mut data: Vec<String> = in_channel.iter().collect();
93             // ...and implement the actual sorting later.
94             unimplemented!()
95         }
96     }
97 }
98
99 // With the operations of the three threads defined, we can now implement a function that performs grepping according
100 // to some given options.
101 pub fn run(options: Options) {
102     // We move the `options` into an `Arc`, as that's what the thread workers expect.
103     let options = Arc::new(options);
104
105     // This sets up the channels. We use a `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM.
106     let (line_sender, line_receiver) = sync_channel(16);
107     let (filtered_sender, filtered_receiver) = sync_channel(16);
108
109     // Spawn the read thread: `thread::spawn` takes a closure that is run in a new thread.
110     //@ The `move` keyword again tells Rust that we want ownership of captured variables to be moved into the
111     //@ closure. This means we need to do the `clone` *first*, otherwise we would lose our `options` to the
112     //@ new thread!
113     let options1 = options.clone();
114     let handle1 = thread::spawn(move || read_files(options1, line_sender));
115
116     // Same with the filter thread.
117     let options2 = options.clone();
118     let handle2 = thread::spawn(move || {
119         filter_lines(options2, line_receiver, filtered_sender)
120     });
121
122     // And the output thread.
123     let options3 = options.clone();
124     let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver));
125
126     // Finally, wait until all three threads did their job.
127     //@ Joining a thread waits for its termination. This can fail if that thread panicked: In this case, we could get
128     //@ access to the data that it provided to `panic!`. Here, we just assert that they did not panic - so we will panic ourselves
129     //@ if that happened.
130     handle1.join().unwrap();
131     handle2.join().unwrap();
132     handle3.join().unwrap();
133 }
134
135 // Now we have all the pieces together for testing our rgrep with some hard-coded options.
136 //@ We need to call `to_string` on string literals to convert them to a fully-owned `String`.
137 pub fn main() {
138     let options = Options {
139         files: vec!["src/part10.rs".to_string(),
140                     "src/part11.rs".to_string(),
141                     "src/part12.rs".to_string()],
142         pattern: "let".to_string(),
143         output_mode: Print
144     };
145     run(options);
146 }
147
148 // **Exercise 12.1**: Change rgrep such that it prints not only the matching lines, but also the name of the file
149 // and the number of the line in the file. You will have to change the type of the channels from `String` to something
150 // that records this extra information.
151
152 //@ ## Ownership, Borrowing, and Concurrency
153 //@ The little demo above showed that concurrency in Rust has a fairly simple API. Considering Rust has closures,
154 //@ that should not be entirely surprising. However, as it turns out, Rust goes well beyond this and actually ensures
155 //@ the absence of data races. <br/>
156 //@ A data race is typically defined as having two concurrent, unsynchronized
157 //@ accesses to the same memory location, at least one of which is a write. In other words, a data race is mutation in
158 //@ the presence of aliasing, which Rust reliably rules out! It turns out that the same mechanism that makes our single-threaded
159 //@ programs memory safe, and that prevents us from invalidating iterators, also helps secure our multi-threaded code against
160 //@ data races. For example, notice how `read_files` sends a `String` to `filter_lines`. At run-time, only the pointer to
161 //@ the character data will actually be moved around (just like when a `String` is passed to a function with full ownership). However,
162 //@ `read_files` has to *give up* ownership of the string to perform `send`, to it is impossible for an outstanding borrow to
163 //@ still be around. After it sent the string to the other side, `read_files` has no pointer into the string content
164 //@ anymore, and hence no way to race on the data with someone else.
165 //@ 
166 //@ There is a little more to this. Remember the `'static` bound we had to add to `register` in the previous part, to make
167 //@ sure that the callbacks do not reference any pointers that might become invalid? This is just as crucial for spawning
168 //@ a thread: In general, that thread could last for much longer than the current stack frame. Thus, it must not use
169 //@ any pointers to data in that stack frame. This is achieved by requiring the `FnOnce` closure passed to `thread::spawn`
170 //@ to be valid for lifetime `'static`, as you can see in [its documentation](http://doc.rust-lang.org/stable/std/thread/fn.spawn.html).
171 //@ This avoids another kind of data race, where the thread's access races with the callee deallocating its stack frame.
172 //@ It is only thanks to the concept of lifetimes that this can be expressed as part of the type of `spawn`.
173
174 //@ ## Send
175 //@ However, the story goes even further. I said above that `Arc` is a thread-safe version of `Rc`, which uses atomic operations
176 //@ to manipulate the reference count. It is thus crucial that we don't use `Rc` across multiple threads, or the reference count may
177 //@ become invalid. And indeed, if you replace `Arc` by `Rc` (and add the appropriate imports), Rust will tell you that something
178 //@ is wrong. That's great, of course, but how did it do that?
179 //@ 
180 //@ The answer is already hinted at in the error: It will say something about `Send`. You may have noticed that the closure in
181 //@ `thread::spawn` does not just have a `'static` bound, but also has to satisfy `Send`. `Send` is a trait, and just like `Copy`,
182 //@ it's just a marker - there are no functions provided by `Send`. What the trait says is that types which are `Send`, can be
183 //@ safely sent to another thread without causing trouble. Of course, all the primitive data-types are `Send`. So is `Arc`,
184 //@ which is why Rust accepted our code. But `Rc` is not `Send`, and for a good reason!
185 //@ 
186 //@ Now, `Send` as a trait is fairly special. It has a so-called *default implementation*. This means that *every type* implements
187 //@ `Send`, unless it opts out. Opting out is viral: If your type contains a type that opted out, then you don't have `Send`, either.
188 //@ So if the environment of your closure contains an `Rc`, it won't be `Send`, preventing it from causing trouble. If however every
189 //@ captured variable *is* `Send`, then so is the entire environment, and you are good.
190
191 //@ [index](main.html) | [previous](part11.html) | [next](part13.html)