Header menu logo FSharp.ATProto

Streaming

The FSharp.ATProto.Streaming package provides real-time event streams over WebSocket. Two protocols are supported: Jetstream (JSON, simpler) and Firehose (CBOR, full fidelity). Both return IAsyncEnumerable<Result<Event, StreamError>> for incremental consumption.

open FSharp.ATProto.Streaming
open FSharp.ATProto.Syntax
open System.Threading

Jetstream

Jetstream is a JSON-based relay that provides a filtered, lightweight view of the AT Protocol event stream. It is the recommended starting point for most applications.

Key Types

Type

Description

JetstreamEvent

CommitEvent, IdentityEvent, AccountEvent, UnknownEvent

CommitInfo

Commit details: Rev, Operation, Collection, Rkey, Record, Cid

CommitOperation

Create, Update, Delete

JetstreamOptions

Connection config: Endpoint, WantedCollections, WantedDids, Cursor

StreamError

ConnectionFailed, WebSocketError, DeserializationError, Closed

Functions

Function

Description

Jetstream.subscribe

Subscribe to events; returns IAsyncEnumerable

Jetstream.parseEvent

Parse a raw JSON message into a JetstreamEvent

Jetstream.buildUri

Build the WebSocket URI from options

Jetstream.defaultOptions

Default options pointing at jetstream1.us-east.bsky.network

Subscribing to Posts

let cts = new CancellationTokenSource()

let options =
    { Jetstream.defaultOptions with
        WantedCollections = [ "app.bsky.feed.post" ] }

let events = Jetstream.subscribe options cts.Token

task {
    let enumerator = events.GetAsyncEnumerator(cts.Token)

    while! enumerator.MoveNextAsync() do
        match enumerator.Current with
        | Ok (CommitEvent (did, _timeUs, info)) when info.Operation = Create ->
            printfn "New post from %s (rkey: %s)" (Did.value did) info.Rkey
        | Ok (IdentityEvent (did, _timeUs, identity)) ->
            printfn "Identity update: %s" (Did.value identity.Did)
        | Ok (AccountEvent (did, _timeUs, account)) ->
            printfn "Account %s active=%b" (Did.value did) account.Active
        | Error StreamError.Closed ->
            printfn "Stream closed"
        | Error e ->
            printfn "Error: %A" e
        | _ -> ()

    // Cancel to disconnect
    cts.Cancel()
}

Filtering by DID

To receive events only for specific accounts, set WantedDids:

    { Jetstream.defaultOptions with
        WantedDids = [ "did:plc:abc123"; "did:plc:def456" ] }

Resuming from a Cursor

Jetstream cursors are microsecond timestamps. To resume from where you left off:

    { Jetstream.defaultOptions with
        Cursor = Some 1709312400000000L }

Firehose

The Firehose provides the full-fidelity CBOR event stream from com.atproto.sync.subscribeRepos. It includes raw CAR blocks and is suitable for indexers and relay operators.

Key Types

Type

Description

FirehoseEvent

FirehoseCommitEvent, FirehoseIdentityEvent, FirehoseAccountEvent, FirehoseInfoEvent, FirehoseErrorEvent, FirehoseUnknownEvent

FirehoseCommit

Full commit: Seq, Repo, Rev, Ops, Blocks : CarFile, Time

RepoOp

Operation within a commit: Action, Path, Cid

CarFile

Parsed CAR v1: Roots : Cid list, Blocks : Map<string, byte[]>

FirehoseOptions

Connection config: Endpoint, Cursor, MaxMessageSizeBytes

Functions

Function

Description

Firehose.subscribe

Subscribe to events; returns IAsyncEnumerable

Firehose.parseFrame

Parse a raw CBOR frame into a FirehoseEvent

Firehose.buildUri

Build the WebSocket URI from options

Firehose.defaultOptions

Default options pointing at bsky.network

Subscribing to the Firehose

let firehoseOptions =
    { Firehose.defaultOptions with
        Cursor = None }

let firehoseEvents = Firehose.subscribe firehoseOptions cts2.Token

