Merge pull request #42 from zdyxry/master
[rust-101.git] / src / part13.rs
1 // Rust-101, Part 13: Concurrency, Arc, Send
2 // =========================================
3
4 use std::io::prelude::*;
5 use std::{io, fs, thread};
6 use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
7 use std::sync::Arc;
8
9 //@ Our next stop are the concurrency features of Rust. We are going to write our own small version
10 //@ of "grep", called *rgrep*, and it is going to perform three jobs concurrently: One thread reads
11 //@ the input files, one thread does the actual matching, and one thread writes the output. I
12 //@ already mentioned in the beginning of the course that Rust's type system (more precisely, the
13 //@ discipline of ownership and borrowing) will help us to avoid a common pitfall of concurrent
14 //@ programming: data races. We will see how that works concretely.
15
16 // Before we come to the actual code, we define a data-structure `Options` to store all the
17 // information we need to complete the job: Which files to work on, which pattern to look for, and
18 // how to output.
19 //@ Besides just printing all the matching lines, we will also offer to count them, or
20 //@ alternatively to sort them.
21 #[derive(Clone,Copy)]
22 pub enum OutputMode {
23     Print,
24     SortAndPrint,
25     Count,
26 }
27 use self::OutputMode::*;
28
29 pub struct Options {
30     pub files: Vec<String>,
31     pub pattern: String,
32     pub output_mode: OutputMode,
33 }
34
35 //@ Now we can write three functions to do the actual job of reading, matching, and printing,
36 //@ respectively. To get the data from one thread to the next, we will use *message passing*: We
37 //@ will establish communication channels between the threads, with one thread *sending* data, and
38 //@ the other one *receiving* it. `SyncSender<T>` is the type of the sending end of a synchronous
39 //@ channel transmitting data of type `T`.
40 //@ *Synchronous* here means that the `send` operation could block, waiting for the other side to
41 //@ make progress. We don't want to end up with the entire file being stored in the buffer of the
42 //@ channels, and the output not being fast enough to keep up with the speed of input.
43 //@
44 //@ We also need all the threads to have access to the options of the job they are supposed to do.
45 //@ Since it would be rather unnecessary to actually copy these options around, we will use
46 //@ reference-counting to share them between all threads. `Arc` is the thread-safe version of `Rc`,
47 //@ using atomic operations to keep the reference count up-to-date.
48
49 // The first function reads the files, and sends every line over the `out_channel`.
50 fn read_files(options: Arc<Options>, out_channel: SyncSender<String>) {
51     for file in options.files.iter() {
52         // First, we open the file, ignoring any errors.
53         let file = fs::File::open(file).unwrap();
54         // Then we obtain a `BufReader` for it, which provides the `lines` function.
55         let file = io::BufReader::new(file);
56         for line in file.lines() {
57             let line = line.unwrap();
58             // Now we send the line over the channel, ignoring the possibility of `send` failing.
59             out_channel.send(line).unwrap();
60         }
61     }
62     // When we drop the `out_channel`, it will be closed, which the other end can notice.
63 }
64
65 // The second function filters the lines it receives through `in_channel` with the pattern, and sends
66 // matches via `out_channel`.
67 fn filter_lines(options: Arc<Options>,
68                 in_channel: Receiver<String>,
69                 out_channel: SyncSender<String>) {
70     // We can simply iterate over the channel, which will stop when the channel is closed.
71     for line in in_channel.iter() {
72         // `contains` works on lots of types of patterns, but in particular, we can use it to test
73         // whether one string is contained in another. This is another example of Rust using traits
74         // as substitute for overloading.
75         if line.contains(&options.pattern) {
76             out_channel.send(line).unwrap();                        /*@*/
77         }
78     }
79 }
80
81 // The third function performs the output operations, receiving the relevant lines on its
82 // `in_channel`.
83 fn output_lines(options: Arc<Options>, in_channel: Receiver<String>) {
84     match options.output_mode {
85         Print => {
86             // Here, we just print every line we see.
87             for line in in_channel.iter() {
88                 println!("{}", line);                               /*@*/
89             }
90         },
91         Count => {
92             // We are supposed to count the number of matching lines. There's a convenient iterator
93             // adapter that we can use for this job.
94             let count = in_channel.iter().count();                  /*@*/
95             println!("{} hits for {}.", count, options.pattern);    /*@*/
96         },
97         SortAndPrint => {
98             // We are asked to sort the matching lines before printing. So let's collect them all
99             // in a local vector...
100             let mut data: Vec<String> = in_channel.iter().collect();
101             // ...and implement the actual sorting later.
102             unimplemented!()
103         }
104     }
105 }
106
107 // With the operations of the three threads defined, we can now implement a function that performs
108 // grepping according to some given options.
109 pub fn run(options: Options) {
110     // We move the `options` into an `Arc`, as that's what the thread workers expect.
111     let options = Arc::new(options);
112
113     // This sets up the channels. We use a `sync_channel` with buffer-size of 16 to avoid needlessly
114     // filling RAM.
115     let (line_sender, line_receiver) = sync_channel(16);
116     let (filtered_sender, filtered_receiver) = sync_channel(16);
117
118     // Spawn the read thread: `thread::spawn` takes a closure that is run in a new thread.
119     //@ The `move` keyword again tells Rust that we want ownership of captured variables to be
120     //@ moved into the closure. This means we need to do the `clone` *first*, otherwise we would
121     //@ lose our `options` to the new thread!
122     let options1 = options.clone();
123     let handle1 = thread::spawn(move || read_files(options1, line_sender));
124
125     // Same with the filter thread.
126     let options2 = options.clone();
127     let handle2 = thread::spawn(move || {
128         filter_lines(options2, line_receiver, filtered_sender)
129     });
130
131     // And the output thread.
132     let options3 = options.clone();
133     let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver));
134
135     // Finally, wait until all three threads did their job.
136     //@ Joining a thread waits for its termination. This can fail if that thread panicked: In this
137     //@ case, we could get access to the data that it provided to `panic!`. Here, we just assert
138     //@ that they did not panic - so we will panic ourselves if that happened.
139     handle1.join().unwrap();
140     handle2.join().unwrap();
141     handle3.join().unwrap();
142 }
143
144 // Now we have all the pieces together for testing our rgrep with some hard-coded options.
145 //@ We need to call `to_string` on string literals to convert them to a fully-owned `String`.
146 pub fn main() {
147     let options = Options {
148         files: vec!["src/part10.rs".to_string(),
149                     "src/part11.rs".to_string(),
150                     "src/part12.rs".to_string()],
151         pattern: "let".to_string(),
152         output_mode: Print
153     };
154     run(options);
155 }
156
157 // **Exercise 13.1**: Change rgrep such that it prints not only the matching lines, but also the
158 // name of the file and the number of the line in the file. You will have to change the type of the
159 // channels from `String` to something that records this extra information.
160
161 //@ ## Ownership, Borrowing, and Concurrency
162 //@ The little demo above showed that concurrency in Rust has a fairly simple API. Considering Rust
163 //@ has closures, that should not be entirely surprising. However, as it turns out, Rust goes well
164 //@ beyond this and actually ensures the absence of data races. <br/>
165 //@ A data race is typically defined as having two concurrent, unsynchronized accesses to the same
166 //@ memory location, at least one of which is a write. In other words, a data race is mutation in
167 //@ the presence of aliasing, which Rust reliably rules out! It turns out that the same mechanism
168 //@ that makes our single-threaded programs memory safe, and that prevents us from invalidating
169 //@ iterators, also helps secure our multi-threaded code against data races. For example, notice
170 //@ how `read_files` sends a `String` to `filter_lines`.
171 //@ At run-time, only the pointer to the character data will actually be moved around (just like
172 //@ when a `String` is passed to a function with full ownership). However, `read_files` has to
173 //@ *give up* ownership of the string to perform `send`, to it is impossible for the string to
174 //@ still be borrowed. After it sent the string to the other side, `read_files` has no pointer into
175 //@ the string content anymore, and hence no way to race on the data with someone else.
176 //@ 
177 //@ There is a little more to this. Remember the `'static` bound we had to add to `register` in the
178 //@ previous parts, to make sure that the callbacks do not reference any pointers that might become
179 //@ invalid? This is just as crucial for spawning a thread: In general, that thread could last for
180 //@ much longer than the current stack frame. Thus, it must not use any pointers to data in that
181 //@ stack frame. This is achieved by requiring the `FnOnce` closure passed to `thread::spawn` to be
182 //@ valid for lifetime `'static`, as you can see in
183 //@ [its documentation](https://doc.rust-lang.org/stable/std/thread/fn.spawn.html). This avoids
184 //@ another kind of data race, where the thread's access races with the callee deallocating its
185 //@ stack frame. It is only thanks to the concept of lifetimes that this can be expressed as part
186 //@ of the type of `spawn`.
187
188 //@ ## Send
189 //@ However, the story goes even further. I said above that `Arc` is a thread-safe version of `Rc`,
190 //@ which uses atomic operations to manipulate the reference count. It is thus crucial that we
191 //@ don't use `Rc` across multiple threads, or the reference count may become invalid. And indeed,
192 //@ if you replace `Arc` by `Rc` (and add the appropriate imports), Rust will tell you that
193 //@ something is wrong. That's great, of course, but how did it do that?
194 //@ 
195 //@ The answer is already hinted at in the error: It will say something about `Send`. You may have
196 //@ noticed that the closure in `thread::spawn` does not just have a `'static` bound, but also has
197 //@ to satisfy `Send`. `Send` is a trait, and just like `Copy`, it's just a marker - there are no
198 //@ functions provided by `Send`. What the trait says is that types which are `Send` can be safely
199 //@ sent to another thread without causing trouble.
200 //@ Of course, all the primitive data-types are `Send`. So is `Arc`, which is why Rust accepted our
201 //@ code. But `Rc` is not `Send`, and for a good reason! If we had two `Rc`s to the same data, and sent
202 //@ one of them to another thread, things could go havoc due to the lack of synchronization.
203 //@ 
204 //@ Now, `Send` as a trait is fairly special. It has a so-called *default implementation*. This
205 //@ means that *every type* implements `Send`, unless it opts out. Opting out is viral: If your
206 //@ type contains a type that opted out, then you don't have `Send`, either. So if the environment
207 //@ of your closure contains an `Rc`, it won't be `Send`, preventing it from causing trouble. If
208 //@ however every captured variable *is* `Send`, then so is the entire environment, and you are
209 //@ good.
210
211 //@ [index](main.html) | [previous](part12.html) | [raw source](workspace/src/part13.rs) |
212 //@ [next](part14.html)