Sunday, April 18, 2010

Interacting with RabbitMQ via F# and Symbiote

Last week, I showed how easy it is to talk to CouchDB with F# and Symbiote.  In this post, I'll show how you can start interacting with RabbitMQ by adding an additional dozen or so lines of code.

RabbitMQ is a complete and highly reliable enterprise messaging system based on the emerging AMQP standard. It is licensed under the open source Mozilla Public License and has a platform-neutral distribution, plus platform-specific packages and bundles for easy installation. (

Symbiote is a set of libraries that reduce the radius of comprehension by providing simplified APIs " some of the better open source libraries available" as well as "...a way to centralize the configuration [and] dependency injection..." of said libraries (Robson, A., Symbiote).   

For those of you who haven't played with RabbitMQ and/or Symbiote, I strongly recommend checking them out!


Here's the code with the CouchDB interaction functionality as well as the RabbitMQ publish/subscribe functionality.

A couple of things to note about this sample:
1. CouchDB and RabbitMQ must be installed and running on the local machine.
2. Since Symbiote is under heavy development, the CouchDB related code has changed slightly from the example provided last week.
module Progam

open System
open Symbiote.Core
open Symbiote.Relax
open Symbiote.Daemon
open Symbiote.Jackalope
open Symbiote.Jackalope.Impl

type PersonCouchDocument =
    val id : string
    val mutable name : string
    val mutable address : string
    inherit DefaultCouchDocument 
        new (id, name, address) = 
            {id = id; name = name; address = address} then base.DocumentId <- id
        member x.Name
            with get () =
        member x.Address
            with get () = x.address

type HandleMessage() =
    interface IMessageHandler<PersonCouchDocument> with
        member x.Process (message:PersonCouchDocument, messageDelivery:IMessageDelivery) =
            Console.WriteLine("Processing message for person {0}.",

type DemoService = 
    val couchServer : ICouchServer
    val bus : IBus
    new(couchServer, bus) as this = 
        {couchServer = couchServer; bus = bus} then this.Initialize()
    member public x.Initialize () =
        x.bus.AddEndPoint(fun x -> 
            x.Exchange("SymbioteDemo", ExchangeType.fanout).QueueName("SymbioteDemo")
                .Durable().PersistentDelivery() |> ignore)
    interface IDaemon with
        member x.Start () =
            do Console.WriteLine("The service has started")
            x.bus.Subscribe("SymbioteDemo", null)
            let document = new PersonCouchDocument("123456", "John Doe", "123 Main")
            x.bus.Send("SymbioteDemo", document)
            let documentRetrieved = 
            do Console.WriteLine(
                "The document with name {0} and address {1} was retrieved successfully", 
                documentRetrieved.Name, documentRetrieved.Address)
            do x.couchServer.DeleteDatabase<PersonCouchDocument>()
        member x.Stop () =
            do Console.WriteLine("The service has stopped")

do Assimilate
    .Daemon(fun x -> (x.Name("FSharpDemoService")
                                    .DisplayName("An FSharp Demo Service")
                                    .Description("An FSharp Demo Service")
                                    .Arguments([||]) |> ignore))
    .Relax(fun x -> x.Server("localhost") |> ignore) 
    .Jackalope(fun x -> x.AddServer(fun s -> s.Address("localhost").AMQP08() |> ignore) |> ignore)


As mentioned previously, with the help of Symbiote it only takes a dozen or so additional lines of code to add support for RabbitMQ to our previous example.  The complete source for this solution can be found at

No comments:

Post a Comment