@k-msg/messaging
High-level messaging facade for k-msg.
This package provides KMsg, which normalizes user input, routes to a provider, and returns a Result.
Installation
Section titled “Installation”npm install @k-msg/messaging @k-msg/core# orbun add @k-msg/messaging @k-msg/coreRuntime Adapters
Section titled “Runtime Adapters”@k-msg/messaging root export is runtime-neutral.
- Bun runtime adapters:
@k-msg/messaging/adapters/bunBunSqlDeliveryTrackingStore,SqliteDeliveryTrackingStore,SQLiteJobQueue
- Node runtime adapters:
@k-msg/messaging/adapters/nodeDeliveryTracker,JobProcessor,MessageJobProcessor,MessageRetryHandler
- Cloudflare runtime adapters:
@k-msg/messaging/adapters/cloudflare- SQL adapters for Hyperdrive/Postgres/MySQL and D1 (driver-injected)
- Drizzle-wrapped SQL client/store factories
- SQL/Drizzle schema generators
- Object-storage adapters for KV/R2/DO-backed tracking/queue
Migration (Breaking)
Section titled “Migration (Breaking)”| Old import (removed from root) | New import |
|---|---|
BunSqlDeliveryTrackingStore | @k-msg/messaging/adapters/bun |
SqliteDeliveryTrackingStore | @k-msg/messaging/adapters/bun |
SQLiteJobQueue | @k-msg/messaging/adapters/bun |
JobProcessor / MessageJobProcessor | @k-msg/messaging/adapters/node |
MessageRetryHandler | @k-msg/messaging/adapters/node |
createDeliveryTrackingHooks / DeliveryTrackingService / InMemoryDeliveryTrackingStore | @k-msg/messaging/tracking |
BulkMessageSender | @k-msg/messaging/sender |
Job / JobQueue / JobStatus | @k-msg/messaging/queue |
VariableReplacer / VariableUtils / defaultVariableReplacer | @k-msg/template (TemplatePersonalizer / TemplateVariableUtils / defaultTemplatePersonalizer) |
JobProcessor and MessageJobProcessor now require explicit jobQueue injection.
MessageRetryHandler is an application-level retry orchestrator for provider gaps. You must supply an execute(attempt, item) callback that performs the real resend logic, and the handler will only manage retry timing, policy, and queue state.
import { MessageRetryHandler } from "@k-msg/messaging/adapters/node";
const retryHandler = new MessageRetryHandler({ policy: { maxAttempts: 3, backoffMultiplier: 2, initialDelay: 5000, maxDelay: 300000, jitter: true, retryableStatuses: ["FAILED"], retryableErrorCodes: ["NETWORK_TIMEOUT"], }, checkInterval: 1000, maxQueueSize: 1000, execute: async (attempt) => { return await resendMessage(attempt.messageId); },});Quick Start
Section titled “Quick Start”import { KMsg } from "@k-msg/messaging";import { SolapiProvider } from "@k-msg/provider/solapi";
const kmsg = new KMsg({ providers: [ new SolapiProvider({ apiKey: process.env.SOLAPI_API_KEY!, apiSecret: process.env.SOLAPI_API_SECRET!, defaultFrom: "01000000000", }), ], defaults: { sms: { autoLmsBytes: 90 }, },});
// Default SMS (type omitted). If the content is long, it can auto-upgrade to LMS.await kmsg.send({ to: "01012345678", text: "hello" });
// Explicit typed sendawait kmsg.send({ type: "ALIMTALK", to: "01012345678", templateId: "AUTH_OTP", variables: { code: "123456" },});Routing
Section titled “Routing”import { KMsg } from "@k-msg/messaging";import { IWINVProvider } from "@k-msg/provider";import { SolapiProvider } from "@k-msg/provider/solapi";
const kmsg = new KMsg({ providers: [ new IWINVProvider({ apiKey: process.env.IWINV_API_KEY!, baseUrl: "https://alimtalk.bizservice.iwinv.kr", smsApiKey: process.env.IWINV_SMS_API_KEY, smsAuthKey: process.env.IWINV_SMS_AUTH_KEY, }), new SolapiProvider({ apiKey: process.env.SOLAPI_API_KEY!, apiSecret: process.env.SOLAPI_API_SECRET!, defaultFrom: "01000000000", kakaoPfId: process.env.SOLAPI_KAKAO_PF_ID, rcsBrandId: process.env.SOLAPI_RCS_BRAND_ID, }), ], routing: { defaultProviderId: "solapi", byType: { ALIMTALK: "iwinv", SMS: ["solapi"], }, strategy: "first", },});Bulk Sending
Section titled “Bulk Sending”Use sendMany() for controlled concurrency.
const results = await kmsg.sendMany( [ { to: "01011112222", text: "hello 1" }, { to: "01033334444", text: "hello 2" }, ], { concurrency: 10 },);Delivery Tracking (PULL)
Section titled “Delivery Tracking (PULL)”After a message is accepted by a provider (including scheduled sends), you can poll provider status APIs to reconcile delivery state and update your internal records.
DeliveryTrackingService is storage-backed and supports:
- In-memory store (runtime-neutral default)
- SQLite/Bun.SQL via
@k-msg/messaging/adapters/bun - Cloudflare SQL/KV/R2/DO via
@k-msg/messaging/adapters/cloudflare
import { createDeliveryTrackingHooks, DeliveryTrackingService, InMemoryDeliveryTrackingStore,} from "@k-msg/messaging/tracking";import { KMsg } from "@k-msg/messaging";import { SolapiProvider } from "@k-msg/provider/solapi";
const providers = [ new SolapiProvider({ apiKey: process.env.SOLAPI_API_KEY!, apiSecret: process.env.SOLAPI_API_SECRET!, }),];
const tracking = new DeliveryTrackingService({ providers, store: new InMemoryDeliveryTrackingStore(),});
const kmsg = new KMsg({ providers, hooks: createDeliveryTrackingHooks(tracking),});
// On successful send, providerMessageId is recorded into the tracking store.await kmsg.send({ to: "01012345678", text: "hello" });
// Run as a cron/worker looptracking.start();// or single pass (manual/cron)await tracking.runOnce();Bun SQLite Example
Section titled “Bun SQLite Example”import { DeliveryTrackingService } from "@k-msg/messaging/tracking";import { SqliteDeliveryTrackingStore } from "@k-msg/messaging/adapters/bun";
const tracking = new DeliveryTrackingService({ providers, store: new SqliteDeliveryTrackingStore({ dbPath: "./kmsg.sqlite" }),});Cloudflare D1/KV/R2/DO Example
Section titled “Cloudflare D1/KV/R2/DO Example”import { DeliveryTrackingService } from "@k-msg/messaging/tracking";import { createD1DeliveryTrackingStore, createKvDeliveryTrackingStore,} from "@k-msg/messaging/adapters/cloudflare";
// D1const d1Store = createD1DeliveryTrackingStore(env.DB);
// KV (or use createR2DeliveryTrackingStore / createDurableObjectDeliveryTrackingStore)const kvStore = createKvDeliveryTrackingStore(env.KMSG_KV);
const tracking = new DeliveryTrackingService({ providers, store: d1Store, // swap to kvStore as needed});SQL Schema (D1/Postgres/MySQL)
Section titled “SQL Schema (D1/Postgres/MySQL)”createD1DeliveryTrackingStore() and HyperdriveDeliveryTrackingStore share the same logical table/index schema.
DeliveryTrackingService.init() creates these automatically.
Tracking table/index defaults are generated from the adapter schema spec:
- Tracking table default:
kmsg_delivery_tracking(override withtableName) - Primary key:
message_id - Core columns:
provider_id,provider_message_id,type,to,from,status - Time columns:
requested_at,status_updated_at,next_check_at,sent_at,delivered_at,failed_at,last_checked_at,scheduled_at - Meta columns:
attempt_count,provider_status_code,provider_status_message,last_error,metadata rawcolumn is disabled by default (storeRaw: false) and enabled only whenstoreRaw: trueis set.- Index:
idx_kmsg_delivery_due(status, next_check_at) - Index:
idx_kmsg_delivery_provider_msg(provider_id, provider_message_id) - Index:
idx_kmsg_delivery_requested_at(requested_at)
Notes by dialect:
- D1 (SQLite): JSON fields are stored as
TEXT - Postgres: JSON fields are stored as
JSONB - MySQL: identifier type differs (
VARCHAR), JSON fields currently stored asTEXT
Queue table (when using HyperdriveJobQueue / createD1JobQueue): kmsg_jobs
- Primary key:
id - Main columns:
type,data,status,priority,attempts,max_attempts,delay - Time columns:
created_at,process_at,completed_at,failed_at - Meta columns:
error,metadata
Queue indexes:
idx_kmsg_jobs_dequeue(status, priority, process_at, created_at)idx_kmsg_jobs_id(id)
Schema Utility API (Cloudflare Adapter)
Section titled “Schema Utility API (Cloudflare Adapter)”import { buildCloudflareSqlSchemaSql, buildDeliveryTrackingSchemaSql, buildJobQueueSchemaSql, initializeCloudflareSqlSchema, renderDrizzleSchemaSource,} from "@k-msg/messaging/adapters/cloudflare";
// SQL DDL stringconst ddl = buildCloudflareSqlSchemaSql({ dialect: "postgres", target: "both",});
// Idempotent runtime initializer (duplicate/exists errors are ignored only for that case)await initializeCloudflareSqlSchema(client, { target: "both" });
// Drizzle schema source (TypeScript string)const drizzleSource = renderDrizzleSchemaSource({ dialect: "postgres", target: "both",});Drizzle Adapter Factories
Section titled “Drizzle Adapter Factories”import { createDrizzleDeliveryTrackingStore, createDrizzleJobQueue,} from "@k-msg/messaging/adapters/cloudflare";
const trackingStore = createDrizzleDeliveryTrackingStore({ dialect: "postgres", db, // drizzle db with execute()/transaction()});
const queue = createDrizzleJobQueue({ dialect: "postgres", db,});Tracking Schema Customization
Section titled “Tracking Schema Customization”storeRaw defaults to false. Enable it only when you explicitly need provider raw payload storage.
import { buildDeliveryTrackingSchemaSql, createD1DeliveryTrackingStore, getDeliveryTrackingSchemaSpec,} from "@k-msg/messaging/adapters/cloudflare";
const trackingOptions = { tableName: "otp_delivery_tracking", columnMap: { messageId: "id", nextCheckAt: "next_check_at_ms", }, typeStrategy: { messageId: "uuid", timestamp: "integer", }, storeRaw: true,} as const;
const store = createD1DeliveryTrackingStore(env.DB, trackingOptions);const ddl = buildDeliveryTrackingSchemaSql({ dialect: "postgres", ...trackingOptions,});const spec = getDeliveryTrackingSchemaSpec(trackingOptions);Supported Drizzle Versions
Section titled “Supported Drizzle Versions”@k-msg/messaging | Supported drizzle-orm |
|---|---|
0.19.x | `^0.44.0 |
Compatibility for this line is validated in CI against the drizzle-compat matrix:
drizzle-orm@0.44.7drizzle-orm@0.45.1drizzle-orm@beta
Tracking-based API failover
Section titled “Tracking-based API failover”When provider-native ALIMTALK failover is unsupported or partial, you can enable tracking-based API failover.
- Triggers only for
ALIMTALKwithfailover.enabled === true - Triggers only when tracking status is
FAILEDand classified as non-Kakao-user failure - Attempts fallback exactly once per original message
- Requires providers with
getDeliveryStatus()support
import { createDeliveryTrackingHooks, DeliveryTrackingService,} from "@k-msg/messaging/tracking";import { KMsg } from "@k-msg/messaging";import { SolapiProvider } from "@k-msg/provider/solapi";
const providers = [ new SolapiProvider({ apiKey: process.env.SOLAPI_API_KEY!, apiSecret: process.env.SOLAPI_API_SECRET!, defaultFrom: "01000000000", }),];
let kmsg!: KMsg;const tracking = new DeliveryTrackingService({ providers, apiFailover: { // Re-send fallback SMS/LMS through the same KMsg pipeline sender: (input) => kmsg.send(input), },});
kmsg = new KMsg({ providers, hooks: createDeliveryTrackingHooks(tracking),});
await kmsg.send({ type: "ALIMTALK", to: "01012345678", templateId: "AUTH_OTP", variables: { code: "123456" }, failover: { enabled: true, fallbackChannel: "sms", fallbackContent: "[안내] 카카오톡 미사용자로 SMS 대체 발송", },});