Electric与React Native:移动应用实时数据同步新范式
【免费下载链接】electric electric-sql/electric: 这是一个用于查询数据库的JavaScript库,支持多种数据库。适合用于需要使用JavaScript查询数据库的场景。特点:易于使用,支持多种数据库,具有灵活的查询构建和结果处理功能。 项目地址: https://gitcode.***/GitHub_Trending/el/electric
你是否还在为React Native应用的离线数据同步烦恼?用户抱怨网络不稳定时数据丢失?多设备切换导致状态不一致?本文将带你一文掌握Electric+React Native的实时数据同步方案,从架构设计到代码实现,彻底解决移动开发中的数据一致性难题。
读完本文你将获得:
- 理解Electric如何实现PostgreSQL与React Native的双向实时同步
- 掌握离线优先架构在移动应用中的最佳实践
- 学会使用ShapeStream API构建响应式数据层
- 解决弱网环境下的数据冲突与合并问题
- 优化移动同步性能的10个实用技巧
移动开发的数据同步困境
传统移动应用开发中,数据同步通常依赖RESTful API或GraphQL手动实现,这种方式存在三大痛点:
| 痛点 | 传统解决方案 | Electric方案 |
|---|---|---|
| 离线数据访问 | 本地SQLite+手动同步逻辑 | 自动双向同步+持久化缓存 |
| 实时更新 | 轮询/长轮询(高延迟) | Server-Sent Events(SSE)实时推送 |
| 冲突解决 | 服务端覆盖写入 | 基于CRDT的智能合并 |
| 带宽消耗 | 全量数据传输 | 增量变更同步(节省70%+流量) |
| 代码复杂度 | 数千行同步逻辑 | 声明式API(3行代码实现同步) |
Electric作为PostgreSQL的实时同步引擎,通过ShapeStream协议解决了移动应用的数据同步难题,其核心优势在于:
- 低延迟:毫秒级数据更新推送
- 高可用:断网继续工作,联网自动同步
- 省流量:仅传输变更数据而非全量
- 易集成:与React Native生态无缝衔接
- 强一致:PostgreSQL事务保证+冲突自动解决
架构设计:Electric同步原理
Electric的移动同步架构基于三层设计:
- 数据源头:PostgreSQL数据库通过逻辑复制将变更发送到Electric服务
- 同步服务:Electric服务将数据变更转化为ShapeStream格式,通过HTTP API暴露
- 客户端连接:React Native应用通过SSE(Server-Sent Events)或长轮询连接Electric服务
- 本地处理:ShapeStream客户端处理增量更新,维护本地状态
- 离线支持:使用AsyncStorage持久化数据,离线时记录操作队列
- 冲突解决:重连后自动同步离线操作,Electric服务处理冲突
这种架构实现了"一次编写,处处运行"的数据同步模式,开发者只需关注业务逻辑,无需编写同步代码。
环境搭建:从零开始配置
1. 准备工作
首先确保你的开发环境满足以下要求:
- Node.js 18+
- PostgreSQL 14+(启用逻辑复制)
- React Native 0.70+
- Electric服务 1.0+
2. 启动Electric服务
使用Docker快速启动Electric服务和PostgreSQL:
# 克隆仓库
git clone https://gitcode.***/GitHub_Trending/el/electric
cd electric
# 使用docker-***pose启动服务
docker ***pose -f .support/docker-***pose.yml up
3. 创建React Native项目
npx react-native init ElectricDemo --template react-native-template-typescript
cd ElectricDemo
# 安装Electric客户端依赖
npm install @electric-sql/typescript-client @react-native-async-storage/async-storage
4. 配置PostgreSQL
连接PostgreSQL并创建测试表:
psql "postgresql://postgres:password@localhost:54321/electric"
创建一个简单的任务表:
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
***pleted BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 启用Electric复制
ALTER TABLE tasks ENABLE ELECTRIC;
核心实现:实时任务列表应用
1. 初始化Electric客户端
创建src/services/electric.ts:
import { ShapeStream } from '@electric-sql/typescript-client'
import AsyncStorage from '@react-native-async-storage/async-storage';
// 从持久化存储加载最后同步的偏移量
const loadLastOffset = async (): Promise<string | undefined> => {
try {
return await AsyncStorage.getItem('electric_offset');
} catch (error) {
console.error('Failed to load offset:', error);
return undefined;
}
};
// 保存最新的同步偏移量
const saveLastOffset = async (offset: string): Promise<void> => {
try {
await AsyncStorage.setItem('electric_offset', offset);
} catch (error) {
console.error('Failed to save offset:', error);
}
};
export const initElectric = async () => {
// 加载最后保存的偏移量
const lastOffset = await loadLastOffset();
// 创建ShapeStream实例
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
offset: lastOffset || '-1',
params: {
table: 'tasks',
replica: 'full'
},
subscribe: true,
experimentalLiveSse: true,
onError: async (error) => {
console.error('Electric error:', error);
// 处理认证错误,例如刷新token
if (error.message.includes('Unauthorized')) {
const newToken = await refreshAuthToken();
return {
headers: {
Authorization: `Bearer ${newToken}`
}
};
}
return undefined;
}
});
// 监听偏移量变化并保存
stream.subscribe(async (messages) => {
if (messages.length > 0) {
const lastMessage = messages[messages.length - 1];
if ('offset' in lastMessage) {
await saveLastOffset(lastMessage.offset);
}
}
});
return stream;
};
// 模拟刷新认证token
const refreshAuthToken = async (): Promise<string> => {
// 实际实现中调用你的认证服务
return 'new-auth-token';
};
2. 创建自定义Hook
创建src/hooks/useTasks.ts:
import { useEffect, useState, useCallback } from 'react';
import { Message } from '@electric-sql/typescript-client';
import { initElectric } from '../services/electric';
// 任务类型定义
export interface Task {
id: number;
title: string;
***pleted: boolean;
created_at: string;
updated_at: string;
}
export const useTasks = () => {
const [tasks, setTasks] = useState<Task[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<Error | null>(null);
const [stream, setStream] = useState<any>(null);
// 初始化Electric连接
useEffect(() => {
const initialize = async () => {
try {
setLoading(true);
const electricStream = await initElectric();
setStream(electricStream);
// 订阅任务数据更新
const unsubscribe = electricStream.subscribe((messages: Message<Task>[]) => {
if (messages.length > 0) {
setTasks(prev => {
// 创建任务ID到任务的映射
const taskMap = new Map(prev.map(task => [task.id, task]));
// 处理每条消息
messages.forEach(message => {
if ('value' in message) {
const task = message.value as Task;
if (message.headers?.operation === 'delete') {
taskMap.delete(task.id);
} else {
taskMap.set(task.id, task);
}
}
});
// 返回排序后的任务列表
return Array.from(taskMap.values()).sort((a, b) =>
new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime()
);
});
}
}, (err: Error) => {
setError(err);
console.error('Subscription error:', err);
});
return () => {
unsubscribe();
electricStream.unsubscribeAll();
};
} catch (err) {
setError(err as Error);
console.error('Initialization error:', err);
} finally {
setLoading(false);
}
};
initialize();
}, []);
// 添加新任务
const addTask = useCallback(async (title: string) => {
// 在实际应用中,这里应该调用你的API来创建任务
// Electric会自动同步PostgreSQL中的新任务
try {
await fetch('http://localhost:3000/api/tasks', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ title }),
});
} catch (err) {
setError(err as Error);
console.error('Failed to add task:', err);
}
}, []);
// 切换任务完成状态
const toggleTask = useCallback(async (id: number, ***pleted: boolean) => {
try {
await fetch(`http://localhost:3000/api/tasks/${id}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ ***pleted, updated_at: new Date().toISOString() }),
});
} catch (err) {
setError(err as Error);
console.error('Failed to toggle task:', err);
}
}, []);
// 删除任务
const deleteTask = useCallback(async (id: number) => {
try {
await fetch(`http://localhost:3000/api/tasks/${id}`, {
method: 'DELETE',
});
} catch (err) {
setError(err as Error);
console.error('Failed to delete task:', err);
}
}, []);
return { tasks, loading, error, addTask, toggleTask, deleteTask };
};
3. 实现任务列表组件
创建src/***ponents/TaskList.tsx:
import React, { useState } from 'react';
import { View, Text, TextInput, Button, FlatList, TouchableOpacity, StyleSheet } from 'react-native';
import { useTasks } from '../hooks/useTasks';
const TaskList: React.FC = () => {
const [newTaskTitle, setNewTaskTitle] = useState('');
const { tasks, loading, error, addTask, toggleTask, deleteTask } = useTasks();
const handleAddTask = () => {
if (newTaskTitle.trim()) {
addTask(newTaskTitle);
setNewTaskTitle('');
}
};
if (loading) {
return <Text style={styles.loading}>加载中...</Text>;
}
if (error) {
return <Text style={styles.error}>出错了: {error.message}</Text>;
}
return (
<View style={styles.container}>
<View style={styles.inputContainer}>
<TextInput
style={styles.input}
value={newTaskTitle}
onChangeText={setNewTaskTitle}
placeholder="输入新任务..."
/>
<Button title="添加" onPress={handleAddTask} />
</View>
{tasks.length === 0 ? (
<Text style={styles.emptyMessage}>暂无任务,添加你的第一个任务吧!</Text>
) : (
<FlatList
data={tasks}
keyExtractor={(item) => item.id.toString()}
renderItem={({ item }) => (
<View style={styles.taskItem}>
<TouchableOpacity
style={styles.checkbox}
onPress={() => toggleTask(item.id, !item.***pleted)}
>
{item.***pleted && <Text style={styles.checkmark}>✓</Text>}
</TouchableOpacity>
<Text style={[styles.taskText, item.***pleted && styles.***pletedTask]}>
{item.title}
</Text>
<Button title="删除" onPress={() => deleteTask(item.id)} />
</View>
)}
/>
)}
</View>
);
};
const styles = StyleSheet.create({
container: {
flex: 1,
padding: 20,
},
inputContainer: {
flexDirection: 'row',
marginBottom: 20,
gap: 10,
},
input: {
flex: 1,
borderWidth: 1,
borderColor: '#***c',
padding: 10,
borderRadius: 4,
},
taskItem: {
flexDirection: 'row',
alignItems: 'center',
gap: 10,
paddingVertical: 10,
borderBottomWidth: 1,
borderBottomColor: '#eee',
},
checkbox: {
width: 24,
height: 24,
borderWidth: 1,
borderColor: '#***c',
borderRadius: 4,
justifyContent: 'center',
alignItems: 'center',
},
checkmark: {
color: 'green',
fontWeight: 'bold',
},
taskText: {
flex: 1,
fontSize: 16,
},
***pletedTask: {
textDecorationLine: 'line-through',
color: '#888',
},
loading: {
textAlign: 'center',
padding: 20,
fontSize: 16,
},
error: {
textAlign: 'center',
padding: 20,
fontSize: 16,
color: 'red',
},
emptyMessage: {
textAlign: 'center',
padding: 20,
fontSize: 16,
color: '#888',
},
});
export default TaskList;
4. 集成到主应用
修改App.tsx:
import React from 'react';
import { SafeAreaView, StyleSheet } from 'react-native';
import TaskList from './src/***ponents/TaskList';
const App: React.FC = () => {
return (
<SafeAreaView style={styles.container}>
<TaskList />
</SafeAreaView>
);
};
const styles = StyleSheet.create({
container: {
flex: 1,
},
});
export default App;
离线优先:处理网络不稳定场景
Electric的离线支持是其核心优势之一。以下是实现离线优先体验的关键技术点:
1. 本地持久化策略
// src/services/offlineStorage.ts
import AsyncStorage from '@react-native-async-storage/async-storage';
// 保存离线操作队列
export const saveOfflineAction = async (action: any) => {
try {
const queue = JSON.parse(await AsyncStorage.getItem('offline_queue') || '[]');
queue.push({
...action,
timestamp: new Date().toISOString(),
id: Date.now().toString()
});
await AsyncStorage.setItem('offline_queue', JSON.stringify(queue));
return true;
} catch (error) {
console.error('Failed to save offline action:', error);
return false;
}
};
// 获取离线操作队列
export const getOfflineActions = async () => {
try {
const queue = JSON.parse(await AsyncStorage.getItem('offline_queue') || '[]');
return queue;
} catch (error) {
console.error('Failed to get offline actions:', error);
return [];
}
};
// 清除已同步的离线操作
export const clearSyncedActions = async (ids: string[]) => {
try {
const queue = JSON.parse(await AsyncStorage.getItem('offline_queue') || '[]');
const remaining = queue.filter((action: any) => !ids.includes(action.id));
await AsyncStorage.setItem('offline_queue', JSON.stringify(remaining));
} catch (error) {
console.error('Failed to clear synced actions:', error);
}
};
2. 网络状态监听
// src/hooks/use***workStatus.ts
import { useEffect, useState } from 'react';
import ***Info from '@react-native-***munity/***info';
export const use***workStatus = () => {
const [isConnected, setIsConnected] = useState(true);
useEffect(() => {
const unsubscribe = ***Info.addEventListener(state => {
setIsConnected(state.isConnected || false);
});
return () => {
unsubscribe();
};
}, []);
return { isConnected };
};
3. 离线操作处理
修改useTasks hook,添加离线支持:
// 添加到useTasks hook中
const { isConnected } = use***workStatus();
// 修改addTask方法以支持离线操作
const addTask = useCallback(async (title: string) => {
if (!isConnected) {
// 离线状态下保存到本地队列
await saveOfflineAction({
type: 'add',
payload: { title }
});
// 乐观更新UI
setTasks(prev => [
{
id: Date.now(), // 临时ID,同步后会被服务器ID替换
title,
***pleted: false,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
isOffline: true // 标记为离线任务
},
...prev
]);
return;
}
// 在线状态下直接调用API
try {
await fetch('http://localhost:3000/api/tasks', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ title }),
});
} catch (err) {
setError(err as Error);
console.error('Failed to add task:', err);
}
}, [isConnected]);
// 添加离线操作同步逻辑
useEffect(() => {
const syncOfflineActions = async () => {
if (isConnected) {
const actions = await getOfflineActions();
if (actions.length > 0) {
try {
// 批量同步离线操作
const response = await fetch('http://localhost:3000/api/offline/sync', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ actions }),
});
if (response.ok) {
const { syncedIds } = await response.json();
await clearSyncedActions(syncedIds);
}
} catch (err) {
console.error('Failed to sync offline actions:', err);
}
}
}
};
// 当网络恢复时同步离线操作
if (isConnected) {
syncOfflineActions();
}
}, [isConnected]);
性能优化:移动场景最佳实践
1. 数据部分同步
使用Electric的where参数只同步所需数据,减少带宽消耗和客户端内存占用:
// 只同步未完成的任务
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'tasks',
where: '***pleted = false',
columns: ['id', 'title', '***pleted'] // 只获取需要的列
},
// ...其他配置
});
2. 批处理更新
在React Native中,频繁的状态更新会导致性能问题。优化渲染性能的关键是批处理更新:
// 优化前:每次消息单独更新
stream.subscribe((message: Message<Task>) => {
setTasks(prev => updateTask(prev, message));
});
// 优化后:批处理更新
stream.subscribe((messages: Message<Task>[]) => {
if (messages.length > 0) {
setTasks(prev => {
let updated = [...prev];
messages.forEach(message => {
updated = updateTask(updated, message);
});
return updated;
});
}
});
3. 组件渲染优化
使用memo和useCallback减少不必要的重渲染:
import React, { memo, useCallback } from 'react';
import { View, Text, TouchableOpacity, StyleSheet } from 'react-native';
// 使用memo包装任务项组件
const TaskItem = memo(({
task,
onToggle,
onDelete
}: {
task: Task,
onToggle: (id: number, ***pleted: boolean) => void,
onDelete: (id: number) => void
}) => {
// 使用useCallback确保函数引用稳定
const handleToggle = useCallback(() => {
onToggle(task.id, !task.***pleted);
}, [task.id, task.***pleted, onToggle]);
const handleDelete = useCallback(() => {
onDelete(task.id);
}, [task.id, onDelete]);
return (
<View style={styles.taskItem}>
{/* 组件内容 */}
</View>
);
});
4. 同步策略调整
根据应用场景调整同步策略:
// 电池优化:低电量时降低同步频率
const stream = new ShapeStream({
// ...其他配置
backoffOptions: {
initialDelay: 1000,
maxDelay: 30000,
// 根据电池状态动态调整退避策略
onFailedAttempt: () => {
checkBatteryLevel().then(level => {
if (level < 0.2) { // 电量低于20%
stream.setBackoffOptions({ maxDelay: 60000 });
}
});
}
}
});
常见问题与解决方案
1. 数据冲突如何处理?
Electric提供两种冲突解决策略:
- 最后写入 wins:默认策略,基于时间戳
-
自定义合并:通过
replica: 'full'获取旧值,实现自定义合并逻辑
stream.subscribe((messages: Message<Task>[]) => {
messages.forEach(message => {
if (message.headers?.operation === 'update' && message.old_value) {
// 自定义合并逻辑
const currentTask = taskMap.get(message.value.id);
if (currentTask?.isOffline) {
// 本地离线修改优先
mergeTasks(currentTask, message.value, message.old_value);
}
}
});
});
2. 如何处理大型数据集?
对于超过1000条记录的数据集,建议使用分页和虚拟列表:
// 分页同步大型数据集
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'large_dataset',
where: 'category = $1',
params: { '1': 'mobile' },
limit: '100',
offset: '0'
},
// ...其他配置
});
// 结合FlashList实现虚拟列表
import { FlashList } from '@shopify/flash-list';
// ...组件中使用FlashList替代FlatList
<FlashList
data={tasks}
estimatedItemSize={80}
renderItem={({ item }) => <TaskItem task={item} />}
/>
3. 如何调试同步问题?
Electric提供详细的调试工具:
const stream = new ShapeStream({
// ...其他配置
onError: (error) => {
console.error('Electric error:', error);
// 记录错误到监控服务
logToMonitoringService({
type: 'electric_error',
message: error.message,
stack: error.stack,
timestamp: new Date().toISOString()
});
}
});
// 启用详细日志
stream.subscribe((messages) => {
if (__DEV__) {
console.log('Received messages:', messages);
}
// ...处理消息
});
总结与未来展望
Electric为React Native应用提供了强大的实时数据同步能力,彻底解决了移动开发中的离线数据访问、实时更新和冲突解决等痛点问题。通过本文介绍的架构设计和实现方案,你可以构建出真正离线优先的移动应用,为用户提供流畅的体验,无论网络状况如何。
随着Electric 1.0的发布,其React Native生态将更加完善,包括:
- 官方React Native客户端库
- 与Watermelon DB的深度集成
- 内置离线冲突解决策略
- 性能监控工具
通过采用Electric+React Native的技术栈,你可以将更多精力放在业务逻辑和用户体验上,而非数据同步的细节实现。立即尝试将Electric集成到你的移动项目中,体验实时数据同步带来的开发效率提升和用户体验改善!
扩展学习资源
- Electric官方文档:https://electric-sql.***/docs
- 源代码仓库:https://gitcode.***/GitHub_Trending/el/electric
- React Native性能优化指南:https://reactnative.dev/docs/performance
- 离线优先架构设计模式:https://offlinefirst.org/
如果你觉得本文对你有帮助,请点赞、收藏并关注,下期我们将深入探讨Electric的高级特性和性能调优技巧!
【免费下载链接】electric electric-sql/electric: 这是一个用于查询数据库的JavaScript库,支持多种数据库。适合用于需要使用JavaScript查询数据库的场景。特点:易于使用,支持多种数据库,具有灵活的查询构建和结果处理功能。 项目地址: https://gitcode.***/GitHub_Trending/el/electric