Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LogRecord table update callbacks #2159

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import net.sourceforge.ganttproject.storage.ServerCommitResponse;
import net.sourceforge.ganttproject.task.CustomColumnsStorage;
import net.sourceforge.ganttproject.task.Task;
import net.sourceforge.ganttproject.task.event.TaskListenerAdapter;
import net.sourceforge.ganttproject.undo.GPUndoListener;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -208,10 +207,6 @@ public GanttProject(boolean isOnlyViewer) {
getWebSocket().register(null);
getWebSocket().onCommitResponseReceived(this::fireXlogReceived);
getWebSocket().onBaseTxnIdReceived(this::onBaseTxnIdReceived);
var taskListenerAdapter = new TaskListenerAdapter();
// TODO: add listeners sensibly.
taskListenerAdapter.setTaskAddedHandler(event -> this.sendProjectStateLogs());
getTaskManager().addTaskListener(taskListenerAdapter);
}

area = new GanttGraphicArea(this, getTaskManager(), getZoomManager(), getUndoManager(),
Expand Down Expand Up @@ -956,33 +951,62 @@ public void refresh() {
super.repaint();
}

// TODO: Accumulate changes instead of sending it every time.
private interface TxnSendListener {
void onSendCompleted();
}

private final AtomicReference<TxnSendListener> txnSendingListener = new AtomicReference<>();

private Unit sendProjectStateLogs() {
gpLogger.debug("Sending project state logs");
if (txnSendingListener.get() != null) return Unit.INSTANCE;
try {
var baseTxnCommitInfo = myBaseTxnCommitInfo.get();
var txns = myProjectDatabase.fetchTransactions(baseTxnCommitInfo.right + 1, 1);
if (!txns.isEmpty()) {
getWebSocket().sendLogs(new InputXlog(
baseTxnCommitInfo.left,
"userId",
"refid",
txns
));
var listener = new TxnSendListener() {
@Override
public void onSendCompleted() {
txnSendingListener.compareAndSet(this, null);
}
};
if (txnSendingListener.compareAndSet(null, listener)) {
npostnikova marked this conversation as resolved.
Show resolved Hide resolved
getWebSocket().sendLogs(new InputXlog(
baseTxnCommitInfo.left,
"userId",
"refid",
txns
));
}
}
} catch (ProjectDatabaseException e) {
gpLogger.error("Failed to send logs", new Object[]{}, ImmutableMap.of(), e);
dbarashev marked this conversation as resolved.
Show resolved Hide resolved
}
return Unit.INSTANCE;
}

@Override
protected Unit onProjectLogUpdate() {
if (isColloboqueLocalTest()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we have more and more these checks. I thought that we would have just a few differences between the "local test" and "prod" modes (url, authentication), but now it seems that we will just not do anything in prod.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this check? And maybe the super call too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm
We perform websocket handshake (with base txn id & project refid exchange) only under this check.
If we remove the check, we'll log an error every time a new log record is added for an offline document, won't we?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, in this case we shall check "if we are working with an online document, in any sense", not "is it a local online document setup".

We probably need to modify the local dev server so that it could respond to project read and write operations, just like a regular GP Cloud server. In this case the check may look like if (project.document is OnlineDocument)

super.onProjectLogUpdate();
sendProjectStateLogs();
}
return Unit.INSTANCE;
}

private Unit fireXlogReceived(ServerCommitResponse response) {
myBaseTxnCommitInfo.update(response.getBaseTxnId(), response.getNewBaseTxnId(), 1);
txnSendingListener.get().onSendCompleted();
sendProjectStateLogs();
return Unit.INSTANCE;
}

private Unit onBaseTxnIdReceived(String baseTxnId) {
myBaseTxnCommitInfo.update("", baseTxnId, 0);
var listener = txnSendingListener.get();
// Websocket is [re-]started. Previous messages are discarded.
if (listener != null) listener.onSendCompleted();
sendProjectStateLogs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that the comment above (previous messages are discarded) and these two lines (sendCompleted, sendLogs) are kinda controversial. We obviously need to make sure that the state which we have locally matches the server state which corresponds to this baseTxnId. Until it is not done, we probably want to really discard the changes, that is, clear the state, whatever it is, but not do what we do in the normal success case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!
Added a TODO for performing synchronization with the server. For now, logs are sent only when the server specifies that the project is empty.

return Unit.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ of the License, or (at your option) any later version.
import javafx.beans.property.SimpleIntegerProperty;
import javafx.beans.property.SimpleObjectProperty;
import javafx.collections.FXCollections;
import kotlin.Unit;
import kotlin.jvm.functions.Function0;
import net.sourceforge.ganttproject.chart.Chart;
import net.sourceforge.ganttproject.chart.ChartModelBase;
Expand Down Expand Up @@ -79,6 +80,8 @@ of the License, or (at your option) any later version.
import java.util.Map;
import java.util.function.Supplier;

import static biz.ganttproject.storage.cloud.GPCloudHttpImplKt.isColloboqueLocalTest;

/**
* This class is designed to be a GanttProject-after-refactorings. I am going to
* refactor GanttProject in order to make true view communicating with other
Expand Down Expand Up @@ -192,7 +195,10 @@ GPOptionGroup getTaskOptions() {

protected GanttProjectBase() {
super("GanttProject");
var databaseProxy = new LazyProjectDatabaseProxy(SqlProjectDatabaseImpl.Factory::createInMemoryDatabase, this::getTaskManager);
var databaseProxy = new LazyProjectDatabaseProxy(
() -> SqlProjectDatabaseImpl.Factory.createInMemoryDatabase(this::onProjectLogUpdate),
this::getTaskManager
);

myProjectDatabase = databaseProxy;
myTaskManagerConfig = new TaskManagerConfigImpl();
Expand Down Expand Up @@ -258,6 +264,9 @@ protected ParserFactory getParserFactory() {
protected GanttProjectImpl getProjectImpl() {
return myProjectImpl;
}

protected Unit onProjectLogUpdate() { return Unit.INSTANCE; }

@Override
public void restore(@NotNull Document fromDocument) throws Document.DocumentException, IOException {
GanttProjectImplKt.restoreProject(this, fromDocument, myProjectImpl.getListeners());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,33 @@ class SqlProjectDatabaseImpl(private val dataSource: DataSource) : ProjectDataba
dataSource.setURL(H2_IN_MEMORY_URL)
return SqlProjectDatabaseImpl(dataSource)
}

fun createInMemoryDatabase(logUpdateCallback: () -> Unit): ProjectDatabase {
val dataSource = JdbcDataSource()
dataSource.setURL(H2_IN_MEMORY_URL)
val database = SqlProjectDatabaseImpl(dataSource)
database.addLogUpdateCallback(logUpdateCallback)
return database
}
}

/** Queries which belong to the current transaction. Null if each statement should be committed separately. */
private var currentTxn: TransactionImpl? = null
private var localTxnId: Int = 1

private val logUpdateCallbacks: MutableList<() -> Unit> = mutableListOf()

/** Log update callbacks are invoked when a new log record is added. */
fun addLogUpdateCallback(listener: () -> Unit) = logUpdateCallbacks.add(listener)

private fun onLogUpdate() = logUpdateCallbacks.forEach {
try {
it.invoke()
} catch (e: Exception) {
LOG.error("Failed to execute update callback", e)
}
}

private fun <T> withDSL(
errorMessage: () -> String = { "Failed to execute query" },
body: (dsl: DSLContext) -> T
Expand Down Expand Up @@ -90,6 +111,7 @@ class SqlProjectDatabaseImpl(private val dataSource: DataSource) : ProjectDataba
}
}
}
onLogUpdate()
}

/** Add a query to the current txn. Executes immediately if no transaction started. */
Expand Down Expand Up @@ -155,6 +177,7 @@ class SqlProjectDatabaseImpl(private val dataSource: DataSource) : ProjectDataba
@Throws(ProjectDatabaseException::class)
internal fun commitTransaction(txn: TransactionImpl) {
try {
if (txn.statements.isEmpty()) return
executeAndLog(txn.statements, localTxnId)
localTxnId++ // Increment only on success.
} finally {
Expand Down