task {
    let enumerator = firehoseEvents.GetAsyncEnumerator(cts2.Token)

    while! enumerator.MoveNextAsync() do
        match enumerator.Current with
        | Ok (FirehoseCommitEvent commit) ->
            for op in commit.Ops do
                printfn "[%d] %s %A %s" commit.Seq (Did.value commit.Repo) op.Action op.Path
        | Ok (FirehoseInfoEvent info) ->
            printfn "Info: %s" info.Name
        | Ok (FirehoseErrorEvent (error, message)) ->
            printfn "Error: %s %A" error message
        | Error e ->
            printfn "Stream error: %A" e
        | _ -> ()

    cts2.Cancel()
}

Graceful Shutdown

Both subscribe functions respect CancellationToken. Cancel the token to close the WebSocket connection cleanly. After cancellation, MoveNextAsync returns false and the enumerator disposes the underlying WebSocket.

// Run for 30 seconds then stop
cts.CancelAfter(30_000)
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
namespace FSharp.ATProto
namespace FSharp.ATProto.Syntax
namespace FSharp.ATProto.Streaming
namespace System
namespace System.Threading
val cts: CancellationTokenSource
Multiple items
type CancellationTokenSource = interface IDisposable new: unit -> unit + 3 overloads member Cancel: unit -> unit + 1 overload member CancelAfter: millisecondsDelay: int -> unit + 1 overload member CancelAsync: unit -> Task member Dispose: unit -> unit member TryReset: unit -> bool static member CreateLinkedTokenSource: token: CancellationToken -> CancellationTokenSource + 3 overloads member IsCancellationRequested: bool member Token: CancellationToken
<summary>Signals to a <see cref="T:System.Threading.CancellationToken" /> that it should be canceled.</summary>

--------------------
CancellationTokenSource() : CancellationTokenSource
CancellationTokenSource(millisecondsDelay: int) : CancellationTokenSource
CancellationTokenSource(delay: System.TimeSpan) : CancellationTokenSource
CancellationTokenSource(delay: System.TimeSpan, timeProvider: System.TimeProvider) : CancellationTokenSource
val options: JetstreamOptions
module Jetstream from FSharp.ATProto.Streaming
val defaultOptions: JetstreamOptions
<summary> Default Jetstream options pointing at the US-East relay. </summary>
val events: System.Collections.Generic.IAsyncEnumerable<Result<JetstreamEvent,StreamError>>
val subscribe: options: JetstreamOptions -> ct: CancellationToken -> System.Collections.Generic.IAsyncEnumerable<Result<JetstreamEvent,StreamError>>
<summary> Subscribe to a Jetstream relay. Returns an IAsyncEnumerable that yields events until the connection closes or the CancellationToken is triggered. On disconnect the final element will be Error Closed. </summary>
property CancellationTokenSource.Token: CancellationToken with get
<summary>Gets the <see cref="T:System.Threading.CancellationToken" /> associated with this <see cref="T:System.Threading.CancellationTokenSource" />.</summary>
<exception cref="T:System.ObjectDisposedException">The token source has been disposed.</exception>
<returns>The <see cref="T:System.Threading.CancellationToken" /> associated with this <see cref="T:System.Threading.CancellationTokenSource" />.</returns>
val task: TaskBuilder
val enumerator: System.Collections.Generic.IAsyncEnumerator<Result<JetstreamEvent,StreamError>>
System.Collections.Generic.IAsyncEnumerable.GetAsyncEnumerator(?cancellationToken: CancellationToken) : System.Collections.Generic.IAsyncEnumerator<Result<JetstreamEvent,StreamError>>
System.Collections.Generic.IAsyncEnumerator.MoveNextAsync() : Tasks.ValueTask<bool>
property System.Collections.Generic.IAsyncEnumerator.Current: Result<JetstreamEvent,StreamError> with get
<summary>Gets the element in the collection at the current position of the enumerator.</summary>
<returns>The element in the collection at the current position of the enumerator.</returns>
union case Result.Ok: ResultValue: 'T -> Result<'T,'TError>
union case JetstreamEvent.CommitEvent: did: Did * timeUs: int64 * commit: CommitInfo -> JetstreamEvent
val did: Did
val _timeUs: int64
val info: CommitInfo
CommitInfo.Operation: CommitOperation
union case CommitOperation.Create: CommitOperation
val printfn: format: Printf.TextWriterFormat<'T> -> 'T
Multiple items
module Did from FSharp.ATProto.Syntax
<summary> Functions for creating, validating, and extracting data from <see cref="Did" /> values. </summary>

