GunDB Integration with Fiber: Building fibergun
The modern web is moving towards decentralization, and traditional centralized databases are increasingly showing their limitations. Data silos, single points of failure, and vendor lock-in are pushing developers to seek alternatives that offer true data ownership and resilience. This is where GunDB shines – a decentralized, real-time database that enables applications to work offline-first while maintaining eventual consistency across distributed peers.
Fibergun was born from our need to bring GunDB's decentralized database capabilities to Fiber applications. This post details our technical implementation, the architectural decisions we made, and the challenges we faced in creating a seamless integration between these two technologies.
The Centralization Problem
Before diving into our solution, it’s important to understand why traditional centralized databases are becoming inadequate for modern applications:
Single Points of Failure: Centralized databases create bottlenecks and vulnerability points. When the central server goes down, the entire application becomes unavailable, regardless of how many users are online and could potentially share data with each other.
Data Ownership Issues: Users don’t truly own their data when it’s stored in centralized systems. Companies can change terms of service, go out of business, or restrict access, leaving users without their information.
Scalability Limitations: Traditional databases require expensive infrastructure scaling, often leading to vendor lock-in and increasing costs as applications grow.
Network Dependency: Users must be online and connected to the central server to access their data, making applications unusable in areas with poor connectivity.
Privacy Concerns: Centralized systems create attractive targets for hackers and government surveillance, compromising user privacy.
Why GunDB?
GunDB addresses these fundamental issues through its unique architecture and design principles:
Real-time Data Synchronization
GunDB provides instant data synchronization across all connected peers without requiring a central authority. Changes propagate automatically through the network, ensuring that all users see updates in real-time, regardless of which peer initiated the change.
Offline-first Capabilities
Applications built with GunDB work seamlessly offline. Users can read, write, and modify data locally, and all changes automatically synchronize when connectivity is restored. This offline-first approach ensures applications remain functional regardless of network conditions.
Graph-based Data Structure
Unlike traditional relational or document databases, GunDB uses a graph-based structure that naturally represents relationships between data. This approach provides more flexibility in data modeling and lets queries traverse relationships directly instead of reconstructing them through joins.
Peer-to-peer Networking Support
GunDB’s built-in P2P networking eliminates the need for centralized servers. Peers can connect directly to each other, creating resilient networks that become stronger as more users join.
Cryptographic Security
GunDB incorporates cryptographic primitives (through its SEA module: Security, Encryption, Authorization) to ensure data integrity and enable user authentication without central authorities. Data written under a user's account is cryptographically signed, so any peer can detect tampering.
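To illustrate the idea, here is a conceptual Go sketch of sign-on-write, not GunDB's actual SEA implementation: each write is signed with the author's private key, so any peer holding the public key can verify integrity without trusting a central server.

package main

import (
	"crypto/ed25519"
	"fmt"
)

func main() {
	// Hypothetical sign-on-write flow: keys belong to the user, not a server.
	pub, priv, _ := ed25519.GenerateKey(nil)

	data := []byte(`{"name":"John Doe"}`)
	sig := ed25519.Sign(priv, data)

	// Any peer with the public key can detect tampering.
	fmt.Println(ed25519.Verify(pub, data, sig))               // true
	fmt.Println(ed25519.Verify(pub, []byte("tampered"), sig)) // false
}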
Understanding GunDB’s Architecture
Before implementing our Fiber integration, we need to understand how GunDB works at a fundamental level:
The Graph Structure
GunDB stores all data as a graph in which every node has a unique identifier (its "soul") and relationships are expressed as references between nodes; in the example below, {"#": ...} marks such a reference. This structure is more flexible than a fixed schema and allows for dynamic data modeling:
// Traditional JSON structure
{
"users": {
"john": {
"name": "John Doe",
"email": "[email protected]",
"posts": ["post1", "post2"]
}
}
}
// GunDB graph structure
{
"users": {"john": {"#": "user_john_ref"}},
"user_john_ref": {
"name": "John Doe",
"email": "[email protected]",
"posts": {"#": "posts_ref"}
},
"posts_ref": {
"post1": {"#": "post1_ref"},
"post2": {"#": "post2_ref"}
}
}
Conflict Resolution
GunDB uses a conflict-free replicated data type (CRDT) approach, implemented by its HAM (Hypothetical Amnesia Machine) algorithm, to handle conflicts when multiple peers modify the same data simultaneously. Resolution favors the write with the most recent timestamp and applies a deterministic tie-break, so every peer converges to the same value.
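As a minimal sketch of this rule (our simplified take, not GunDB's full HAM state machine, which also handles deferring writes stamped in the future):

// resolve implements last-write-wins with a deterministic tie-break.
// All peers applying this rule to the same pair of writes converge
// to the same value.
func resolve(localVal, remoteVal interface{}, localTime, remoteTime int64, localPeer, remotePeer string) interface{} {
	switch {
	case remoteTime > localTime:
		return remoteVal // remote write is newer
	case remoteTime < localTime:
		return localVal // local write is newer
	case remotePeer > localPeer:
		return remoteVal // same timestamp: higher peer ID wins
	default:
		return localVal
	}
}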
Storage Layers
GunDB supports multiple storage layers that can be combined:
- In-memory storage for fast access
- Local file system storage for persistence
- Cloud storage (S3, etc.) for backup and scaling
- Custom storage adapters for specific needs
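For example, adapters can be stacked so that writes fan out to every layer while reads fall through until one layer has the key. A hedged sketch against the StorageAdapter interface defined later in this post:

// MultiStorage fans writes out to all adapters and serves reads
// from the first adapter that has the key.
type MultiStorage struct {
	layers []StorageAdapter // e.g. memory, disk, S3 — fastest first
}

func (m *MultiStorage) Put(key string, value []byte) error {
	for _, l := range m.layers {
		if err := l.Put(key, value); err != nil {
			return err
		}
	}
	return nil
}

func (m *MultiStorage) Get(key string) ([]byte, error) {
	var lastErr error
	for _, l := range m.layers {
		if v, err := l.Get(key); err == nil {
			return v, nil
		} else {
			lastErr = err
		}
	}
	return nil, lastErr
}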
Technical Implementation
Now let’s dive into the actual implementation of our Fiber-GunDB integration. Our goal was to create a middleware that would be intuitive for Fiber developers while providing full access to GunDB’s capabilities.
1. Core Architecture and Design Patterns
The foundation of fibergun is built on several key design patterns that ensure scalability and maintainability:
// Core interfaces that define our abstractions
type GunDB interface {
Get(path string) Node
Put(path string, data interface{}) error
On(path string, callback func(data interface{}, key string)) Subscription
Off(subscription Subscription)
Map(path string) MapNode
Set(path string) SetNode
Close() error
}
type Node interface {
Val() interface{}
Put(data interface{}) error
On(callback func(data interface{}, key string)) Subscription
Off(subscription Subscription)
Map() MapNode
Set() SetNode
}
// Main GunDB implementation
type gunDB struct {
peers []string
graph *Graph
options *Options
handlers map[string]Handler
subscriptions *SubscriptionManager
storage []StorageAdapter
eventBus *EventBus
metrics *Metrics
}
type Options struct {
Peers []string // List of peer addresses
File string // Local file storage path
Radisk bool // Enable local disk storage
S3 *S3Config // S3 storage configuration
WebSocket bool // Enable WebSocket transport
Port int // Port for peer connections
Timeout time.Duration // Connection timeout
MaxPeers int // Maximum number of peer connections
StorageQuota int64 // Storage quota in bytes
LogLevel LogLevel // Logging level
}
func NewGunDB(opts *Options) (GunDB, error) {
if opts == nil {
opts = &Options{}
}
// Set defaults
if opts.Timeout == 0 {
opts.Timeout = 30 * time.Second
}
if opts.MaxPeers == 0 {
opts.MaxPeers = 100
}
if opts.Port == 0 {
opts.Port = 8080
}
gun := &gunDB{
options: opts,
handlers: make(map[string]Handler),
subscriptions: NewSubscriptionManager(),
eventBus: NewEventBus(),
metrics: NewMetrics(),
}
// Initialize graph
gun.graph = NewGraph()
// Setup storage layers
if err := gun.setupStorage(); err != nil {
return nil, fmt.Errorf("failed to setup storage: %w", err)
}
// Initialize peer connections
if err := gun.initializePeers(); err != nil {
return nil, fmt.Errorf("failed to initialize peers: %w", err)
}
return gun, nil
}
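A typical standalone construction looks like this (the paths here are illustrative):

gun, err := NewGunDB(&Options{
	Peers:  []string{"ws://localhost:8081/gun"}, // optional seed peers
	File:   "./data",                            // local persistence directory
	Radisk: true,                                // enable disk storage
})
if err != nil {
	log.Fatal(err)
}
defer gun.Close()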
2. Storage Layer Implementation
One of GunDB’s key strengths is its flexible storage system. Our implementation supports multiple storage backends that can be used simultaneously:
type StorageAdapter interface {
Get(key string) ([]byte, error)
Put(key string, value []byte) error
Del(key string) error
List(prefix string) ([]string, error)
Close() error
}
// File system storage adapter
type FileSystemStorage struct {
basePath string
mutex sync.RWMutex
}
func NewFileSystemStorage(basePath string) (*FileSystemStorage, error) {
if err := os.MkdirAll(basePath, 0755); err != nil {
return nil, err
}
return &FileSystemStorage{
basePath: basePath,
}, nil
}
func (fs *FileSystemStorage) Get(key string) ([]byte, error) {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
filePath := filepath.Join(fs.basePath, fs.keyToPath(key))
return os.ReadFile(filePath)
}
func (fs *FileSystemStorage) Put(key string, value []byte) error {
fs.mutex.Lock()
defer fs.mutex.Unlock()
filePath := filepath.Join(fs.basePath, fs.keyToPath(key))
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
return err
}
return os.WriteFile(filePath, value, 0644)
}
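The keyToPath helper is not shown above; a minimal version (our assumption about its behavior, requiring crypto/sha256 and encoding/hex imports) hashes the key so arbitrary GunDB keys map to safe, evenly distributed file names:

// keyToPath maps an arbitrary GunDB key to a filesystem-safe path.
// Hashing sidesteps path separators and reserved characters in keys,
// and the two-character prefix spreads files across directories.
func (fs *FileSystemStorage) keyToPath(key string) string {
	sum := sha256.Sum256([]byte(key))
	name := hex.EncodeToString(sum[:])
	return filepath.Join(name[:2], name)
}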
// S3 storage adapter for cloud backup
type S3Storage struct {
client *s3.Client
bucket string
prefix string
}
func NewS3Storage(config *S3Config) (*S3Storage, error) {
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(),
awsconfig.WithRegion(config.Region),
)
if err != nil {
return nil, err
}
return &S3Storage{
client: s3.NewFromConfig(cfg),
bucket: config.Bucket,
prefix: config.Prefix,
}, nil
}
func (s3s *S3Storage) Get(key string) ([]byte, error) {
result, err := s3s.client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(s3s.bucket),
Key: aws.String(s3s.prefix + key),
})
if err != nil {
return nil, err
}
defer result.Body.Close()
return io.ReadAll(result.Body)
}
func (s3s *S3Storage) Put(key string, value []byte) error {
_, err := s3s.client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s3s.bucket),
Key: aws.String(s3s.prefix + key),
Body: bytes.NewReader(value),
})
return err
}
3. WebSocket Transport Implementation
Real-time synchronization is crucial for GunDB’s functionality. Our WebSocket implementation provides efficient, bidirectional communication between peers:
type WebSocketTransport struct {
	connections map[string]*WebSocketConnection
	gun         *gunDB
	mutex       sync.RWMutex
}
type WebSocketConnection struct {
	conn      *websocket.Conn
	peerID    string
	gun       *gunDB
	sendQueue chan []byte
	recvQueue chan []byte
	closeChan chan struct{}
	closeOnce sync.Once // guards closeChan against being closed twice
	metrics   *ConnectionMetrics
}

// shutdown closes closeChan exactly once, no matter how many goroutines
// observe a connection failure concurrently.
func (wsc *WebSocketConnection) shutdown() {
	wsc.closeOnce.Do(func() { close(wsc.closeChan) })
}
func (g *gunDB) setupWebSocket() error {
	if !g.options.WebSocket {
		return nil
	}
	// Fiber runs on fasthttp, so a net/http-style upgrader (such as
	// gorilla/websocket's) cannot be used here. We rely on
	// github.com/gofiber/websocket/v2, whose middleware performs the
	// upgrade and hands HandleConnection an established connection.
	// Configure origin checking there for production use.
	g.wsTransport = &WebSocketTransport{
		connections: make(map[string]*WebSocketConnection),
		gun:         g,
	}
	return nil
}
// HandleConnection is invoked by gofiber/websocket's handler after the
// upgrade has completed, e.g.
// app.Get("/gun", websocket.New(transport.HandleConnection)).
func (wst *WebSocketTransport) HandleConnection(conn *websocket.Conn) {
	peerID := generatePeerID()
	wsConn := &WebSocketConnection{
		conn:      conn,
		peerID:    peerID,
		gun:       wst.gun,
		sendQueue: make(chan []byte, 1000),
		recvQueue: make(chan []byte, 1000),
		closeChan: make(chan struct{}),
		metrics:   NewConnectionMetrics(),
	}
	wst.mutex.Lock()
	wst.connections[peerID] = wsConn
	wst.mutex.Unlock()
	// Start connection handlers
	go wsConn.handleSend()
	go wsConn.handleReceive()
	go wsConn.handleMessages()
	// Wait for connection to close
	<-wsConn.closeChan
	// Cleanup
	wst.mutex.Lock()
	delete(wst.connections, peerID)
	wst.mutex.Unlock()
}
func (wsc *WebSocketConnection) handleReceive() {
defer close(wsc.recvQueue)
for {
select {
case <-wsc.closeChan:
return
default:
messageType, data, err := wsc.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("WebSocket error: %v", err)
}
wsc.shutdown()
return
}
if messageType == websocket.TextMessage {
wsc.metrics.MessagesReceived++
wsc.metrics.BytesReceived += int64(len(data))
select {
case wsc.recvQueue <- data:
case <-wsc.closeChan:
return
}
}
}
}
}
func (wsc *WebSocketConnection) handleSend() {
ticker := time.NewTicker(54 * time.Second) // ping interval, kept below common 60s proxy idle timeouts
defer ticker.Stop()
for {
select {
case <-wsc.closeChan:
return
case <-ticker.C:
if err := wsc.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
wsc.shutdown()
return
}
case message := <-wsc.sendQueue:
if err := wsc.conn.WriteMessage(websocket.TextMessage, message); err != nil {
wsc.shutdown()
return
}
wsc.metrics.MessagesSent++
wsc.metrics.BytesSent += int64(len(message))
}
}
}
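handleMessages, referenced above but not shown, drains the receive queue and feeds decoded wire messages into the sync engine. A hedged sketch, assuming a processWireMessage entry point on gunDB:

// handleMessages decodes queued wire messages and hands them to the
// sync engine until the receive queue is closed.
func (wsc *WebSocketConnection) handleMessages() {
	for data := range wsc.recvQueue {
		var msg map[string]interface{}
		if err := json.Unmarshal(data, &msg); err != nil {
			log.Printf("dropping malformed message from %s: %v", wsc.peerID, err)
			continue
		}
		// processWireMessage is assumed to exist on gunDB and route
		// the message into the sync engine.
		wsc.gun.processWireMessage(wsc.peerID, msg)
	}
}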
4. Data Synchronization Engine
The heart of GunDB is its synchronization engine, which ensures data consistency across all peers:
type SyncEngine struct {
gun *gunDB
pendingWrites map[string]*PendingWrite
conflictQueue chan *ConflictEvent
syncQueue chan *SyncEvent
peerStates map[string]*PeerState
mutex sync.RWMutex
}
type PendingWrite struct {
Key string
Value interface{}
Timestamp int64
PeerID string
Retries int
}
type ConflictEvent struct {
Key string
LocalVal interface{}
RemoteVal interface{}
LocalTime int64
RemoteTime int64
PeerID string
}
type SyncEvent struct {
Type SyncEventType
Key string
Value interface{}
Timestamp int64
PeerID string
}
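The SyncEventType values are not shown elsewhere in this post; a simple enum consistent with how the code uses them (our assumption) is:

type SyncEventType int

const (
	SyncEventPut SyncEventType = iota // local write to propagate to peers
	SyncEventGet                      // request for a key from peers
	SyncEventAck                      // acknowledgement of a received write
)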
func NewSyncEngine(gun *gunDB) *SyncEngine {
se := &SyncEngine{
gun: gun,
pendingWrites: make(map[string]*PendingWrite),
conflictQueue: make(chan *ConflictEvent, 1000),
syncQueue: make(chan *SyncEvent, 1000),
peerStates: make(map[string]*PeerState),
}
// Start sync processors
go se.processConflicts()
go se.processSyncEvents()
go se.processRetries()
return se
}
func (se *SyncEngine) Put(key string, value interface{}) error {
timestamp := time.Now().UnixNano()
peerID := se.gun.GetPeerID()
	// Check for local conflicts: reject writes older than what we already hold
	if existing, exists := se.gun.graph.Get(key); exists {
		if meta, ok := existing["_"].(map[string]interface{}); ok {
			if existingTime, ok := meta[">"].(int64); ok && timestamp <= existingTime {
				return ErrStaleWrite
			}
		}
	}
// Prepare write with metadata
writeData := map[string]interface{}{
"_": map[string]interface{}{
">": timestamp,
"#": peerID,
},
}
// Merge value with metadata
if valueMap, ok := value.(map[string]interface{}); ok {
for k, v := range valueMap {
writeData[k] = v
}
} else {
writeData["="] = value
}
// Store locally
se.gun.graph.Put(key, writeData)
// Add to pending writes for peer synchronization
se.mutex.Lock()
se.pendingWrites[key] = &PendingWrite{
Key: key,
Value: writeData,
Timestamp: timestamp,
PeerID: peerID,
Retries: 0,
}
se.mutex.Unlock()
// Trigger sync
se.syncQueue <- &SyncEvent{
Type: SyncEventPut,
Key: key,
Value: writeData,
Timestamp: timestamp,
PeerID: peerID,
}
return nil
}
func (se *SyncEngine) processConflicts() {
for conflict := range se.conflictQueue {
// Implement last-write-wins with vector clock comparison
if conflict.RemoteTime > conflict.LocalTime {
// Remote write is newer, accept it
se.gun.graph.Put(conflict.Key, conflict.RemoteVal)
se.gun.eventBus.Publish("data.updated", map[string]interface{}{
"key": conflict.Key,
"value": conflict.RemoteVal,
"peer": conflict.PeerID,
})
} else if conflict.RemoteTime == conflict.LocalTime {
// Same timestamp, use deterministic resolution based on peer ID
localPeerID := se.gun.GetPeerID()
if conflict.PeerID > localPeerID {
se.gun.graph.Put(conflict.Key, conflict.RemoteVal)
se.gun.eventBus.Publish("data.updated", map[string]interface{}{
"key": conflict.Key,
"value": conflict.RemoteVal,
"peer": conflict.PeerID,
})
}
}
// If local time is newer, keep local value (do nothing)
}
}
5. Integration with Fiber Applications
Making fibergun easy to use in Fiber applications was a primary design goal. Here’s how developers can integrate it:
package main
import (
"log"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/websocket/v2"
"your-package/fibergun"
)
func main() {
app := fiber.New(fiber.Config{
ErrorHandler: func(c *fiber.Ctx, err error) error {
code := fiber.StatusInternalServerError
if e, ok := err.(*fiber.Error); ok {
code = e.Code
}
return c.Status(code).JSON(fiber.Map{
"error": err.Error(),
})
},
})
// Middleware
app.Use(logger.New())
	app.Use(cors.New(cors.Config{
		// Note: Fiber's cors middleware rejects AllowCredentials combined
		// with a wildcard origin; list explicit origins if credentials
		// are required.
		AllowOrigins: "*",
	}))
// Initialize GunDB
gun, err := fibergun.New(&fibergun.Options{
Peers: []string{"ws://peer1.com/gun", "ws://peer2.com/gun"},
Radisk: true,
File: "./data",
WebSocket: true,
Port: 8080,
MaxPeers: 50,
StorageQuota: 1024 * 1024 * 1024, // 1GB
})
if err != nil {
log.Fatal(err)
}
defer gun.Close()
// WebSocket upgrade middleware
app.Use("/gun", websocket.New(func(c *websocket.Conn) {
gun.HandleWebSocket(c)
}))
// REST API endpoints
setupAPIRoutes(app, gun)
// Static file serving
app.Static("/", "./public")
log.Fatal(app.Listen(":3000"))
}
func setupAPIRoutes(app *fiber.App, gun fibergun.GunDB) {
api := app.Group("/api/v1")
// Get data
api.Get("/data/*", func(c *fiber.Ctx) error {
path := c.Params("*")
node := gun.Get(path)
value := node.Val()
if value == nil {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": "Data not found",
})
}
return c.JSON(fiber.Map{
"data": value,
"path": path,
})
})
// Put data
api.Put("/data/*", func(c *fiber.Ctx) error {
path := c.Params("*")
var data interface{}
if err := c.BodyParser(&data); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid JSON",
})
}
if err := gun.Put(path, data); err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(fiber.Map{
"status": "success",
"path": path,
})
})
// Subscribe to data changes
api.Get("/subscribe/*", websocket.New(func(c *websocket.Conn) {
path := c.Params("*")
subscription := gun.On(path, func(data interface{}, key string) {
message := map[string]interface{}{
"type": "update",
"path": path,
"key": key,
"data": data,
}
if err := c.WriteJSON(message); err != nil {
gun.Off(subscription)
c.Close()
}
})
defer gun.Off(subscription)
// Keep connection alive
for {
_, _, err := c.ReadMessage()
if err != nil {
break
}
}
}))
}
Advanced Features and Use Cases
1. Real-time Collaborative Applications
Fibergun excels at building real-time collaborative applications:
// Real-time chat implementation
func setupChatRoutes(app *fiber.App, gun fibergun.GunDB) {
chat := app.Group("/chat")
// Send message
chat.Post("/send", func(c *fiber.Ctx) error {
var message struct {
Room string `json:"room"`
User string `json:"user"`
Message string `json:"message"`
}
if err := c.BodyParser(&message); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid message format",
})
}
messageID := generateMessageID()
messageData := map[string]interface{}{
"user": message.User,
"message": message.Message,
"timestamp": time.Now().UnixNano(),
}
// Store message in room
path := fmt.Sprintf("chat/rooms/%s/messages/%s", message.Room, messageID)
if err := gun.Put(path, messageData); err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": "Failed to send message",
})
}
return c.JSON(fiber.Map{
"status": "sent",
"messageId": messageID,
})
})
// Subscribe to room messages
chat.Get("/room/:room/subscribe", websocket.New(func(c *websocket.Conn) {
room := c.Params("room")
path := fmt.Sprintf("chat/rooms/%s/messages", room)
subscription := gun.Map(path).On(func(message interface{}, messageID string) {
response := map[string]interface{}{
"type": "message",
"room": room,
"messageId": messageID,
"data": message,
}
if err := c.WriteJSON(response); err != nil {
gun.Off(subscription)
c.Close()
}
})
defer gun.Off(subscription)
// Keep connection alive
for {
_, _, err := c.ReadMessage()
if err != nil {
break
}
}
}))
}
2. Offline-first Todo Application
// Todo application with offline support
type Todo struct {
ID string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
Completed bool `json:"completed"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
UserID string `json:"userId"`
}
func setupTodoRoutes(app *fiber.App, gun fibergun.GunDB) {
todos := app.Group("/todos")
// Create todo
todos.Post("/", func(c *fiber.Ctx) error {
var todo Todo
if err := c.BodyParser(&todo); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid todo format",
})
}
todo.ID = generateTodoID()
todo.CreatedAt = time.Now()
todo.UpdatedAt = time.Now()
// Store in user's todo list
userTodosPath := fmt.Sprintf("users/%s/todos/%s", todo.UserID, todo.ID)
if err := gun.Put(userTodosPath, todo); err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": "Failed to create todo",
})
}
return c.Status(fiber.StatusCreated).JSON(todo)
})
// Get user's todos
todos.Get("/user/:userId", func(c *fiber.Ctx) error {
userID := c.Params("userId")
todosPath := fmt.Sprintf("users/%s/todos", userID)
todosNode := gun.Get(todosPath)
todosData := todosNode.Val()
if todosData == nil {
return c.JSON([]Todo{})
}
var todos []Todo
if todosMap, ok := todosData.(map[string]interface{}); ok {
for _, todoData := range todosMap {
if todoData != nil {
var todo Todo
if err := mapstructure.Decode(todoData, &todo); err == nil {
todos = append(todos, todo)
}
}
}
}
return c.JSON(todos)
})
// Update todo
todos.Put("/:id", func(c *fiber.Ctx) error {
todoID := c.Params("id")
userID := c.Query("userId")
var updates map[string]interface{}
if err := c.BodyParser(&updates); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid update format",
})
}
updates["updatedAt"] = time.Now()
todoPath := fmt.Sprintf("users/%s/todos/%s", userID, todoID)
// Get existing todo and merge updates
existing := gun.Get(todoPath).Val()
if existing == nil {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": "Todo not found",
})
}
// Merge updates with existing data
if existingMap, ok := existing.(map[string]interface{}); ok {
for key, value := range updates {
existingMap[key] = value
}
if err := gun.Put(todoPath, existingMap); err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": "Failed to update todo",
})
}
return c.JSON(existingMap)
}
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": "Invalid todo data",
})
})
}
Performance Optimization Strategies
Building a performant distributed database requires careful optimization at multiple levels:
1. Connection Pooling and Management
type ConnectionPool struct {
connections chan *Connection
factory func() (*Connection, error)
maxSize int
currentSize int
mutex sync.Mutex
}
func NewConnectionPool(maxSize int, factory func() (*Connection, error)) *ConnectionPool {
return &ConnectionPool{
connections: make(chan *Connection, maxSize),
factory: factory,
maxSize: maxSize,
}
}
func (cp *ConnectionPool) Get() (*Connection, error) {
	// Fast path: reuse an idle connection.
	select {
	case conn := <-cp.connections:
		return conn, nil
	default:
	}
	cp.mutex.Lock()
	if cp.currentSize < cp.maxSize {
		conn, err := cp.factory()
		if err != nil {
			cp.mutex.Unlock()
			return nil, err
		}
		cp.currentSize++
		cp.mutex.Unlock()
		return conn, nil
	}
	cp.mutex.Unlock()
	// Pool exhausted: block until a connection is returned. The mutex is
	// released first so Put and other callers are not held up while we wait.
	return <-cp.connections, nil
}
func (cp *ConnectionPool) Put(conn *Connection) {
select {
case cp.connections <- conn:
default:
// Pool is full, close connection
conn.Close()
cp.mutex.Lock()
cp.currentSize--
cp.mutex.Unlock()
}
}
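Usage is straightforward; the DialPeer factory and address below are hypothetical stand-ins:

pool := NewConnectionPool(10, func() (*Connection, error) {
	return DialPeer("ws://localhost:8081/gun") // hypothetical dialer
})

conn, err := pool.Get()
if err != nil {
	log.Fatal(err)
}
defer pool.Put(conn) // return the connection for reuse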
2. Intelligent Caching System
type CacheLayer struct {
memory *lru.Cache
disk *DiskCache
metrics *CacheMetrics
mutex sync.RWMutex
}
func NewCacheLayer(memorySize int, diskPath string) (*CacheLayer, error) {
memCache, err := lru.New(memorySize)
if err != nil {
return nil, err
}
diskCache, err := NewDiskCache(diskPath)
if err != nil {
return nil, err
}
return &CacheLayer{
memory: memCache,
disk: diskCache,
metrics: NewCacheMetrics(),
}, nil
}
func (cl *CacheLayer) Get(key string) (interface{}, bool) {
cl.mutex.RLock()
defer cl.mutex.RUnlock()
// Try memory cache first
if value, ok := cl.memory.Get(key); ok {
cl.metrics.MemoryHits++
return value, true
}
// Try disk cache
if value, ok := cl.disk.Get(key); ok {
cl.metrics.DiskHits++
// Promote to memory cache
cl.memory.Add(key, value)
return value, true
}
cl.metrics.Misses++
return nil, false
}
func (cl *CacheLayer) Put(key string, value interface{}) {
cl.mutex.Lock()
defer cl.mutex.Unlock()
// Store in both memory and disk
cl.memory.Add(key, value)
cl.disk.Put(key, value)
}
3. Message Batching and Compression
type MessageBatcher struct {
messages []Message
batchSize int
flushTimer *time.Timer
flushPeriod time.Duration
compressor Compressor
sender MessageSender
mutex sync.Mutex
}
func NewMessageBatcher(batchSize int, flushPeriod time.Duration, sender MessageSender) *MessageBatcher {
	mb := &MessageBatcher{
		messages:    make([]Message, 0, batchSize),
		batchSize:   batchSize,
		flushPeriod: flushPeriod,
		compressor:  NewGzipCompressor(),
		sender:      sender,
	}
	mb.flushTimer = time.AfterFunc(flushPeriod, mb.flush)
	return mb
}

// flush is the timer callback; it acquires the lock before delegating
// to flushLocked.
func (mb *MessageBatcher) flush() {
	mb.mutex.Lock()
	defer mb.mutex.Unlock()
	mb.flushLocked()
}
func (mb *MessageBatcher) Add(message Message) {
mb.mutex.Lock()
defer mb.mutex.Unlock()
mb.messages = append(mb.messages, message)
if len(mb.messages) >= mb.batchSize {
mb.flushLocked()
}
}
func (mb *MessageBatcher) flushLocked() {
if len(mb.messages) == 0 {
return
}
// Create batch
batch := MessageBatch{
Messages: mb.messages,
Timestamp: time.Now().UnixNano(),
}
// Compress batch
compressed, err := mb.compressor.Compress(batch)
if err != nil {
log.Printf("Failed to compress message batch: %v", err)
return
}
// Send batch
go mb.sender.Send(compressed)
// Reset batch
mb.messages = mb.messages[:0]
mb.flushTimer.Reset(mb.flushPeriod)
}
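The supporting types are not shown above; minimal definitions consistent with the code (our assumptions) look like:

type Message struct {
	Key   string      `json:"key"`
	Value interface{} `json:"value"`
}

type MessageBatch struct {
	Messages  []Message `json:"messages"`
	Timestamp int64     `json:"timestamp"`
}

// Compressor serializes and compresses a batch for the wire.
type Compressor interface {
	Compress(batch MessageBatch) ([]byte, error)
}

// MessageSender delivers a compressed batch to a peer.
type MessageSender interface {
	Send(data []byte) error
}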
Testing and Quality Assurance
Comprehensive testing is crucial for a distributed database system:
1. Unit Testing
func TestGunDBBasicOperations(t *testing.T) {
// Setup test environment
tmpDir, err := os.MkdirTemp("", "gundb_test")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
gun, err := NewGunDB(&Options{
File: tmpDir,
Radisk: true,
})
require.NoError(t, err)
defer gun.Close()
// Test basic put/get operations
testData := map[string]interface{}{
"name": "John Doe",
"age": 30,
}
err = gun.Put("users/john", testData)
require.NoError(t, err)
node := gun.Get("users/john")
value := node.Val()
require.NotNil(t, value)
// Verify data integrity
if valueMap, ok := value.(map[string]interface{}); ok {
assert.Equal(t, "John Doe", valueMap["name"])
assert.Equal(t, float64(30), valueMap["age"]) // JSON numbers are float64
} else {
t.Fatal("Expected map[string]interface{}")
}
}
func TestConflictResolution(t *testing.T) {
// Create two GunDB instances
gun1, err := NewGunDB(&Options{})
require.NoError(t, err)
defer gun1.Close()
gun2, err := NewGunDB(&Options{})
require.NoError(t, err)
defer gun2.Close()
// Simulate concurrent writes
key := "test/conflict"
// Write to gun1
err = gun1.Put(key, map[string]interface{}{"value": "from_gun1"})
require.NoError(t, err)
// Write to gun2 slightly later
time.Sleep(time.Millisecond)
err = gun2.Put(key, map[string]interface{}{"value": "from_gun2"})
require.NoError(t, err)
	// Sync between instances. Sync here is a test-only helper on the
	// concrete type that exchanges graph state directly, bypassing the
	// network transport.
	err = gun1.(*gunDB).Sync(gun2.(*gunDB))
	require.NoError(t, err)
// Both should have the later write (from gun2)
value1 := gun1.Get(key).Val()
value2 := gun2.Get(key).Val()
assert.Equal(t, value1, value2)
if valueMap, ok := value1.(map[string]interface{}); ok {
assert.Equal(t, "from_gun2", valueMap["value"])
}
}
2. Integration Testing
func TestPeerSynchronization(t *testing.T) {
// Start multiple peer instances
	peers := make([]GunDB, 3) // NewGunDB returns the GunDB interface
ports := []int{8081, 8082, 8083}
for i := 0; i < 3; i++ {
gun, err := NewGunDB(&Options{
Port: ports[i],
WebSocket: true,
Peers: getPeerURLs(ports, i),
})
require.NoError(t, err)
peers[i] = gun
}
// Cleanup
defer func() {
for _, peer := range peers {
peer.Close()
}
}()
// Wait for peers to connect
time.Sleep(2 * time.Second)
// Write data to first peer
testData := map[string]interface{}{
"message": "Hello from peer 0",
"timestamp": time.Now().Unix(),
}
err := peers[0].Put("sync/test", testData)
require.NoError(t, err)
// Wait for synchronization
time.Sleep(1 * time.Second)
// Verify data is available on all peers
for i, peer := range peers {
value := peer.Get("sync/test").Val()
require.NotNil(t, value, "Peer %d should have synced data", i)
if valueMap, ok := value.(map[string]interface{}); ok {
assert.Equal(t, "Hello from peer 0", valueMap["message"])
}
}
}
func getPeerURLs(ports []int, exclude int) []string {
var urls []string
for i, port := range ports {
if i != exclude {
urls = append(urls, fmt.Sprintf("ws://localhost:%d/gun", port))
}
}
return urls
}
3. Load Testing
func TestHighLoadOperations(t *testing.T) {
gun, err := NewGunDB(&Options{
MaxPeers: 100,
})
require.NoError(t, err)
defer gun.Close()
// Test concurrent writes
numRoutines := 100
numWrites := 1000
var wg sync.WaitGroup
errors := make(chan error, numRoutines*numWrites)
for i := 0; i < numRoutines; i++ {
wg.Add(1)
go func(routineID int) {
defer wg.Done()
for j := 0; j < numWrites; j++ {
key := fmt.Sprintf("load/test/%d/%d", routineID, j)
data := map[string]interface{}{
"routine": routineID,
"write": j,
"time": time.Now().UnixNano(),
}
if err := gun.Put(key, data); err != nil {
errors <- err
return
}
}
}(i)
}
wg.Wait()
close(errors)
// Check for errors
var errorCount int
for err := range errors {
t.Logf("Write error: %v", err)
errorCount++
}
assert.Equal(t, 0, errorCount, "No errors should occur during load test")
// Verify some random keys exist
for i := 0; i < 10; i++ {
routineID := rand.Intn(numRoutines)
writeID := rand.Intn(numWrites)
key := fmt.Sprintf("load/test/%d/%d", routineID, writeID)
value := gun.Get(key).Val()
assert.NotNil(t, value, "Random key %s should exist", key)
}
}
Monitoring and Observability
Production distributed systems require comprehensive monitoring:
type Metrics struct {
	// Operation counters
	ReadOperations  prometheus.Counter
	WriteOperations prometheus.Counter
	SyncOperations  prometheus.Counter
	// Performance metrics (the New*Vec constructors return pointers)
	OperationDuration *prometheus.HistogramVec
	SyncLatency       prometheus.Histogram
	// System metrics
	ConnectedPeers prometheus.Gauge
	StorageUsage   prometheus.Gauge
	MemoryUsage    prometheus.Gauge
	// Error metrics
	OperationErrors *prometheus.CounterVec
	SyncErrors      prometheus.Counter
}
func NewMetrics() *Metrics {
m := &Metrics{
ReadOperations: prometheus.NewCounter(prometheus.CounterOpts{
Name: "gundb_read_operations_total",
Help: "Total number of read operations",
}),
WriteOperations: prometheus.NewCounter(prometheus.CounterOpts{
Name: "gundb_write_operations_total",
Help: "Total number of write operations",
}),
OperationDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "gundb_operation_duration_seconds",
Help: "Duration of operations",
}, []string{"operation", "status"}),
ConnectedPeers: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gundb_connected_peers",
Help: "Number of connected peers",
}),
}
// Register metrics
prometheus.MustRegister(
m.ReadOperations,
m.WriteOperations,
m.OperationDuration,
m.ConnectedPeers,
)
return m
}
func (m *Metrics) RecordOperation(operation string, duration time.Duration, err error) {
status := "success"
if err != nil {
status = "error"
}
m.OperationDuration.WithLabelValues(operation, status).Observe(duration.Seconds())
switch operation {
case "read":
m.ReadOperations.Inc()
case "write":
m.WriteOperations.Inc()
}
}
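Wiring this into an operation is a matter of timing the call and recording the outcome (gun, userData, and metrics here stand in for values from the surrounding application):

start := time.Now()
err := gun.Put("users/john", userData)
metrics.RecordOperation("write", time.Since(start), err)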
Future Roadmap and Improvements
Our Fiber-GunDB integration continues to evolve. Here are some planned improvements:
1. Enhanced Security Features
- End-to-end encryption for sensitive data
- User authentication and authorization layers
- Audit logging and compliance features
- Advanced key management systems
2. Performance Optimizations
- Improved caching strategies with intelligent prefetching
- Better compression algorithms for network traffic
- Optimized storage backends for different use cases
- Enhanced conflict resolution algorithms
3. Developer Experience
- Comprehensive documentation with interactive examples
- Developer tools for debugging and profiling
- Code generation tools for common patterns
- Integration with popular development frameworks
4. Enterprise Features
- Multi-tenant support with data isolation
- Advanced monitoring and alerting
- Backup and disaster recovery solutions
- Compliance tools for regulatory requirements
Resources and Community
To learn more about GunDB and our Fiber integration:
- GunDB Official Documentation - Comprehensive GunDB documentation
- Fiber Framework - Fiber web framework documentation
- Fibergun Repository - Complete source code and examples
- Community Discord - Join the GunDB community discussions
- Example Applications - Real-world example applications
Conclusion
Integrating GunDB with Fiber provides developers with a powerful foundation for building distributed, real-time applications that work seamlessly offline and online. The combination of Fiber’s performance and simplicity with GunDB’s decentralized architecture creates unique opportunities for building resilient, user-centric applications.
Our implementation demonstrates that decentralized databases can be successfully integrated into modern web frameworks without sacrificing performance or developer experience. The modular design allows for easy customization and extension while maintaining the core benefits of decentralization.
As we continue to develop fibergun, we remain committed to improving both performance and developer experience, making it easier for developers to build the distributed applications of tomorrow.
The future of web applications lies not in centralized architectures, but in systems that empower users with true data ownership and work reliably regardless of network conditions. GunDB and Fiber together provide the foundation for building these next-generation applications.
This integration represents our ongoing commitment to advancing decentralized web technologies and empowering developers to build more resilient, user-centric applications.