Skip to main content
Most custodians should use scheduled downloads — see Audit Exports. This page is for the minority of custodians who need programmatic access: custom compliance rails, real-time accounting reconciliation, or downstream systems that can’t consume batch exports.
Musubi’s backend exposes REST + SSE APIs that you can integrate directly. This page covers authentication, the proposal/quote workflow programmatically (as an alternative to the Console), and real-time settlement events.

When to use this

  • Your compliance archive is real-time (event-driven, not batch-ingested)
  • Your accounting system reconciles continuously and can’t wait for daily exports
  • You’re automating the authorization decision itself behind a risk engine
  • You need sub-second settlement notifications (most custodians do not)
If none of those apply, use the Console + Audit Exports — the integration is less work and covers the same ground.

Authentication

JWT bearer token, refreshed roughly every hour. Full details in Authentication. Typical pattern: cache the token, refresh ~60 s before expiry, retry once on 401 after forcing a refresh.
// Token provider — caches the JWT and refreshes on expiry.
@Component
public class MusubiAuthProvider {
  private final WebClient auth;
  private final AtomicReference<Token> cached = new AtomicReference<>();

  public record Token(String value, Instant expiresAt) {}

  public MusubiAuthProvider(WebClient.Builder b, @Value("${musubi.backend-url}") String url) {
    this.auth = b.baseUrl(url).build();
  }

  public String token() {
    var t = cached.get();
    if (t != null && Instant.now().isBefore(t.expiresAt().minusSeconds(60))) return t.value();
    return refresh();
  }

  private synchronized String refresh() {
    var t = auth.post().uri("/auth/token")
        .bodyValue(Map.of(/* your IdP credentials */))
        .retrieve().bodyToMono(Token.class).block();
    cached.set(t);
    return t.value();
  }
}

Proposal + quote workflow via REST

EndpointPurpose
GET /api/v1/orders?status=PENDINGList incoming proposals awaiting your accept/reject
POST /api/v1/orders/{intent_id}/acceptSender custodian accepts the proposal — proceeds to quoting
GET /api/v1/orders/{intent_id}/quotesReview competing market maker quotes
POST /api/v1/orders/{intent_id}/quotes/{quote_id}/acceptSender custodian co-signs the chosen quote
GET /api/v1/orders/events (SSE)Real-time order_updated and quote_received events
Full endpoint shapes, request/response examples: API Reference.
// Typed DTOs. Match the shapes in /custodian/api-reference.
public record ApiResponse<T>(T data) {}
public record OrderDto(
    String intentId, String status, String senderPartyId,
    String sourceCurrency, String targetCurrency, String targetAmount,
    String sourceAmountMax, Instant createdAt, Instant expiresAt) {}
public record QuoteDto(
    String quoteId, String intentId, String marketMakerPartyId,
    String fxRate, String sourceAmount, String targetAmount,
    Instant submittedAt, Instant validUntil, String status) {}
// Service — list proposals, review quotes, accept.
@Service
public class MusubiCustodianClient {
  private final WebClient rest;

  public MusubiCustodianClient(WebClient.Builder b,
                               @Value("${musubi.backend-url}") String url,
                               MusubiAuthProvider auth) {
    this.rest = b.baseUrl(url)
        .filter((req, next) -> next.exchange(
            ClientRequest.from(req).headers(h -> h.setBearerAuth(auth.token())).build()))
        .build();
  }

  public List<OrderDto> listPending() {
    return rest.get().uri("/api/v1/orders?status=PENDING")
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<ApiResponse<List<OrderDto>>>() {})
        .map(ApiResponse::data)
        .block();
  }

  public List<QuoteDto> listQuotes(String intentId) {
    return rest.get().uri("/api/v1/orders/{id}/quotes", intentId)
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<ApiResponse<List<QuoteDto>>>() {})
        .map(ApiResponse::data)
        .block();
  }

  public OrderDto acceptProposal(String intentId) {
    return rest.post().uri("/api/v1/orders/{id}/accept", intentId)
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<ApiResponse<OrderDto>>() {})
        .map(ApiResponse::data)
        .block();
  }

  public OrderDto acceptQuote(String intentId, String quoteId) {
    return rest.post().uri("/api/v1/orders/{id}/quotes/{qid}/accept", intentId, quoteId)
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<ApiResponse<OrderDto>>() {})
        .map(ApiResponse::data)
        .block();
  }
}
Error handling worth adding: catch WebClientResponseException.Unauthorized on 401 → force-refresh the token and retry once; surface Conflict on 409 (expired quote, order already advanced) back to whatever subsystem triggered the call.

Real-time settlement events via SSE

public record OrderEvent(
    String intentId, String status, String transactionHash,
    Instant settledAt, String sourceAmountActual, String targetAmount,
    String fxRate) {}
@Component
public class MusubiSettlementListener {
  private static final Logger log = LoggerFactory.getLogger(MusubiSettlementListener.class);

  private final WebClient rest;
  private final AccountingService accounting;
  private final ComplianceArchive archive;

  public MusubiSettlementListener(WebClient.Builder b,
                                  @Value("${musubi.backend-url}") String url,
                                  MusubiAuthProvider auth,
                                  AccountingService accounting,
                                  ComplianceArchive archive) {
    this.rest = b.baseUrl(url)
        .filter((req, next) -> next.exchange(
            ClientRequest.from(req).headers(h -> h.setBearerAuth(auth.token())).build()))
        .build();
    this.accounting = accounting;
    this.archive = archive;
  }

  @EventListener(ApplicationReadyEvent.class)
  public void subscribe() {
    var type = new ParameterizedTypeReference<ServerSentEvent<OrderEvent>>() {};
    rest.get().uri("/api/v1/orders/events")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(type)
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofSeconds(30))
            .doBeforeRetry(s -> log.warn("SSE disconnected, retrying: {}", s.failure().toString())))
        .filter(sse -> "order_updated".equals(sse.event()))
        .mapNotNull(ServerSentEvent::data)
        .filter(ev -> "SETTLED".equals(ev.status()))
        .doOnNext(this::handle)
        .subscribe();
  }

  private void handle(OrderEvent ev) {
    log.info("Settled {} at {} (tx={})", ev.intentId(), ev.settledAt(), ev.transactionHash());
    accounting.reconcile(ev.intentId(), ev.transactionHash(), ev.settledAt(),
                         ev.sourceAmountActual(), ev.targetAmount(), ev.fxRate());
    archive.store(ev);
  }
}

Production notes

  • Idempotency. Handle SETTLED events idempotently — SSE reconnects can replay the latest event. Key your accounting write by intentId + transactionHash.
  • Ordering. order_updated events are per-contract — you may see EXECUTING then SETTLED for the same intent. Filter to SETTLED only if that’s the only state your accounting cares about.
  • Backpressure. If your downstream is slow (compliance archive is commonly the bottleneck), switch .doOnNext to .concatMap(ev -> Mono.fromCallable(() -> handle(ev)).subscribeOn(Schedulers.boundedElastic())) so slow handlers don’t starve the event loop.

Other stacks

Same shape, different clients:
  • Nodefetch for REST; EventSource (native) or eventsource package for SSE
  • Pythonhttpx or requests for REST; httpx-sse or sseclient-py for SSE
  • Kotlin — same WebClient as Java; or ktor-client-cio + SSE
The pattern in all cases: authenticate with JWT, filter SSE to order_updated with status == SETTLED, dispatch to your downstream handlers with idempotency keyed on intentId + transactionHash.