Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

add inter-prcess and inter-thread file lock for sqlite #33

Merged
merged 2 commits into from
Mar 27, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.storage.pmof;

import java.nio.channels.FileLock;
import java.sql.Connection;
import org.sqlite.SQLiteConfig;
import java.sql.DatabaseMetaData;
Expand All @@ -12,10 +13,13 @@
import java.util.HashMap;
import java.util.Map;
import java.io.*;

public class PersistentMemoryMetaHandler {

private static String url = "jdbc:sqlite:/tmp/spark_shuffle_meta.db";
private static String fileLockPath = "/tmp/spark_shuffle_file.lock";

private static final File file = new File(fileLockPath);

PersistentMemoryMetaHandler(String root_dir) {
createTable(root_dir);
Expand All @@ -29,54 +33,99 @@ public void createTable(String root_dir) {
+ ");\n";

url = "jdbc:sqlite:" + root_dir + "/spark_shuffle_meta.db";

try {
Connection conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
stmt.execute(sql);

sql = "CREATE TABLE IF NOT EXISTS devices (\n"
synchronized (file) {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
FileLock fileLock = fos.getChannel().lock();
Connection conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
stmt.execute(sql);

sql = "CREATE TABLE IF NOT EXISTS devices (\n"
+ " device text UNIQUE,\n"
+ " mount_count int\n"
+ ");";
stmt.execute(sql);
conn.close();
} catch (SQLException e) {
System.out.println("createTable failed:" + e.getMessage());
stmt.execute(sql);
conn.close();
fos.close();
} catch (SQLException e) {
System.out.println("createTable failed:" + e.getMessage());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
System.out.println("Metastore DB connected: " + url);
}

public void insertRecord(String shuffleId, String device) {
String sql = "INSERT OR IGNORE INTO metastore(shuffleId,device) VALUES('" + shuffleId + "','" + device + "')";

try {
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
Connection conn = DriverManager.getConnection(url, config.toProperties());
Statement stmt = conn.createStatement();
stmt.executeUpdate(sql);
conn.close();
} catch (SQLException e) {
System.err.println("insertRecord failed:" + e.getMessage());
System.exit(-1);
synchronized (file) {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
FileLock fileLock = fos.getChannel().lock();
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
Connection conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
stmt.executeUpdate(sql);
conn.close();
fos.close();
} catch (SQLException e) {
System.err.println("insertRecord failed:" + e.getMessage());
System.exit(-1);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

public String getShuffleDevice(String shuffleId){
String sql = "SELECT device FROM metastore where shuffleId = ?";
String res = "";

try {
Connection conn = DriverManager.getConnection(url);
PreparedStatement pstmt = conn.prepareStatement(sql);
pstmt.setString(1, shuffleId);
ResultSet rs = pstmt.executeQuery();
if (rs != null) {
res = rs.getString("device");
synchronized (file) {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
FileLock fileLock = fos.getChannel().lock();
Connection conn = DriverManager.getConnection(url);
PreparedStatement pstmt = conn.prepareStatement(sql);
pstmt.setString(1, shuffleId);
ResultSet rs = pstmt.executeQuery();
if (rs != null) {
res = rs.getString("device");
}
conn.close();
fos.close();
} catch (SQLException e) {
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
conn.close();
} catch (SQLException e) {
}
return res;
}
Expand All @@ -87,40 +136,53 @@ public String getUnusedDevice(ArrayList<String> full_device_list){
HashMap<String, Integer> device_count = new HashMap<String, Integer>();
String device = "";
int count;

try {
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
Connection conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
device_list.add(rs.getString("device"));
device_count.put(rs.getString("device"), rs.getInt("mount_count"));
}

full_device_list.removeAll(device_list);
if (full_device_list.size() == 0) {
// reuse old device, picked the device has smallest mount_count
device = getDeviceWithMinCount(device_count);
if (device == "") {
throw new SQLException();
synchronized (file) {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
FileLock fileLock = fos.getChannel().lock();
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
Connection conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
device_list.add(rs.getString("device"));
device_count.put(rs.getString("device"), rs.getInt("mount_count"));
}
count = (Integer)device_count.get(device) + 1;
sql = "UPDATE devices SET mount_count = " + count + " WHERE device = '" + device + "'\n";
} else {
device = full_device_list.get(0);
count = 1;
sql = "INSERT OR IGNORE INTO devices(device, mount_count) VALUES('" + device + "', " + count + ")\n";
}

System.out.println(sql);
full_device_list.removeAll(device_list);
if (full_device_list.size() == 0) {
// reuse old device, picked the device has smallest mount_count
device = getDeviceWithMinCount(device_count);
if (device == "") {
throw new SQLException();
}
count = (Integer) device_count.get(device) + 1;
sql = "UPDATE devices SET mount_count = " + count + " WHERE device = '" + device + "'\n";
} else {
device = full_device_list.get(0);
count = 1;
sql = "INSERT OR IGNORE INTO devices(device, mount_count) VALUES('" + device + "', " + count + ")\n";
}

stmt.executeUpdate(sql);
conn.close();
} catch (SQLException e) {
System.err.println("getUnusedDevice insert device " + device + "failed: " + e.getMessage());
System.exit(-1);
System.out.println(sql);

stmt.executeUpdate(sql);
conn.close();
fos.close();
} catch (SQLException e) {
System.err.println("getUnusedDevice insert device " + device + "failed: " + e.getMessage());
System.exit(-1);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
System.out.println("Metastore DB: get unused device, should be " + device + ".");
return device;
Expand Down