Introduction

Bluesky has emerged as an intriguing competitor in the social media landscape, offering features familiar to Twitter users while embracing a more open, decentralized approach. What makes it particularly exciting for developers is its accessibility: Bluesky provides a websocket firehose that streams platform events in real-time, with no authentication required and no sharding complexity. This makes it an ideal playground for experimenting with high-throughput data processing.

The firehose streams various platform events, including:

  • Record commits: creations, updates, and deletions of posts, likes, follows, and other records
  • User identity changes
  • Account status modifications
  • Platform-wide activity signals
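All of these event types arrive on one stream, so it can help to ask the server for only what you need. The sketch below builds a subscription URL. It assumes the `wantedCollections` and `cursor` query parameters that the Jetstream service (whose endpoint appears later in this post) documents in its README, so check them against the instance you connect to. `subscribeURL` is a hypothetical helper and is not part of the original program.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// subscribeURL builds a Jetstream subscription URL. The parameter names
// (wantedCollections, cursor) are assumptions taken from Jetstream's docs.
func subscribeURL(base string, collections []string, cursorUS int64) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for _, c := range collections {
		// Repeating the parameter requests several collections.
		q.Add("wantedCollections", c)
	}
	if cursorUS > 0 {
		// Replay from a Unix timestamp in microseconds (the time_us field).
		q.Set("cursor", strconv.FormatInt(cursorUS, 10))
	}
	u.RawQuery = q.Encode() // Encode sorts keys alphabetically
	return u.String(), nil
}

func main() {
	s, err := subscribeURL("wss://jetstream2.us-east.bsky.network/subscribe",
		[]string{"app.bsky.feed.post"}, 0)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

To resume after a disconnect, you can pass the `time_us` of the last event you processed as the cursor. This avoids a gap in the stream, though some events may arrive twice.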

Building a Firehose Consumer in Go

I chose Go for this project for a few reasons: its excellent concurrency support, its efficient handling of high-throughput network operations, and the immense joy I get from writing it. Goroutines and channels, the standard encoding/json package, and mature websocket libraries such as gorilla/websocket make it particularly well-suited for this task.

Let’s dive into the implementation, starting with our data models:

// Represents the main message structure from the firehose
type Event struct {
	Did      string    `json:"did"`
	TimeUS   int64     `json:"time_us"`
	Kind     string    `json:"kind,omitempty"`
	Commit   *Commit   `json:"commit,omitempty"`
	Identity *Identity `json:"identity,omitempty"`
	Account  *Account  `json:"account,omitempty"`
}

// Represents the commit information in an event
type Commit struct {
	Rev        string          `json:"rev,omitempty"`
	Operation  string          `json:"operation,omitempty"`
	Collection string          `json:"collection,omitempty"`
	RKey       string          `json:"rkey,omitempty"`
	Record     json.RawMessage `json:"record,omitempty"`
	CID        string          `json:"cid,omitempty"`
}

// Represents identity changes like handle updates
type Identity struct {
	Handle      string `json:"handle,omitempty"`
	DisplayName string `json:"displayName,omitempty"`
	Description string `json:"description,omitempty"`
	Seq         int64  `json:"seq"`
	Time        string `json:"time"`
}

// Represents account status changes
type Account struct {
	Active bool   `json:"active"`
	Seq    int64  `json:"seq"`
	Time   string `json:"time"`
}

// Represents the structure of a post record
type Post struct {
	Type      string    `json:"$type,omitempty"`
	Text      string    `json:"text"`
	CreatedAt time.Time `json:"createdAt"`
}

With our models defined, here’s how we route and process different event types:

func processEvent(event Event) {
	switch event.Kind {
	case "commit":
		if event.Commit != nil {
			processCommit(event)
		}
	case "identity":
		if event.Identity != nil {
			processIdentityUpdate(event)
		}
	case "account":
		if event.Account != nil {
			processAccountUpdate(event)
		}
	}
}

func processCommit(event Event) {
	if event.Commit.Operation == "create" || event.Commit.Operation == "update" {
		// If it's a post, try to decode the post content
		if event.Commit.Collection == "app.bsky.feed.post" {
			var post Post
			if err := json.Unmarshal(event.Commit.Record, &post); err != nil {
				log.Printf("Error unmarshaling post: %v", err)
				return
			}

			fmt.Printf("Post Text: %s\n", post.Text)
			fmt.Printf("Post %sd At: %s\n", event.Commit.Operation, post.CreatedAt)
		}
	}
}

func processIdentityUpdate(event Event) {
	fmt.Printf("\n--- Identity Update ---\n")
	fmt.Printf("DID: %s\n", event.Did)
	fmt.Printf("Handle: %s\n", event.Identity.Handle)
	fmt.Printf("Display Name: %s\n", event.Identity.DisplayName)
	fmt.Printf("Description: %s\n", event.Identity.Description)
	fmt.Printf("Sequence: %d\n", event.Identity.Seq)
	fmt.Printf("Time: %s\n", event.Identity.Time)
}

func processAccountUpdate(event Event) {
	fmt.Printf("\n--- Account Update ---\n")
	fmt.Printf("DID: %s\n", event.Did)
	fmt.Printf("Active: %v\n", event.Account.Active)
	fmt.Printf("Sequence: %d\n", event.Account.Seq)
	fmt.Printf("Time: %s\n", event.Account.Time)
}
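The `switch` in `processEvent` silently drops any kind it doesn't recognise. One hypothetical alternative is a lookup table of handlers. With a table, unknown kinds are easy to detect and new kinds are easy to register. The sketch below uses a stripped-down `Event`, and its placeholder handlers return strings instead of printing. It is a variation for comparison, not the article's code.

```go
package main

import "fmt"

// Minimal stand-in for the Event type above; only Did and Kind are used.
type Event struct {
	Did  string
	Kind string
}

// handlers maps an event kind to a (placeholder) processing function.
var handlers = map[string]func(Event) string{
	"commit":   func(e Event) string { return "commit from " + e.Did },
	"identity": func(e Event) string { return "identity update for " + e.Did },
	"account":  func(e Event) string { return "account update for " + e.Did },
}

// dispatch runs the handler registered for e.Kind and reports whether one
// existed, so unknown kinds can be logged or counted instead of ignored.
func dispatch(e Event) (string, bool) {
	h, ok := handlers[e.Kind]
	if !ok {
		return "", false
	}
	return h(e), true
}

func main() {
	for _, e := range []Event{{"did:plc:a", "commit"}, {"did:plc:b", "unknown"}} {
		out, ok := dispatch(e)
		fmt.Println(out, ok)
	}
}
```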

At this point we’re ready to connect to the firehose. We read each message from the websocket, unmarshal it into an Event, and pass it to the processEvent function we wrote above.

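package main

// Imports needed by the whole main.go: the models, the handlers above, and main below.
import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/gorilla/websocket"
)

// Public Jetstream endpoint; no authentication is required.
const wsURL = "wss://jetstream2.us-east.bsky.network/subscribe"

func main() {
	// Connect to the websocket
	c, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
	if err != nil {
		log.Fatal("dial:", err)
	}
	defer c.Close()

	// Read messages on a separate goroutine; done is closed when reading stops
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			_, message, err := c.ReadMessage()
			if err != nil {
				log.Println("read:", err)
				return
			}

			var event Event
			if err := json.Unmarshal(message, &event); err != nil {
				log.Printf("Error unmarshaling event: %v", err)
				continue
			}

			processEvent(event)
		}
	}()

	// Block until the reader exits. Without this, main would return
	// immediately and the program would exit before printing anything.
	<-done
}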

Running Our Program

Let’s run our program, and look at a snippet of its output:

$ go run main.go
Post Text: First hand to hand combat. Let's go!
Post created At: 2024-12-23 20:05:16.366 +0000 UTC
Post Text: Blautopfhöhle Rabbithole vs. Klaustrophobie, anschauen tu ich’s gerne aber nachvollziehen kann ich’s nicht 😅
Post created At: 2024-12-23 20:05:19.397 +0000 UTC
Post Text: Ion post yall what yall be saying about me 😂😂
Post created At: 2024-12-23 20:05:19.641 +0000 UTC
Post Text: 17 year old Bluesky accounts original....
Post created At: 2024-12-23 20:05:20.066 +0000 UTC
Post Text: this is a fantastic argument for rebirth not just as the game of this year, but as a turning point in the history of the final fantasy series
Post created At: 2024-12-23 20:05:19.532 +0000 UTC
Post Text: 😍😍😍😍🥰🥰🥰🥰🤗🤗🤗🤗😘😘😘😘
Post created At: 2024-12-23 20:05:19.251 +0000 UTC
Post Text: www.lemonde.fr/idees/articl...
Post created At: 2024-12-23 20:05:17.694 +0000 UTC
Post Text: ❤️‍🔥
Post created At: 2024-12-23 20:05:18.129 +0000 UTC
Post Text: What a dispicable man, like most in the GOP.
Post created At: 2024-12-23 20:05:19.683 +0000 UTC
Post Text: Yes I knew his name was Christian! Couldn’t remember the last name tho
Post created At: 2024-12-23 20:05:19.581 +0000 UTC
Post Text:
Post created At: 2024-12-23 20:05:17.996 +0000 UTC
Post Text: Finished Sociopath: A Memoir waiting for my turn in the passport renewal line. Fascinating read. I love growing my understanding of how people's minds and thoughts work and this one was honest, insightful, and full of messy humanity. Definitely recommend. #booksky @epldotca.bsky.social
Post created At: 2024-12-23 20:05:02.841 +0000 UTC
...

I added a simple counter to measure throughput. The rate fluctuated between 600 and 800 events per second.

Future Possibilities

While this implementation simply prints events to the console, it serves as a foundation for more sophisticated applications. Here are some exciting directions to explore:

  • Time-Series Analysis: Capture and store events in Parquet files for efficient historical analysis
  • Real-Time Dashboard: Build a web application with live visualizations of platform activity
  • Data Science & Machine Learning: Use captured data for trend analysis and user behavior studies using Jupyter notebooks
  • Network Analysis: Map user interactions and community formation patterns

The complete implementation is available on GitHub here.

Conclusion

Building a Bluesky firehose consumer in Go demonstrates the language’s strengths in handling real-time data streams. Bluesky’s accessible API creates opportunities for fascinating data engineering projects.

Special thanks to my friend Max for introducing me to Bluesky’s firehose and helping refine the data models used in this implementation.