--------------------
type Did = private | Did of string override ToString: unit -> string
<summary> A decentralized identifier (DID) as defined by the AT Protocol. DIDs are the primary stable identifier for accounts. Two methods are currently supported: <c>did:plc:</c> (hosted, managed by PLC directory) and <c>did:web:</c> (self-hosted, DNS-based). </summary>
<remarks> See the AT Protocol specification: https://atproto.com/specs/did and the W3C DID specification: https://www.w3.org/TR/did-core/ </remarks>
val value: Did -> string
<summary> Extract the string representation of a DID. </summary>
<param name="did">The DID to extract the value from.</param>
<returns>The full DID string (e.g. <c>"did:plc:z72i7hdynmk6r22z27h6tvur"</c>).</returns>
CommitInfo.Rkey: string
union case JetstreamEvent.IdentityEvent: did: Did * timeUs: int64 * identity: IdentityInfo -> JetstreamEvent
val identity: IdentityInfo
IdentityInfo.Did: Did
union case JetstreamEvent.AccountEvent: did: Did * timeUs: int64 * account: AccountInfo -> JetstreamEvent
val account: AccountInfo
AccountInfo.Active: bool
union case Result.Error: ErrorValue: 'TError -> Result<'T,'TError>
type StreamError = | ConnectionFailed of message: string | WebSocketError of message: string | DeserializationError of message: string | Closed
<summary> Errors that can occur during Jetstream event streaming. </summary>
union case StreamError.Closed: StreamError
val e: StreamError
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val options2: JetstreamOptions
val options3: JetstreamOptions
union case Option.Some: Value: 'T -> Option<'T>
val cts2: CancellationTokenSource
val firehoseOptions: FirehoseOptions
module Firehose from FSharp.ATProto.Streaming
val defaultOptions: FirehoseOptions
<summary> Default firehose options pointing at the main Bluesky relay. </summary>
union case Option.None: Option<'T>
val firehoseEvents: System.Collections.Generic.IAsyncEnumerable<Result<FirehoseEvent,StreamError>>
val subscribe: options: FirehoseOptions -> ct: CancellationToken -> System.Collections.Generic.IAsyncEnumerable<Result<FirehoseEvent,StreamError>>
<summary> Subscribe to the AT Protocol firehose. Returns an IAsyncEnumerable that yields parsed events until the connection closes or the CancellationToken is triggered. </summary>
val enumerator: System.Collections.Generic.IAsyncEnumerator<Result<FirehoseEvent,StreamError>>
System.Collections.Generic.IAsyncEnumerable.GetAsyncEnumerator(?cancellationToken: CancellationToken) : System.Collections.Generic.IAsyncEnumerator<Result<FirehoseEvent,StreamError>>
property System.Collections.Generic.IAsyncEnumerator.Current: Result<FirehoseEvent,StreamError> with get
<summary>Gets the element in the collection at the current position of the enumerator.</summary>
<returns>The element in the collection at the current position of the enumerator.</returns>
union case FirehoseEvent.FirehoseCommitEvent: FirehoseCommit -> FirehoseEvent
val commit: FirehoseCommit
val op: RepoOp
FirehoseCommit.Ops: RepoOp list
FirehoseCommit.Seq: int64
FirehoseCommit.Repo: Did
RepoOp.Action: CommitOperation
RepoOp.Path: string
union case FirehoseEvent.FirehoseInfoEvent: FirehoseInfo -> FirehoseEvent
val info: FirehoseInfo
FirehoseInfo.Name: string
union case FirehoseEvent.FirehoseErrorEvent: error: string * message: string option -> FirehoseEvent
val error: string
val message: string option
CancellationTokenSource.CancelAfter(delay: System.TimeSpan) : unit
CancellationTokenSource.CancelAfter(millisecondsDelay: int) : unit

Type something to start searching.