refactor: 修改service层

1.修改service层语言为kotlin
2.使用kotlin中的协程代替原service中的线程池
This commit is contained in:
糕小菜 2024-10-25 23:52:33 +08:00
parent 971a6bace7
commit e81bbf516d
3 changed files with 121 additions and 211 deletions

View File

@ -66,6 +66,9 @@ dependencies {
implementation(libs.glide)
implementation(libs.lottie)
implementation(libs.kotlinx.serialization.json)
implementation(libs.kotlinx.coroutines.core)
implementation(libs.kotlinx.coroutines.android)
// implementation(libs.therouter)
// ksp(libs.therouter.ksp)
}

View File

@ -1,240 +1,144 @@
package com.kaixed.kchat.service;
package com.kaixed.kchat.service
import static com.kaixed.kchat.network.NetworkInterface.WEBSOCKET;
import static com.kaixed.kchat.network.NetworkInterface.WEBSOCKET_SERVER_URL;
import static com.kaixed.kchat.utils.Constants.MMKV_COMMON_DATA;
import android.app.Service
import android.content.Intent
import android.os.Binder
import android.os.IBinder
import android.util.Log
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import com.google.gson.Gson
import com.kaixed.kchat.database.ObjectBox
import com.kaixed.kchat.database.UserManager
import com.kaixed.kchat.database.entity.Messages
import com.kaixed.kchat.network.NetworkInterface.WEBSOCKET
import com.kaixed.kchat.network.NetworkInterface.WEBSOCKET_SERVER_URL
import com.kaixed.kchat.network.OkhttpHelper
import com.kaixed.kchat.utils.Constants.MMKV_COMMON_DATA
import com.tencent.mmkv.MMKV
import io.objectbox.Box
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.util.Log;
class WebSocketService : Service() {
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;
private val TAG = "WebSocketService"
private val HEARTBEAT_ACK = "heartbeat_ack"
private lateinit var messagesBox: Box<Messages>
private lateinit var username: String
private val binder = LocalBinder()
private val messagesMutableLiveData = MutableLiveData<Messages>()
private val HEART_BEAT_RATE = 3000L
private var webSocket: WebSocket? = null
private var heartbeatJob: Job? = null
import com.google.gson.Gson;
import com.kaixed.kchat.database.ObjectBox;
import com.kaixed.kchat.database.UserManager;
import com.kaixed.kchat.database.entity.Messages;
import com.kaixed.kchat.network.OkhttpHelper;
import com.tencent.mmkv.MMKV;
private val serviceJob = Job()
private val serviceScope = CoroutineScope(IO + serviceJob)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
val liveData: LiveData<Messages> get() = messagesMutableLiveData
import io.objectbox.Box;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
/**
* WebSocket 服务类
*
* @author hui
*/
public class WebSocketService extends Service {
private static final String TAG = "WebSocketService";
private static final String HEARTBEAT_ACK = "heartbeat_ack";
private static final long HEART_BEAT_RATE = 3000;
private WebSocket webSocket;
private ScheduledThreadPoolExecutor heartbeatExecutor;
private Box<Messages> messagesBox;
private String username;
private final IBinder binder = new LocalBinder();
private final MutableLiveData<Messages> messagesMutableLiveData = new MutableLiveData<>();
public LiveData<Messages> getLiveData() {
return messagesMutableLiveData;
}
private static class CustomThreadFactory implements ThreadFactory {
private int counter = 0;
private final String namePrefix;
public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + "-thread-" + counter++);
inner class LocalBinder : Binder() {
fun getService(): WebSocketService {
return this@WebSocketService
}
}
private final ExecutorService executorService = new ThreadPoolExecutor(
4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new CustomThreadFactory("WebsocketThread")
);
override fun onBind(intent: Intent?): IBinder {
return binder
}
override fun onCreate() {
super.onCreate()
messagesBox = ObjectBox.get().boxFor(Messages::class.java)
username = UserManager.getUsername()
}
public class LocalBinder extends Binder {
public WebSocketService getService() {
return WebSocketService.this;
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
establishConnection()
return START_STICKY
}
fun sendMessage(jsonObject: String, msgLocalId: Long) {
webSocket?.let {
it.send(jsonObject)
val kv = MMKV.mmkvWithID(MMKV_COMMON_DATA)
kv.putLong("msgLocalId", msgLocalId)
}
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return binder;
private fun establishConnection() {
val request = Request.Builder().url("$WEBSOCKET_SERVER_URL$WEBSOCKET$username").build()
val listener = EchoWebSocketListener()
val client = OkhttpHelper.getInstance()
webSocket = client.newWebSocket(request, listener)
startHeartbeat()
}
@Override
public void onCreate() {
super.onCreate();
// 初始化本地存储及其他配置
messagesBox = ObjectBox.INSTANCE.get().boxFor(Messages.class);
username = UserManager.INSTANCE.getUsername();
// 创建并配置心跳执行器
initializeHeartbeatExecutor();
}
private void initializeHeartbeatExecutor() {
ThreadFactory namedThreadFactory = new CustomThreadFactory("WebSocketServiceHeartbeat");
heartbeatExecutor = new ScheduledThreadPoolExecutor(1, namedThreadFactory);
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
establishConnection();
return START_STICKY;
}
public void sendMessage(String jsonObject) {
if (webSocket != null) {
Log.d("haha", jsonObject);
webSocket.send(jsonObject);
MMKV kv = MMKV.mmkvWithID(MMKV_COMMON_DATA);
}
}
public void sendMessage(String jsonObject, Long msgLocalId) {
if (webSocket != null) {
Log.d("haha", jsonObject);
webSocket.send(jsonObject);
MMKV kv = MMKV.mmkvWithID(MMKV_COMMON_DATA);
kv.putLong("msgLocalId", msgLocalId);
}
}
private void establishConnection() {
Request request = new Request.Builder().url(WEBSOCKET_SERVER_URL + WEBSOCKET + username).build();
EchoWebSocketListener listener = new EchoWebSocketListener();
OkHttpClient client = OkhttpHelper.INSTANCE.getInstance();
webSocket = client.newWebSocket(request, listener);
// 启动定时心跳任务
if (heartbeatExecutor.isShutdown() || heartbeatExecutor.isTerminated()) {
initializeHeartbeatExecutor();
private inner class EchoWebSocketListener : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
Log.d(TAG, "WebSocket Opened")
}
heartbeatExecutor.scheduleWithFixedDelay(() -> {
if (webSocket != null) {
webSocket.send("heartbeat");
}
}, 0, HEART_BEAT_RATE, TimeUnit.MILLISECONDS);
}
private class EchoWebSocketListener extends WebSocketListener {
@Override
public void onOpen(@NonNull WebSocket webSocket, @NonNull Response response) {
Log.d(TAG, "WebSocket Opened");
}
@Override
public void onMessage(@NonNull WebSocket webSocket, @NonNull String text) {
if (HEARTBEAT_ACK.equals(text)) {
// Log.d(TAG, "Received heartbeat ack from server");
return;
override fun onMessage(webSocket: WebSocket, text: String) {
if (HEARTBEAT_ACK == text) {
return
}
// Log.d(TAG, "Received message: " + text);
Gson gson = new Gson();
Messages messages = gson.fromJson(text, Messages.class);
messages.setTakerId(messages.getSenderId());
val messages = Gson().fromJson(text, Messages::class.java)
messages.takerId = messages.senderId
messagesMutableLiveData.postValue(messages)
messagesMutableLiveData.postValue(messages);
// 使用线程池处理消息存储
executorService.submit(() -> {
if ("ack".equals(messages.getType())) {
Messages existingMessage = messagesBox.get(messages.getMsgLocalId());
if (existingMessage != null) {
existingMessage.setTimestamp(messages.getTimestamp());
existingMessage.setMsgSvrId(messages.getMsgSvrId());
messagesBox.put(existingMessage);
} else {
messagesBox.put(messages);
serviceScope.launch {
if ("ack" == messages.type) {
val existingMessage = messagesBox.get(messages.msgLocalId)
existingMessage?.let {
it.timestamp = messages.timestamp
it.msgSvrId = messages.msgSvrId
messagesBox.put(it)
} ?: run {
messagesBox.put(messages)
}
} else {
messagesBox.put(messages);
messagesBox.put(messages)
}
Log.d(TAG, "Message stored: " + messages);
});
}
@Override
public void onClosing(@NonNull WebSocket webSocket, int code, @NonNull String reason) {
webSocket.close(1000, null);
Log.d(TAG, "WebSocket closing: " + reason);
establishConnection();
}
@Override
public void onFailure(@NonNull WebSocket webSocket, @NonNull Throwable t, @Nullable Response response) {
// Log.d(TAG, "WebSocket onFailure: " + t.getMessage());
establishConnection();
}
}
@Override
public void onDestroy() {
super.onDestroy();
if (webSocket != null) {
webSocket.close(1000, "App exited");
}
if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
heartbeatExecutor.shutdown();
try {
if (!heartbeatExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
heartbeatExecutor.shutdownNow();
}
} catch (InterruptedException e) {
heartbeatExecutor.shutdownNow();
Thread.currentThread().interrupt();
Log.d(TAG, "Message stored: $messages")
}
}
shutdownExecutorService();
}
private void shutdownExecutorService() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
Log.e(TAG, "ExecutorService did not terminate");
}
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
webSocket.close(1000, null)
Log.d(TAG, "WebSocket closing: $reason")
establishConnection()
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
establishConnection()
}
}
}
private fun startHeartbeat() {
heartbeatJob = CoroutineScope(IO).launch {
while (isActive) {
webSocket?.send("heartbeat")
delay(HEART_BEAT_RATE)
}
}
}
override fun onDestroy() {
super.onDestroy()
serviceJob.cancel()
heartbeatJob?.cancel()
webSocket?.close(1000, "App exited")
}
}

View File

@ -7,6 +7,7 @@ junit = "4.13.2"
junitVersion = "1.2.1"
espressoCore = "3.6.1"
appcompat = "1.7.0"
kotlinxCoroutinesCore = "1.7.3"
kotlinxSerializationJson = "1.6.3"
lottie = "6.5.2"
material = "1.12.0"
@ -24,6 +25,8 @@ objectbox = "4.0.2"
[libraries]
androidx-core-ktx = { group = "androidx.core", name = "core-ktx", version.ref = "coreKtx" }
kotlinx-coroutines-android = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-android", version.ref = "kotlinxCoroutinesCore" }
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinxCoroutinesCore" }
therouter-ksp = { module = "cn.therouter:apt", version.ref = "therouter" }
objectbox-android-objectbrowser = { group = "io.objectbox", name = "objectbox-android-objectbrowser", version.ref = "objectbox" }
objectbox-android = { group = "io.objectbox", name = "objectbox-android", version.ref = "objectbox" }