At work we run a main resque worker server that has about 15-20 workers running at any given time. We also distribute about 5 or 6 workers to our app servers to get more work done and as insurance against our main worker server going down. Sometimes our clients cause hundreds of thousands of jobs to be enqueued in a very short period of time. To handle the extra burden, we spin up a clone or two of the worker server with 30 or so workers each.
You typically start workers like this:
QUEUE=* rake resque:work
We use god to start up all of our workers this way. This worked pretty well for a while.
Here’s the rub: starting 20 workers on a single machine is brutal. Really brutal. All 20 workers try to boot a heavy Rails app environment at once and end up in resource contention. This pushes the load on a 4-core system up to something like 10. After a deploy, our workers would take upwards of 10 minutes to start.
The solution is to start workers sequentially on the same machine by hooking into the worker tasks in rake. This is what worked for us:
# lib/tasks/resque.rake
require 'fileutils'
# Copied from http://thomasmango.com/2010/05/27/resque-in-production/
namespace :resque do
  LOCKFILE = File.join(File.dirname(__FILE__), '..', '..', 'tmp', 'worker_start.lock')
  desc "wait for lock file to clear before starting"
  task :wait_for_lock do
    begin
      # CREAT | EXCL creates the file atomically and raises EEXIST if another
      # worker already holds the lock, so only one worker boots at a time.
      File.open(LOCKFILE, File::CREAT | File::EXCL | File::WRONLY) do |f|
        f.write(Process.pid)
      end
    rescue Errno::EEXIST
      sleep(1)
      retry
    end
  end
  task :clear_lock do
    FileUtils.rm(LOCKFILE, :force => true)
  end
  task :preload => [:wait_for_lock, :environment] do
    # Release the lock once Rails has loaded so the next worker can start
    Rake::Task['resque:clear_lock'].invoke
  end
end
There is a much more elegant solution in Ruby’s File#flock, but it didn’t seem to work in this case. In every contrived script I could come up with, an exclusive flock (file lock) worked, but in a task like this the lock was not exclusive and all the workers started at once. This solution is ugly but does the trick. Without so much resource contention, each of our workers now starts in under 30 seconds and can begin working immediately while the other workers wait their turn.
I came up with an idea the other day while listening to the latest Ruby Rogues podcast. There was a lot of debate about the addition of Enumerable#lazy to Ruby 2.0. It lets chained method calls on a collection build up and then be evaluated lazily, avoiding an extra trip through the list for each call in a chain like this:
[1,2,3,4].map {|x| x + 3}.select {|x| x > 5}
The host who erred on the side of object-oriented programming recommended extracting these method chains into a method with a descriptive name, for reuse and readability. That’s what got me thinking.
In Haskell, instead of method chaining we compose functions with the . operator. As I learn more Haskell, I find myself using composition more and more to succinctly describe a transformation from one type to another. The downside of composition being so easy in Haskell is that it also becomes easy to introduce redundancy into your code at the cost of expressiveness.
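Here is the Ruby chain above written as a Haskell composition. It is a small sketch. The name bumpedAboveFive is hypothetical and only shows the chain extracted into a descriptive function, as the podcast suggested. Haskell lists are lazy, so the composed pipeline also works on an infinite list, which is roughly what Enumerable#lazy brings to Ruby.

```haskell
-- The Ruby chain [1,2,3,4].map {|x| x + 3}.select {|x| x > 5},
-- written as a composition and given a (hypothetical) descriptive name.
bumpedAboveFive :: [Int] -> [Int]
bumpedAboveFive = filter (> 5) . map (+ 3)
```

For example, bumpedAboveFive [1,2,3,4] evaluates to [6,7]. take 2 (bumpedAboveFive [1 ..]) also returns [6,7] without forcing the rest of the infinite list.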
From your project directory, you can run:
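$ find . -name '*.hs' | xargs grep -Posh '[\w.]+ \. [\w.]+' | sort | uniq -cd
2 show . length
2 maybeRead . T.unpack
This line of shell script finds every Haskell source file under the current directory and has grep print each two-function composition it contains. The -P flag enables Perl-compatible regexes, -o prints only the matched text, -s suppresses error messages, and -h omits file names. sort groups identical compositions together so that uniq -cd can print each duplicated composition once, prefixed with its count.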
That’s pretty powerful. A good refactoring would be to find instances of these compositions and extract them to a better-named function and put them in a Util module if they are used throughout your application. For example, maybeRead . T.unpack could be more readable as tryReadText. The regex isn’t quite right for multiple compositions strung together but I don’t quite want to work that out for the sake of example.
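Here is a minimal sketch of that refactoring for the two compositions found above. The post never shows maybeRead, so this version assumes it behaves like readMaybe from base's Text.Read. T is Data.Text from the text package, as in the grep output. showLength is a hypothetical name, and tryReadText is the name suggested above.

```haskell
import qualified Data.Text as T
import Text.Read (readMaybe)

-- Assumed stand-in for the post's maybeRead (its definition is not shown).
maybeRead :: Read a => String -> Maybe a
maybeRead = readMaybe

-- Named replacements for the duplicated compositions; these could live in a
-- Util module and be imported wherever the compositions used to appear.
showLength :: [a] -> String
showLength = show . length

tryReadText :: Read a => T.Text -> Maybe a
tryReadText = maybeRead . T.unpack
```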
I find as I’m learning to improve my Haskell-fu that while we can write very terse programs in Haskell, a well-named function will almost always be preferable to long chains of function composition in an already-busy function definition. OO and FP programmers would do well to strive for compact, expressive methods/functions. I used to think that I should only split up long functions when I knew I was going to reuse some component of the function. I now realize that I should be much more liberal with decomposing long functions so the reader can see a high level, grokable definition and to look at the definitions of the components if further explanation is necessary.
A word of caution: This post features code from my first and so far only parser I’ve ever written. I was able to achieve what I wanted by reading attoparsec’s documentation, which is by no means written in the manner of a tutorial. I let the types and my understanding of the various typeclasses it uses guide me. I make no claims that this is The Best Way® to do things. The parser I made is performant enough for my needs but I have done no formal benchmarks nor have I put time into optimizing it further.
Attoparsec is a Haskell parser combinator library. It is inspired by the older Parsec library and is designed with performance and efficiency in mind. Its creator is Bryan O’Sullivan, who writes some very high-quality libraries, all of which are a joy to use.
I decided to write this because I found very few tutorial-like resources for writing parsers with Attoparsec. The documentation is good once you know a bit of what you’re doing, but there wasn’t anything to help you get started. I dug into the source code of another one of Bryan O’Sullivan’s projects, Aeson.
I needed a parser for a project I’m working on, HollaBack. This service receives emails with a specified date as the mailbox, parses them and bounces the email back to you at the desired time. I got the idea from FollowUp.cc. Since their format seemed as good as any, I decided to make the parser for their date format.
The date format can either be a relative time or a specific date/time. A relative date/time looks like (quantity)(time keyword). Time keywords are mi, h, d, w, mo, and y. Some examples are:
Specific date/times look like:
These should be pretty self explanatory.
data DayOfWeek = Monday |
                 Tuesday |
                 Wednesday |
                 Thursday |
                 Friday |
                 Saturday |
                 Sunday deriving (Show, Eq)
data Date = Date Month Int deriving (Show, Eq)
data DateTimeSpec = RelativeDateTime TimeUnit |
                    SpecificDateTime Date TimeOfDay |
                    SpecificWeekdayTime DayOfWeek TimeOfDay |
                    SpecificWeekday DayOfWeek |
                    SpecificTime TimeOfDay deriving (Show, Eq)
data TimeUnit = TimeUnit Integer TimeKeyword deriving (Show, Eq)
data TimeKeyword = Minutes |
                   Hours |
                   Days |
                   Weeks |
                   Months |
                   Years deriving (Show, Eq)
import Control.Applicative ((<*>),
                            (*>),
                            (<$>),
                            (<|>),
                            pure)
import qualified Data.Attoparsec.Text as A
import qualified Data.Attoparsec.Combinator as AC
import Data.Attoparsec.Text (Parser)
import Data.Text (Text)
Parsers can be written almost entirely in terms of functions from Control.Applicative. Try Learn You a Haskell (LYAH) for a refresher on Applicative.
The first step is to define signatures for combinators for all the major data types:
timeUnit :: Parser TimeUnit
timeKeyword :: Parser TimeKeyword
day :: Parser DayOfWeek
time :: Parser TimeOfDay
date :: Parser Date
Parsing discrete values is the simplest, so let's start with day:
day :: Parser DayOfWeek
day = monday <|>
      tuesday <|>
      -- ...
  where monday = A.stringCI "monday" *> pure Monday
We want to match the string “monday”, so we use stringCI, which does a full-string, case-insensitive match. stringCI has type Text -> Parser Text. We use *>, which runs both actions, discards the result of the first (the Text matched by stringCI) and returns the result of the second. pure lifts the value Monday into the Parser.
Luckily for us, Parser has an instance of Alternative, which is a “monoid on applicative functors”. <|> is an associative binary operation. In the case of a parser, if the first parser fails, the next one is tried. So when we chain parsers together with <|>, they are tried in order until one succeeds.
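attoparsec is a separate package. As a rough, dependency-free approximation, here is the same day parser written with base's Text.ParserCombinators.ReadP. This swaps in a different library and is not attoparsec's API. ReadP's own <|> is a symmetric choice that keeps every successful parse, so the sketch uses ReadP's left-biased <++ to get the try-in-order behavior described above. stringCI and parseAll are hypothetical helpers written for the sketch.

```haskell
import Data.Char (toLower)
import Text.ParserCombinators.ReadP (ReadP, eof, readP_to_S, satisfy, (<++))

data DayOfWeek = Monday | Tuesday | Wednesday | Thursday
               | Friday | Saturday | Sunday
  deriving (Show, Eq)

-- Hypothetical case-insensitive literal match, mimicking A.stringCI.
stringCI :: String -> ReadP String
stringCI = traverse (\c -> satisfy (\x -> toLower x == toLower c))

-- (<++) tries the right parser only if the left one fails, like (<|>) above.
day :: ReadP DayOfWeek
day = monday <++ tuesday <++ wednesday <++ thursday
        <++ friday <++ saturday <++ sunday
  where
    monday    = stringCI "monday"    *> pure Monday
    tuesday   = stringCI "tuesday"   *> pure Tuesday
    wednesday = stringCI "wednesday" *> pure Wednesday
    thursday  = stringCI "thursday"  *> pure Thursday
    friday    = stringCI "friday"    *> pure Friday
    saturday  = stringCI "saturday"  *> pure Saturday
    sunday    = stringCI "sunday"    *> pure Sunday

-- Hypothetical runner: succeeds only if the parser consumes the whole input.
parseAll :: ReadP a -> String -> Maybe a
parseAll p s = case [x | (x, "") <- readP_to_S (p <* eof) s] of
  (x : _) -> Just x
  []      -> Nothing
```

For example, parseAll day "SUNDAY" returns Just Sunday. parseAll day "funday" returns Nothing because none of the alternatives match.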
Adding multiple aliases for each day is easy:
stringChoices :: [Text] -> Parser Text
stringChoices = AC.choice . map A.stringCI
-- ...
  where monday = stringChoices ["monday", "mon"] *> pure Monday
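Because alternatives are tried in order, alias order matters, and the longest alias should come first. The sketch below again swaps in base's ReadP for attoparsec, with its left-biased <++ playing the role of choice. It trims DayOfWeek to two days and defines hypothetical stringCI and parseAll helpers. monShortFirst shows what happens when the short alias is listed first.

```haskell
import Data.Char (toLower)
import Text.ParserCombinators.ReadP (ReadP, eof, readP_to_S, satisfy, (<++))

data DayOfWeek = Monday | Tuesday deriving (Show, Eq)

stringCI :: String -> ReadP String
stringCI = traverse (\c -> satisfy (\x -> toLower x == toLower c))

-- Left-biased counterpart of AC.choice . map A.stringCI: the first alias
-- that matches wins. (foldr1 assumes a non-empty alias list.)
stringChoices :: [String] -> ReadP String
stringChoices = foldr1 (<++) . map stringCI

day :: ReadP DayOfWeek
day = monday <++ tuesday
  where
    monday  = stringChoices ["monday", "mon"] *> pure Monday
    tuesday = stringChoices ["tuesday", "tues", "tue"] *> pure Tuesday

-- Short alias first: "mon" matches the start of "monday" and "day" is left over.
monShortFirst :: ReadP DayOfWeek
monShortFirst = stringChoices ["mon", "monday"] *> pure Monday

parseAll :: ReadP a -> String -> Maybe a
parseAll p s = case [x | (x, "") <- readP_to_S (p <* eof) s] of
  (x : _) -> Just x
  []      -> Nothing
```

parseAll day "Mon" and parseAll day "MONDAY" both return Just Monday. parseAll monShortFirst "monday" returns Nothing, because "mon" matches first and the leftover "day" makes the whole-input parse fail.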
For compound types like TimeUnit, the best way is to use Applicative’s sequential application function, <*>. We’ll write a parser for each component of TimeUnit, one for the quantity and the other for the TimeKeyword.
timeKeyword :: Parser TimeKeyword
timeKeyword = minutes <|>
              hours <|>
              -- ..
  where minutes = A.stringCI "mi" *> pure Minutes
        hours   = A.stringCI "h" *> pure Hours
        -- ..
We can think of the TimeUnit constructor as a function that consumes arguments (I know this isn’t exactly how it works in Haskell, but I’ll describe it thusly for brevity). Thus, the type is: TimeUnit :: Integer -> TimeKeyword -> TimeUnit. Our parser will then look like this:
timeUnit :: Parser TimeUnit
timeUnit = TimeUnit <$> integer
                    <*> timeKeyword
  where integer = toInteger <$> A.decimal
A.decimal parses only an unsigned run of digits. A fractional quantity such as 1.5h therefore fails: integer stops at the ".", and timeKeyword cannot match it. That is exactly what we want.
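The same Applicative shape can be sketched with base's ReadP standing in for attoparsec. This is a substitution, not attoparsec's API. Here integer is built from munch1 isDigit, on the assumption that this mimics A.decimal's digits-only behavior. stringCI and parseAll are hypothetical helpers defined in the block. The sketch also shows how a fractional quantity is rejected.

```haskell
import Data.Char (isDigit, toLower)
import Text.ParserCombinators.ReadP (ReadP, eof, munch1, readP_to_S, satisfy, (<++))

data TimeKeyword = Minutes | Hours | Days | Weeks | Months | Years
  deriving (Show, Eq)

data TimeUnit = TimeUnit Integer TimeKeyword
  deriving (Show, Eq)

stringCI :: String -> ReadP String
stringCI = traverse (\c -> satisfy (\x -> toLower x == toLower c))

-- Keywords tried in order with the left-biased (<++).
timeKeyword :: ReadP TimeKeyword
timeKeyword = foldr1 (<++)
  [ stringCI "mi" *> pure Minutes
  , stringCI "h"  *> pure Hours
  , stringCI "d"  *> pure Days
  , stringCI "w"  *> pure Weeks
  , stringCI "mo" *> pure Months
  , stringCI "y"  *> pure Years
  ]

-- Digits only: no sign and no fractional part.
integer :: ReadP Integer
integer = read <$> munch1 isDigit

-- TimeUnit :: Integer -> TimeKeyword -> TimeUnit is applied to the
-- results of two parsers run in sequence.
timeUnit :: ReadP TimeUnit
timeUnit = TimeUnit <$> integer <*> timeKeyword

parseAll :: ReadP a -> String -> Maybe a
parseAll p s = case [x | (x, "") <- readP_to_S (p <* eof) s] of
  (x : _) -> Just x
  []      -> Nothing
```

parseAll timeUnit "3d" returns Just (TimeUnit 3 Days) and parseAll timeUnit "10MO" returns Just (TimeUnit 10 Months). parseAll timeUnit "1.5h" returns Nothing.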
I’m really quite impressed by the ease of use for parsing simple grammars with Attoparsec. The Applicative/Alternative instances make the parsers read like BNF grammar notation and make parsing complex types a simple matter of creating smaller parsers and then composing them.
RememberTheMilk offers a bookmarklet to quickly add a task based on the page you are currently on. For whatever reason, it does not fill in the URL for the task and only uses the title of the page as the title of the task. After digging into the bookmarklet code, I found this was really simple to fix.
Depagination is a word I probably made up. When working on Web.GooglePlus, I noticed that many resources in the Google+ API return a “page token” for the next page in their responses, like so:
{
  "kind": "plus#peopleFeed",
  "selfLink": string,
  "title": string,
  "nextPageToken": string,
  "items": [
    people Resource
  ]
}
Pagination is nice for an end user, but it is typically not very useful to the user of a library. If I want to perform a search for people on Google+ with Web.GooglePlus, my application’s code shouldn’t have to carry the burden of traversing pages of data. It should consume as many results as it needs until the results run out or some application logic decides it has had enough. That’s where depagination with Enumerators comes in.
The trick to depagination where the next page token is included alongside the current page is that you must maintain state to know when to stop enumerating and to have the token to get the next page.
Data.Enumerator.List provides unfoldM which has the type:
unfoldM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Enumerator a m b
unfoldM takes a function which, given a state, either produces a result and the modified state or returns Nothing, thus terminating the Enumerator’s stream.
For my specific case, unfoldM wasn’t quite what I needed. With unfoldM, each element of the stream would be a whole page, a list of results delivered as a singleton Chunks. It makes more sense for the stream’s elements to be the individual results, say Person or Activity in terms of Google+, with each page of results delivered as one chunk. For this, I needed to make a slight modification to unfoldM:
import Control.Monad.Trans.Class (lift)
import Data.Enumerator (Enumerator, Stream (..), checkContinue1, continue, (>>==))
-- Exactly the same as unfoldM but takes the result of the stateful function
-- and uses it as the chunks, rather than a Chunks with a singleton list
unfoldListM :: Monad m => (s -> m (Maybe ([a], s)))
            -> s
            -> Enumerator a m b
unfoldListM f = checkContinue1 $ \loop s k -> do
  fs <- lift (f s)
  case fs of
    Nothing -> continue k
    Just (as, s') -> k (Chunks as) >>== loop s'
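Here is a list-collecting analogue that shows the contract unfoldListM follows without the enumerator package, using only base. unfoldPagesM and the toy page source pageFrom are hypothetical names. Unlike the Enumerator version, this sketch does not stream: it waits for every page before returning, so it gives up the incremental processing the Enumerator interface provides.

```haskell
-- Each step returns a whole page of results plus the next state, or
-- Nothing to stop; the pages are concatenated into one list.
unfoldPagesM :: Monad m => (s -> m (Maybe ([a], s))) -> s -> m [a]
unfoldPagesM f = go
  where
    go s = do
      next <- f s
      case next of
        Nothing       -> return []
        Just (as, s') -> (as ++) <$> go s'

-- Hypothetical page source: serves 1..7 three items at a time, using the
-- first item of the next page as the state.
pageFrom :: Monad m => Int -> m (Maybe ([Int], Int))
pageFrom n
  | n > 7     = return Nothing
  | otherwise = return (Just ([n .. min 7 (n + 2)], n + 3))
```

In IO, unfoldPagesM pageFrom 1 >>= print prints [1,2,3,4,5,6,7]. The pages [1,2,3], [4,5,6] and [7] are concatenated, and the state 10 ends the unfold.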
With that out of the way, I was able to create a generic depaginator for any paginated resource in the Google+ API. First, the relevant types:
type PageToken = Text
data DepaginationState = FirstPage |
                         MorePages PageToken |
                         NoMorePages
simpleDepaginator :: Monad m => (DepaginationState -> m (Maybe ([a], DepaginationState)))
                  -> Enumerator a m b
simpleDepaginator depaginate = unfoldListM depaginate FirstPage
simpleDepaginator sets up the depagination by initializing the state to the first page. The first page is a special case because there is no previous page token. Its argument is the step function, which fetches the next page and returns the updated state. Here’s what the generic depagination step looks like:
paginatedState :: (a, Maybe PageToken)
               -> (a, DepaginationState)
paginatedState (results, token) = (results, maybe NoMorePages MorePages token)
simpleDepaginationStep :: FromJSON a => Integer
                       -> Ascii
                       -> Query
                       -> DepaginationState
                       -> GooglePlusM (Maybe ([a], DepaginationState))
simpleDepaginationStep perPage pth params FirstPage =
  (return . fmap paginatedState) =<< simpleGetFirstPage perPage pth params
simpleDepaginationStep perPage pth params (MorePages tok) =
  (return . fmap paginatedState) =<< simpleGetPage perPage (Just tok) pth params
simpleDepaginationStep _ _ _ NoMorePages = return Nothing
Simple as can be. All 3 cases of the state are handled. In the first two, there is more data to get and thus an HTTP GET is performed. If we are on page 2 or higher, it includes the token. If there are no more pages, the depagination step returns Nothing, which terminates the stream of data.
paginatedState simply looks at the presence or absence of the current request’s page token to determine if the enumerator should continue requesting pages or not.
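The state machine can be exercised without any HTTP by swapping the GET for a pure lookup. In this sketch, fetchPage is a hypothetical in-memory stand-in for simpleGetFirstPage and simpleGetPage, and PageToken is a String rather than Text. Data.List.unfoldr takes a step function with the same Maybe-returning shape as unfoldM, so it stands in for the enumerator.

```haskell
import Data.List (unfoldr)

type PageToken = String

data DepaginationState = FirstPage | MorePages PageToken | NoMorePages
  deriving (Show, Eq)

-- Hypothetical API stand-in: Nothing requests the first page; each page
-- returns its results and the next page token, if there is one.
fetchPage :: Maybe PageToken -> ([String], Maybe PageToken)
fetchPage Nothing        = (["alice", "bob"], Just "page2")
fetchPage (Just "page2") = (["carol"], Just "page3")
fetchPage (Just _)       = (["dave"], Nothing)

paginatedState :: (a, Maybe PageToken) -> (a, DepaginationState)
paginatedState (results, token) = (results, maybe NoMorePages MorePages token)

-- Pure version of the three-case depagination step.
depaginationStep :: DepaginationState -> Maybe ([String], DepaginationState)
depaginationStep FirstPage       = Just (paginatedState (fetchPage Nothing))
depaginationStep (MorePages tok) = Just (paginatedState (fetchPage (Just tok)))
depaginationStep NoMorePages     = Nothing

allResults :: [String]
allResults = concat (unfoldr depaginationStep FirstPage)
```

unfoldr depaginationStep FirstPage produces one list per page: [["alice","bob"],["carol"],["dave"]]. The last page is still delivered, and only the following step, on NoMorePages, ends the stream.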
This abstraction gives a nice way to yield the results of the API calls before all the pages have been fetched. That is the inherent benefit of designing for the Enumerator interface. The application using the library does not have to hold all results in memory, wait for all of them to be fetched before dealing with them, or concern itself with the intricacies of paginating the requests.
Also, using enumerators makes defining non-enumerator interfaces dead simple. You just consume the enumerator:
import qualified Data.Enumerator.List as EL
import Data.Enumerator (Enumerator, run_)
enumActivities :: PersonID            -- ^ Feed owner ID
               -> ActivityCollection  -- ^ Indicates what type of feed to retrieve
               -> Maybe Integer       -- ^ Page size. Should be between 1 and 100. Default 20
               -> Enumerator Activity GooglePlusM b
enumActivities pid coll perPage = simpleDepaginator depaginate
  where depaginate = -- ...
getActivities :: PersonID            -- ^ Feed owner ID
              -> ActivityCollection  -- ^ Indicates what type of feed to retrieve
              -> GooglePlusM [Activity]
getActivities pid coll = run_ $ enumActivities pid coll (Just 100) $ EL.consume
getActivities will then consume pages of 100 activities at a time (the maximum) and will not yield a result until it hits the end. If the user wants to do something quick and dirty, just slurping the entire resource without having to worry about enumerators is an attractive option.
The main fault I can see with using unfoldM is that it can be lossy and can send false signals if you use it on an API whose data format may change at any time. For instance, I’ve coded my datatypes against the Google+ specs and I’m fairly confident I’ve gotten all the fields right, including fields that may be absent, such as email addresses and URLs. However, unfoldM terminates on Nothing, so to the consumer a fatal parse error is indistinguishable from hitting the last page: the stream simply ends and says nothing more. If the resource you are consuming is more error prone than most, it may be a good idea to roll your own unfoldM that uses Either to distinguish between normal termination and termination due to an error.
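Here is a minimal sketch of that idea. It uses only base and collects results into a list rather than streaming them. The step function returns Left for an error, Right Nothing for a normal end, and Right (Just ...) for another page. unfoldPagesE, goodPages and badPages are hypothetical names. An Enumerator-based version could use the same three-way result.

```haskell
-- Left = failure, Right Nothing = clean end, Right (Just ...) = another page.
unfoldPagesE :: Monad m
             => (s -> m (Either String (Maybe ([a], s))))
             -> s
             -> m (Either String [a])
unfoldPagesE f = go []
  where
    go acc s = do
      r <- f s
      case r of
        Left err              -> return (Left err)
        Right Nothing         -> return (Right (concat (reverse acc)))
        Right (Just (as, s')) -> go (as : acc) s'

-- Hypothetical page sources: two good pages, or a parse failure on page 2.
goodPages, badPages :: Monad m => Int -> m (Either String (Maybe ([Int], Int)))
goodPages n
  | n > 2     = return (Right Nothing)
  | otherwise = return (Right (Just ([n * 10], n + 1)))
badPages n
  | n == 2    = return (Left "could not parse page 2")
  | otherwise = goodPages n
```

Run in the Maybe monad for simplicity, unfoldPagesE goodPages 1 gives Just (Right [10,20]) and unfoldPagesE badPages 1 gives Just (Left "could not parse page 2"), so the consumer can tell the two endings apart. This version drops pages fetched before the error. Returning them alongside the error is an easy variation.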
Overall I really like this pattern and will probably use it on any future web API projects I do that require consuming a paginated resource.