Streaming
The FSharp.ATProto.Streaming package provides real-time event streams over WebSocket. Two protocols are supported: Jetstream (JSON, simpler) and Firehose (CBOR, full fidelity). Both return IAsyncEnumerable<Result<Event, StreamError>> for incremental consumption.
open FSharp.ATProto.Streaming
open FSharp.ATProto.Syntax
open System.Threading
Jetstream
Jetstream is a JSON-based relay that provides a filtered, lightweight view of the AT Protocol event stream. It is the recommended starting point for most applications.
Key Types
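| Type | Description |
|---|---|
| Event | Event cases such as `CommitEvent`, `IdentityEvent`, and `AccountEvent` |
| Commit payload of `CommitEvent` | Commit details, including `Operation` and `Rkey` |
| Options record (`Jetstream.defaultOptions`) | Connection config, including `WantedCollections`, `WantedDids`, and `Cursor` |
| `StreamError` | Errors that can occur during streaming, including `Closed` |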
Functions
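| Function | Description |
|---|---|
| `Jetstream.subscribe` | Subscribe to events; returns an `IAsyncEnumerable` of `Result` values. On disconnect the final element is `Error Closed` |
| `Jetstream.defaultOptions` | Default options pointing at the US-East relay |

The module also provides functions to parse a raw JSON message into an event and to build the WebSocket URI from options.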
Subscribing to Posts
let cts = new CancellationTokenSource()

let options =
    { Jetstream.defaultOptions with
        WantedCollections = [ "app.bsky.feed.post" ] }

let events = Jetstream.subscribe options cts.Token

task {
    let enumerator = events.GetAsyncEnumerator(cts.Token)

    while! enumerator.MoveNextAsync() do
        match enumerator.Current with
        | Ok (CommitEvent (did, _timeUs, info)) when info.Operation = Create ->
            printfn "New post from %s (rkey: %s)" (Did.value did) info.Rkey
        | Ok (IdentityEvent (did, _timeUs, identity)) ->
            printfn "Identity update: %s" (Did.value identity.Did)
        | Ok (AccountEvent (did, _timeUs, account)) ->
            printfn "Account %s active=%b" (Did.value did) account.Active
        | Error StreamError.Closed ->
            printfn "Stream closed"
        | Error e ->
            printfn "Error: %A" e
        | _ -> ()

    // Cancel to disconnect
    cts.Cancel()
}
Filtering by DID
To receive events only for specific accounts, set WantedDids:
{ Jetstream.defaultOptions with
    WantedDids = [ "did:plc:abc123"; "did:plc:def456" ] }
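On the public Jetstream service, filters like these travel as repeated query parameters on the subscription URI. The sketch below illustrates that mapping under stated assumptions: `SketchOptions`, `sketchUri`, and the `jetstream.example` host are hypothetical stand-ins, not the library's own types or functions, and the parameter names (`wantedCollections`, `wantedDids`, `cursor`) are assumed from the Jetstream service's query string rather than taken from this package.

```fsharp
open System

// Hypothetical stand-in for the options record; not the library's type.
type SketchOptions =
    { Endpoint: string
      WantedCollections: string list
      WantedDids: string list
      Cursor: int64 option }

// Build a subscription URI by appending one query parameter per filter value.
let sketchUri (options: SketchOptions) : Uri =
    let query =
        [ for collection in options.WantedCollections do
              yield "wantedCollections=" + Uri.EscapeDataString collection
          for did in options.WantedDids do
              yield "wantedDids=" + Uri.EscapeDataString did
          match options.Cursor with
          | Some cursor -> yield "cursor=" + string cursor
          | None -> () ]
        |> String.concat "&"

    // Leave the endpoint untouched when no filter is set.
    if query = "" then Uri(options.Endpoint) else Uri(options.Endpoint + "?" + query)

let uri =
    sketchUri
        { Endpoint = "wss://jetstream.example/subscribe" // placeholder host
          WantedCollections = [ "app.bsky.feed.post" ]
          WantedDids = [ "did:plc:abc123" ]
          Cursor = None }

printfn "%s" uri.Query
```

Each list element becomes its own parameter, so two DIDs produce two `wantedDids=` entries. In application code, prefer the library's own URI builder over a hand-rolled one like this.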
Resuming from a Cursor
Jetstream cursors are Unix timestamps in microseconds. To resume from where you left off, set Cursor to the timestamp of the last event you processed:
{ Jetstream.defaultOptions with
    Cursor = Some 1709312400000000L }
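When the resume point is known as a wall-clock time rather than a saved cursor, it can be converted to microseconds first. This is a minimal sketch using only the .NET base library; `toCursorMicros` and `ofCursorMicros` are hypothetical helper names, not part of the package.

```fsharp
open System

/// Convert an instant to a Unix timestamp in microseconds.
let toCursorMicros (instant: DateTimeOffset) : int64 =
    // One .NET tick is 100 ns, so 10 ticks make one microsecond.
    (instant.UtcTicks - DateTimeOffset.UnixEpoch.UtcTicks) / 10L

/// Convert a Unix timestamp in microseconds back to an instant (UTC).
let ofCursorMicros (cursor: int64) : DateTimeOffset =
    DateTimeOffset.UnixEpoch.AddTicks(cursor * 10L)

let resumeFrom = DateTimeOffset(2024, 3, 1, 17, 0, 0, TimeSpan.Zero)
printfn "%d" (toCursorMicros resumeFrom) // 1709312400000000
```

The result can be passed as `Cursor = Some (toCursorMicros resumeFrom)`. The inverse helper is handy for logging how far behind a saved cursor is.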
Firehose
The Firehose provides the full-fidelity CBOR event stream from com.atproto.sync.subscribeRepos. It includes raw CAR blocks and is suitable for indexers and relay operators.
Key Types
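| Type | Description |
|---|---|
| Event | Event cases such as `FirehoseCommitEvent`, `FirehoseInfoEvent`, and `FirehoseErrorEvent` |
| Commit payload of `FirehoseCommitEvent` | Full commit, including `Seq`, `Repo`, and `Ops` |
| Element of `Ops` | Operation within a commit, including `Action` and `Path` |
| CAR file | Parsed CAR v1 |
| Options record (`Firehose.defaultOptions`) | Connection config, including `Cursor` |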
Functions
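| Function | Description |
|---|---|
| `Firehose.subscribe` | Subscribe to events; returns an `IAsyncEnumerable` of parsed events wrapped in `Result` |
| `Firehose.defaultOptions` | Default options pointing at the main Bluesky relay |

The module also provides functions to parse a raw CBOR frame into an event and to build the WebSocket URI from options.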
Subscribing to the Firehose
// Separate token source so the Firehose subscription can be stopped on its own
let cts2 = new CancellationTokenSource()

let firehoseOptions =
    { Firehose.defaultOptions with
        Cursor = None }

let firehoseEvents = Firehose.subscribe firehoseOptions cts2.Token

task {
    let enumerator = firehoseEvents.GetAsyncEnumerator(cts2.Token)

    while! enumerator.MoveNextAsync() do
        match enumerator.Current with
        | Ok (FirehoseCommitEvent commit) ->
            // One line per operation: sequence number, repo DID, action, path
            for op in commit.Ops do
                printfn "[%d] %s %A %s" commit.Seq (Did.value commit.Repo) op.Action op.Path
        | Ok (FirehoseInfoEvent info) ->
            printfn "Info: %s" info.Name
        | Ok (FirehoseErrorEvent (error, message)) ->
            printfn "Error: %s %A" error message
        | Error e ->
            printfn "Stream error: %A" e
        | _ -> ()

    // Cancel to disconnect
    cts2.Cancel()
}
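Because the Firehose example prints every operation, a consumer interested in a single collection has to filter on the client side. The sketch below assumes that an operation's path has the form `collection/rkey`, as in AT Protocol repositories; `splitPath` and `inCollection` are hypothetical helpers, not part of the package.

```fsharp
/// Split a repo path of the form "collection/rkey" into its two parts.
let splitPath (path: string) : (string * string) option =
    match path.Split('/') with
    | [| collection; rkey |] when collection <> "" && rkey <> "" -> Some (collection, rkey)
    | _ -> None

/// True when the path belongs to the given collection NSID.
let inCollection (nsid: string) (path: string) : bool =
    match splitPath path with
    | Some (collection, _) -> collection = nsid
    | None -> false

printfn "%b" (inCollection "app.bsky.feed.post" "app.bsky.feed.post/3kabc") // true
printfn "%b" (inCollection "app.bsky.feed.post" "app.bsky.feed.like/3kabc") // false
```

Inside the commit loop this could guard the `printfn` call, for example `if inCollection "app.bsky.feed.post" op.Path then ...`.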
Graceful Shutdown
Both subscribe functions respect CancellationToken. Cancel the token to close the WebSocket connection cleanly. After cancellation, MoveNextAsync returns false and the enumerator disposes the underlying WebSocket.
// Run for 30 seconds then stop
cts.CancelAfter(30_000)
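The cancellation pattern can be tried without a network connection. This is a sketch under stated assumptions: `ticker` is a hypothetical stand-in stream, not the library's implementation, and it only mimics the behaviour described above by completing (`MoveNextAsync` returns false) once the token is cancelled. It uses `while!`, which requires F# 8 or later, as the examples in this section do.

```fsharp
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

// Stand-in stream (hypothetical): yields 1, 2, 3, ... roughly every 10 ms
// and completes once the token is cancelled.
let ticker () =
    { new IAsyncEnumerable<int> with
        member _.GetAsyncEnumerator(ct: CancellationToken) =
            let current = ref 0
            { new IAsyncEnumerator<int> with
                member _.Current = current.Value
                member _.MoveNextAsync() =
                    let step =
                        task {
                            try
                                do! Task.Delay(10, ct)
                                current.Value <- current.Value + 1
                                return true
                            with :? OperationCanceledException ->
                                // Cancellation ends the stream instead of throwing
                                return false
                        }
                    ValueTask<bool>(step)
              interface IAsyncDisposable with
                member _.DisposeAsync() = ValueTask.CompletedTask } }

// Consume until the stream completes; `use` disposes the enumerator on exit.
let countUntilCancelled (source: IAsyncEnumerable<int>) (ct: CancellationToken) =
    task {
        use enumerator = source.GetAsyncEnumerator(ct)
        let seen = ref 0
        while! enumerator.MoveNextAsync() do
            seen.Value <- seen.Value + 1
        return seen.Value
    }

let demoCts = new CancellationTokenSource()
demoCts.CancelAfter(200) // stop after about 200 ms
let seen = (countUntilCancelled (ticker ()) demoCts.Token).Result
printfn "Stream ended cleanly after %d items" seen
```

Binding the enumerator with `use` ensures `DisposeAsync` runs even if a handler throws, which the shorter examples above skip for brevity.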